// commonware_storage/qmdb/immutable/batch.rs

1//! Batch mutation API for Immutable QMDBs.
2
3use super::Immutable;
4use crate::{
5    journal::{authenticated, contiguous::Mutable, Error as JournalError},
6    merkle::{Family, Location},
7    qmdb::{any::ValueEncoding, immutable::operation::Operation, operation::Key, Error},
8    translator::Translator,
9    Context, Persistable,
10};
11use commonware_codec::EncodeShared;
12use commonware_cryptography::{Digest, Hasher as CHasher};
13use core::iter;
14use std::{
15    collections::BTreeMap,
16    sync::{Arc, Weak},
17};
18
/// What happened to a key in this batch: the value it was set to and the
/// [`Location`] of the `Set` operation that recorded it.
#[derive(Clone)]
pub(crate) struct DiffEntry<F: Family, V> {
    /// The value the key was set to in this batch.
    pub(crate) value: V,
    /// Location of the corresponding `Set` operation in the journal.
    pub(crate) loc: Location<F>,
}
25
/// A speculative batch of operations whose root digest has not yet been computed, in contrast
/// to [`MerkleizedBatch`].
///
/// Consuming [`UnmerkleizedBatch::merkleize`] produces an `Arc<MerkleizedBatch>`.
/// Methods that need the committed DB (e.g. [`get`](Self::get)) accept it as a parameter.
#[allow(clippy::type_complexity)]
pub struct UnmerkleizedBatch<F, H, K, V>
where
    F: Family,
    K: Key,
    V: ValueEncoding,
    H: CHasher,
{
    /// Authenticated journal batch for computing the speculative Merkle root.
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<K, V>>,

    /// Pending mutations. Resolved into `Set` operations (in sorted key order,
    /// since `BTreeMap` iterates sorted) when the batch is merkleized.
    mutations: BTreeMap<K, V::Value>,

    /// Parent batch in the chain. `None` for batches created directly from the DB.
    /// Held as a strong `Arc`, keeping the immediate parent alive until merkleize;
    /// older ancestors are reached through `Weak` refs (see [`MerkleizedBatch::parent`]).
    parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V>>>,

    /// Total operation count before this batch (committed DB + prior batches).
    /// This batch's i-th operation lands at location `base_size + i`.
    base_size: u64,

    /// The database size when this batch was created, used to detect stale batches.
    db_size: u64,
}
55
/// A speculative batch of operations whose root digest has been computed,
/// in contrast to [`UnmerkleizedBatch`].
///
/// Cloning is cheap: all bulk state is behind `Arc`s.
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding> {
    /// Authenticated journal batch (Merkle state + local items).
    pub(super) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<K, V>>>,

    /// This batch's local key-level changes only (not accumulated from ancestors).
    pub(super) diff: Arc<BTreeMap<K, DiffEntry<F, V::Value>>>,

    /// The parent batch in the chain, if any. Held weakly: a dropped ancestor
    /// terminates the `ancestors()` walk rather than being kept alive.
    pub(super) parent: Option<Weak<Self>>,

    /// Total operations before this batch's own ops (DB + ancestor batches).
    pub(super) base_size: u64,

    /// Total operation count after this batch.
    pub(super) total_size: u64,

    /// The database size when the initial batch was created.
    pub(super) db_size: u64,

    /// Arc refs to each ancestor's diff, collected during `merkleize()` while the parent
    /// is alive. Used by `apply_batch` to apply uncommitted ancestor snapshot diffs.
    /// 1:1 with `ancestor_diff_ends` (same length, same ordering).
    #[allow(clippy::type_complexity)]
    pub(super) ancestor_diffs: Vec<Arc<BTreeMap<K, DiffEntry<F, V::Value>>>>,

    /// Each ancestor's `total_size` (operation count after that ancestor).
    /// 1:1 with `ancestor_diffs`: `ancestor_diff_ends[i]` is the boundary for
    /// `ancestor_diffs[i]`. A batch is committed when `ancestor_diff_ends[i] <= db_size`.
    pub(super) ancestor_diff_ends: Vec<u64>,
}
89
90impl<F, H, K, V> UnmerkleizedBatch<F, H, K, V>
91where
92    F: Family,
93    K: Key,
94    V: ValueEncoding,
95    H: CHasher,
96    Operation<K, V>: EncodeShared,
97{
98    /// Create a batch from a committed DB (no parent chain).
99    pub(super) fn new<E, C, T>(
100        immutable: &Immutable<F, E, K, V, C, H, T>,
101        journal_size: u64,
102    ) -> Self
103    where
104        E: Context,
105        C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
106        C::Item: EncodeShared,
107        T: Translator,
108    {
109        Self {
110            journal_batch: immutable.journal.new_batch(),
111            mutations: BTreeMap::new(),
112            parent: None,
113            base_size: journal_size,
114            db_size: journal_size,
115        }
116    }
117
118    /// Set a key to a value.
119    ///
120    /// The key must not already exist in the database or in any ancestor batch
121    /// in the chain. Setting a key that already exists causes undefined behavior.
122    pub fn set(mut self, key: K, value: V::Value) -> Self {
123        self.mutations.insert(key, value);
124        self
125    }
126
127    /// Read through: mutations -> ancestor diffs -> committed DB.
128    pub async fn get<E, C, T>(
129        &self,
130        key: &K,
131        db: &Immutable<F, E, K, V, C, H, T>,
132    ) -> Result<Option<V::Value>, Error<F>>
133    where
134        E: Context,
135        C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
136        C::Item: EncodeShared,
137        T: Translator,
138    {
139        // Check this batch's pending mutations.
140        if let Some(value) = self.mutations.get(key) {
141            return Ok(Some(value.clone()));
142        }
143        // Walk parent chain. The first parent is a strong Arc (held by UnmerkleizedBatch),
144        // subsequent parents are Weak refs.
145        if let Some(parent) = self.parent.as_ref() {
146            if let Some(entry) = parent.diff.get(key) {
147                return Ok(Some(entry.value.clone()));
148            }
149            for batch in parent.ancestors() {
150                if let Some(entry) = batch.diff.get(key) {
151                    return Ok(Some(entry.value.clone()));
152                }
153            }
154        }
155        // Fall through to base DB.
156        db.get(key).await
157    }
158
159    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
160    pub fn merkleize<E, C, T>(
161        self,
162        db: &Immutable<F, E, K, V, C, H, T>,
163        metadata: Option<V::Value>,
164    ) -> Arc<MerkleizedBatch<F, H::Digest, K, V>>
165    where
166        E: Context,
167        C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
168        C::Item: EncodeShared,
169        T: Translator,
170    {
171        let base = self.base_size;
172
173        // Build operations: one Set per key (BTreeMap iterates in sorted order), then Commit.
174        let mut ops: Vec<Operation<K, V>> = Vec::with_capacity(self.mutations.len() + 1);
175        let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
176
177        for (key, value) in self.mutations {
178            let loc = Location::new(base + ops.len() as u64);
179            ops.push(Operation::Set(key.clone(), value.clone()));
180            diff.insert(key, DiffEntry { value, loc });
181        }
182
183        ops.push(Operation::Commit(metadata));
184
185        let total_size = base + ops.len() as u64;
186
187        // Add operations to the journal batch and merkleize.
188        let mut journal_batch = self.journal_batch;
189        for op in &ops {
190            journal_batch = journal_batch.add(op.clone());
191        }
192        let journal_merkleized = db.journal.with_mem(|mem| journal_batch.merkleize(mem));
193
194        let mut ancestor_diffs = Vec::new();
195        let mut ancestor_diff_ends = Vec::new();
196        if let Some(parent) = &self.parent {
197            ancestor_diffs.push(Arc::clone(&parent.diff));
198            ancestor_diff_ends.push(parent.total_size);
199            for batch in parent.ancestors() {
200                ancestor_diffs.push(Arc::clone(&batch.diff));
201                ancestor_diff_ends.push(batch.total_size);
202            }
203        }
204
205        Arc::new(MerkleizedBatch {
206            journal_batch: journal_merkleized,
207            diff: Arc::new(diff),
208            parent: self.parent.as_ref().map(Arc::downgrade),
209            base_size: self.base_size,
210            total_size,
211            db_size: self.db_size,
212            ancestor_diffs,
213            ancestor_diff_ends,
214        })
215    }
216}
217
218impl<F: Family, D: Digest, K: Key, V: ValueEncoding> MerkleizedBatch<F, D, K, V>
219where
220    Operation<K, V>: EncodeShared,
221{
222    /// Return the speculative root.
223    pub fn root(&self) -> D {
224        self.journal_batch.root()
225    }
226
227    /// Iterate over ancestor batches (parent first, then grandparent, etc.).
228    pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
229        let mut next = self.parent.as_ref().and_then(Weak::upgrade);
230        iter::from_fn(move || {
231            let batch = next.take()?;
232            next = batch.parent.as_ref().and_then(Weak::upgrade);
233            Some(batch)
234        })
235    }
236
237    /// Read through: local diff -> ancestor diffs -> committed DB.
238    pub async fn get<E, C, H, T>(
239        &self,
240        key: &K,
241        db: &Immutable<F, E, K, V, C, H, T>,
242    ) -> Result<Option<V::Value>, Error<F>>
243    where
244        E: Context,
245        C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
246        C::Item: EncodeShared,
247        H: CHasher<Digest = D>,
248        T: Translator,
249    {
250        if let Some(entry) = self.diff.get(key) {
251            return Ok(Some(entry.value.clone()));
252        }
253        for batch in self.ancestors() {
254            if let Some(entry) = batch.diff.get(key) {
255                return Ok(Some(entry.value.clone()));
256            }
257        }
258        db.get(key).await
259    }
260
261    /// Create a new speculative batch of operations with this batch as its parent.
262    ///
263    /// All uncommitted ancestors in the chain must be kept alive until the child (or any
264    /// descendant) is merkleized. Dropping an uncommitted ancestor causes data
265    /// loss detected at `apply_batch` time.
266    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V>
267    where
268        H: CHasher<Digest = D>,
269    {
270        UnmerkleizedBatch {
271            journal_batch: self.journal_batch.new_batch::<H>(),
272            mutations: BTreeMap::new(),
273            parent: Some(Arc::clone(self)),
274            base_size: self.total_size,
275            db_size: self.db_size,
276        }
277    }
278}
279
280impl<F, E, K, V, C, H, T> Immutable<F, E, K, V, C, H, T>
281where
282    F: Family,
283    E: Context,
284    K: Key,
285    V: ValueEncoding,
286    C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
287    C::Item: EncodeShared,
288    H: CHasher,
289    T: Translator,
290{
291    /// Create an initial [`MerkleizedBatch`] from the committed DB state.
292    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V>> {
293        let journal_size = *self.last_commit_loc + 1;
294        Arc::new(MerkleizedBatch {
295            journal_batch: self.journal.to_merkleized_batch(),
296            diff: Arc::new(BTreeMap::new()),
297            parent: None,
298            base_size: journal_size,
299            total_size: journal_size,
300            db_size: journal_size,
301            ancestor_diffs: Vec::new(),
302            ancestor_diff_ends: Vec::new(),
303        })
304    }
305}