Skip to main content

commonware_sync/databases/
immutable.rs

1//! Immutable database types and helpers for the sync example.
2
3use crate::{Hasher, Key, Translator, Value};
4use commonware_cryptography::{Hasher as CryptoHasher, Sha256};
5use commonware_parallel::Sequential;
6use commonware_runtime::{BufferPooler, Clock, Metrics, Storage};
7use commonware_storage::{
8    journal::contiguous::fixed::Config as FConfig,
9    merkle::{
10        full::Config as MmrConfig,
11        mmr::{self, Location, Proof},
12    },
13    qmdb::{
14        self,
15        immutable::{fixed, Config},
16        sync::compact,
17    },
18};
19use commonware_utils::{NZUsize, NZU16, NZU64};
20use std::{future::Future, num::NonZeroU64};
21use tracing::error;
22
23/// Database type alias.
24pub type Database<E> = fixed::Db<mmr::Family, E, Key, Value, Hasher, Translator, Sequential>;
25
26/// Operation type alias.
27pub type Operation = fixed::Operation<mmr::Family, Key, Value>;
28
29/// Create a database configuration with appropriate partitioning for Immutable.
30pub fn create_config(context: &impl BufferPooler) -> Config<Translator, FConfig, Sequential> {
31    let page_cache = commonware_runtime::buffer::paged::CacheRef::from_pooler(
32        context,
33        NZU16!(2048),
34        NZUsize!(10),
35    );
36    Config {
37        merkle_config: MmrConfig {
38            journal_partition: "mmr-journal".into(),
39            metadata_partition: "mmr-metadata".into(),
40            items_per_blob: NZU64!(4096),
41            write_buffer: NZUsize!(4096),
42            strategy: Sequential,
43            page_cache: page_cache.clone(),
44        },
45        log: FConfig {
46            partition: "log".into(),
47            items_per_blob: NZU64!(4096),
48            write_buffer: NZUsize!(4096),
49            page_cache,
50        },
51        translator: commonware_storage::translator::EightCap,
52    }
53}
54
55/// Create deterministic test operations for demonstration purposes.
56///
57/// Generates Set operations and periodic Commit operations. Every commit in the stream
58/// carries `starting_loc` as its inactivity floor. Pass `0` for a fresh db; for growth, pass
59/// the live db's [`super::ExampleDatabase::current_floor`] so floors stay monotonic.
60pub fn create_test_operations(count: usize, seed: u64, starting_loc: u64) -> Vec<Operation> {
61    let mut operations = Vec::new();
62    let mut hasher = <Hasher as CryptoHasher>::new();
63    let floor = Location::new(starting_loc);
64
65    for i in 0..count {
66        let key = {
67            hasher.update(&i.to_be_bytes());
68            hasher.update(&seed.to_be_bytes());
69            hasher.finalize()
70        };
71
72        let value = {
73            hasher.update(&key);
74            hasher.update(b"value");
75            hasher.finalize()
76        };
77
78        operations.push(Operation::Set(key, value));
79
80        if (i + 1) % 10 == 0 {
81            operations.push(Operation::Commit(None, floor));
82        }
83    }
84
85    // Always end with a commit
86    operations.push(Operation::Commit(Some(Sha256::fill(1)), floor));
87    operations
88}
89
90impl<E> super::ExampleDatabase for Database<E>
91where
92    E: Storage + Clock + Metrics,
93{
94    type Family = mmr::Family;
95    type Operation = Operation;
96
97    fn create_test_operations(count: usize, seed: u64, starting_loc: u64) -> Vec<Self::Operation> {
98        create_test_operations(count, seed, starting_loc)
99    }
100
101    async fn add_operations(
102        &mut self,
103        operations: Vec<Self::Operation>,
104    ) -> Result<(), commonware_storage::qmdb::Error<mmr::Family>> {
105        if operations.last().is_none() || !operations.last().unwrap().is_commit() {
106            // Ignore bad inputs rather than return errors.
107            error!("operations must end with a commit");
108            return Ok(());
109        }
110
111        let mut batch = self.new_batch();
112        for operation in operations {
113            match operation {
114                Operation::Set(key, value) => {
115                    batch = batch.set(key, value);
116                }
117                Operation::Commit(metadata, floor) => {
118                    let merkleized = batch.merkleize(self, metadata, floor);
119                    self.apply_batch(merkleized).await?;
120                    self.commit().await?;
121                    batch = self.new_batch();
122                }
123            }
124        }
125        Ok(())
126    }
127
128    fn current_floor(&self) -> u64 {
129        *self.inactivity_floor_loc()
130    }
131
132    fn root(&self) -> Key {
133        self.root()
134    }
135
136    fn name() -> &'static str {
137        "immutable"
138    }
139}
140
141impl<E> super::Syncable for Database<E>
142where
143    E: Storage + Clock + Metrics,
144{
145    async fn size(&self) -> Location {
146        self.bounds().await.end
147    }
148
149    async fn sync_boundary(&self) -> Location {
150        self.sync_boundary()
151    }
152
153    fn historical_proof(
154        &self,
155        op_count: Location,
156        start_loc: Location,
157        max_ops: NonZeroU64,
158    ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
159    {
160        self.historical_proof(op_count, start_loc, max_ops)
161    }
162
163    fn pinned_nodes_at(
164        &self,
165        loc: Location,
166    ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
167        self.pinned_nodes_at(loc)
168    }
169}
170
171impl<E> super::CompactSyncable for Database<E>
172where
173    E: Storage + Clock + Metrics,
174{
175    async fn current_target(&self) -> compact::Target<Self::Family, Key> {
176        compact::Target::new(self.root(), self.bounds().await.end)
177    }
178}