Skip to main content

commonware_storage/qmdb/immutable/
batch.rs

1//! Batch mutation API for Immutable QMDBs.
2
3use super::Immutable;
4use crate::{
5    journal::{authenticated, contiguous::Mutable, Error as JournalError},
6    merkle::{Family, Location},
7    qmdb::{
8        any::{batch::lookup_sorted, ValueEncoding},
9        batch_chain::{self, Bounds},
10        immutable::operation::Operation,
11        operation::Key,
12        Error,
13    },
14    translator::Translator,
15    Context, Persistable,
16};
17use commonware_codec::EncodeShared;
18use commonware_cryptography::{Digest, Hasher as CHasher};
19use commonware_parallel::Strategy;
20use std::{
21    collections::BTreeMap,
22    sync::{Arc, Weak},
23};
24
/// Key-level changes recorded by a single batch, stored as `(key, entry)` pairs.
///
/// `merkleize` fills this from a `BTreeMap` iteration, so entries are in ascending key order;
/// readers depend on that ordering when they query it through `lookup_sorted`.
type DiffVec<K, F, V> = Vec<(K, DiffEntry<F, V>)>;
26
27/// What happened to a key in this batch.
28#[derive(Clone)]
29pub(crate) struct DiffEntry<F: Family, V> {
30    pub(crate) value: V,
31    pub(crate) loc: Location<F>,
32}
33
34/// A speculative batch of operations whose root digest has not yet been computed, in contrast
35/// to [`MerkleizedBatch`].
36///
37/// Consuming [`UnmerkleizedBatch::merkleize`] produces an `Arc<MerkleizedBatch>`.
38/// Methods that need the committed DB (e.g. [`get`](Self::get)) accept it as a parameter.
39#[allow(clippy::type_complexity)]
40pub struct UnmerkleizedBatch<F, H, K, V, S: Strategy>
41where
42    F: Family,
43    K: Key,
44    V: ValueEncoding,
45    H: CHasher,
46{
47    /// Authenticated journal batch for computing the speculative Merkle root.
48    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, K, V>, S>,
49
50    /// Pending mutations.
51    mutations: BTreeMap<K, V::Value>,
52
53    /// Parent batch in the chain. `None` for batches created directly from the DB.
54    parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V, S>>>,
55
56    /// Total operation count before this batch (committed DB + prior batches).
57    /// This batch's i-th operation lands at location `base_size + i`.
58    base_size: u64,
59
60    /// The database size when this batch was created, used to detect stale batches.
61    db_size: u64,
62}
63
/// Merkleized authenticated-journal batch wrapping an [`Operation`] payload.
///
/// The alias includes the `Arc`, so cloning a `JournalBatch` only bumps a reference count.
type JournalBatch<F, D, K, V, S> = Arc<authenticated::MerkleizedBatch<F, D, Operation<F, K, V>, S>>;
66
67/// A speculative batch of operations whose root digest has been computed,
68/// in contrast to [`UnmerkleizedBatch`].
69#[derive(Clone)]
70pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy> {
71    /// Authenticated journal batch (Merkle state + local items).
72    pub(super) journal_batch: JournalBatch<F, D, K, V, S>,
73
74    /// Cached operations root after applying this batch.
75    pub(super) root: D,
76
77    /// This batch's local key-level changes only (not accumulated from ancestors).
78    /// Sorted by key with no duplicates; queried via `lookup_sorted` (binary search).
79    pub(super) diff: Arc<DiffVec<K, F, V::Value>>,
80
81    /// The parent batch in the chain, if any.
82    pub(super) parent: Option<Weak<Self>>,
83
84    /// Arc refs to each ancestor's diff, collected during `merkleize()` while the parent
85    /// is alive. Used by `apply_batch` to apply uncommitted ancestor snapshot diffs.
86    /// 1:1 with `bounds.ancestors` (same length, same ordering).
87    pub(super) ancestor_diffs: Vec<Arc<DiffVec<K, F, V::Value>>>,
88
89    /// Position and floor bounds for this batch chain.
90    pub(super) bounds: batch_chain::Bounds<F>,
91}
92
93impl<F, H, K, V, S: Strategy> UnmerkleizedBatch<F, H, K, V, S>
94where
95    F: Family,
96    K: Key,
97    V: ValueEncoding,
98    H: CHasher,
99    Operation<F, K, V>: EncodeShared,
100{
101    /// Create a batch from a committed DB (no parent chain).
102    pub(super) fn new<E, C, T>(
103        immutable: &Immutable<F, E, K, V, C, H, T, S>,
104        journal_size: u64,
105    ) -> Self
106    where
107        E: Context,
108        C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
109        C::Item: EncodeShared,
110        T: Translator,
111    {
112        Self {
113            journal_batch: immutable.journal.new_batch(),
114            mutations: BTreeMap::new(),
115            parent: None,
116            base_size: journal_size,
117            db_size: journal_size,
118        }
119    }
120
121    /// Set a key to a value.
122    ///
123    /// The key must not already exist in the database or in any ancestor batch
124    /// in the chain. Setting a key that already exists causes undefined behavior.
125    pub fn set(mut self, key: K, value: V::Value) -> Self {
126        self.mutations.insert(key, value);
127        self
128    }
129
130    /// Read through: mutations -> ancestor diffs -> committed DB.
131    pub async fn get<E, C, T>(
132        &self,
133        key: &K,
134        db: &Immutable<F, E, K, V, C, H, T, S>,
135    ) -> Result<Option<V::Value>, Error<F>>
136    where
137        E: Context,
138        C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
139        C::Item: EncodeShared,
140        T: Translator,
141    {
142        // Check this batch's pending mutations.
143        if let Some(value) = self.mutations.get(key) {
144            return Ok(Some(value.clone()));
145        }
146        // Walk parent chain. The first parent is a strong Arc (held by UnmerkleizedBatch),
147        // subsequent parents are Weak refs.
148        if let Some(parent) = self.parent.as_ref() {
149            if let Some(entry) = lookup_sorted(parent.diff.as_slice(), key) {
150                return Ok(Some(entry.value.clone()));
151            }
152            for batch in parent.ancestors() {
153                if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
154                    return Ok(Some(entry.value.clone()));
155                }
156            }
157        }
158        // Fall through to base DB.
159        db.get(key).await
160    }
161
162    /// Batch read multiple keys.
163    ///
164    /// Returns results in the same order as the input keys.
165    pub async fn get_many<E, C, T>(
166        &self,
167        keys: &[&K],
168        db: &Immutable<F, E, K, V, C, H, T, S>,
169    ) -> Result<Vec<Option<V::Value>>, Error<F>>
170    where
171        E: Context,
172        C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
173        C::Item: EncodeShared,
174        T: Translator,
175    {
176        if keys.is_empty() {
177            return Ok(Vec::new());
178        }
179
180        let mut results: Vec<Option<V::Value>> = Vec::with_capacity(keys.len());
181        let mut db_indices = Vec::new();
182        let mut db_keys = Vec::new();
183
184        for (i, key) in keys.iter().enumerate() {
185            // Check local mutations.
186            if let Some(value) = self.mutations.get(*key) {
187                results.push(Some(value.clone()));
188                continue;
189            }
190
191            // Check parent diff chain.
192            let mut found = false;
193            if let Some(parent) = self.parent.as_ref() {
194                if let Some(entry) = lookup_sorted(parent.diff.as_slice(), *key) {
195                    results.push(Some(entry.value.clone()));
196                    found = true;
197                }
198                if !found {
199                    for batch in parent.ancestors() {
200                        if let Some(entry) = lookup_sorted(batch.diff.as_slice(), *key) {
201                            results.push(Some(entry.value.clone()));
202                            found = true;
203                            break;
204                        }
205                    }
206                }
207            }
208
209            if found {
210                continue;
211            }
212
213            // Need DB fallthrough.
214            db_indices.push(i);
215            db_keys.push(*key);
216            results.push(None);
217        }
218
219        if !db_keys.is_empty() {
220            let db_results = db.get_many(&db_keys).await?;
221            for (slot, value) in db_indices.into_iter().zip(db_results) {
222                results[slot] = value;
223            }
224        }
225
226        Ok(results)
227    }
228
229    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
230    ///
231    /// `inactivity_floor` declares that all operations before this location are inactive.
232    /// It must be >= the database's current inactivity floor (monotonically non-decreasing).
233    pub fn merkleize<E, C, T>(
234        self,
235        db: &Immutable<F, E, K, V, C, H, T, S>,
236        metadata: Option<V::Value>,
237        inactivity_floor: Location<F>,
238    ) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>>
239    where
240        E: Context,
241        C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
242        C::Item: EncodeShared,
243        T: Translator,
244    {
245        let base = self.base_size;
246
247        // Build operations: one Set per key, then Commit. `self.mutations` is a BTreeMap, so
248        // iteration yields keys in sorted order, which `diff` relies on for binary search.
249        let mut ops: Vec<Operation<F, K, V>> = Vec::with_capacity(self.mutations.len() + 1);
250        let mut diff: DiffVec<K, F, V::Value> = Vec::with_capacity(self.mutations.len());
251
252        for (key, value) in self.mutations {
253            let loc = Location::new(base + ops.len() as u64);
254            ops.push(Operation::Set(key.clone(), value.clone()));
255            diff.push((key, DiffEntry { value, loc }));
256        }
257        assert!(diff.is_sorted_by(|a, b| a.0 < b.0));
258
259        ops.push(Operation::Commit(metadata, inactivity_floor));
260
261        let total_size = base + ops.len() as u64;
262
263        // Add operations to the journal batch and merkleize.
264        let mut journal_batch = self.journal_batch;
265        for op in &ops {
266            journal_batch = journal_batch.add(op.clone());
267        }
268        let inactive_peaks = F::inactive_peaks(
269            F::location_to_position(Location::new(total_size)),
270            inactivity_floor,
271        );
272        let journal_merkleized = db.journal.with_mem(|mem| journal_batch.merkleize(mem));
273        let root = db
274            .journal
275            .with_mem(|mem| journal_merkleized.root(mem, &db.journal.hasher, inactive_peaks))
276            .expect("inactive_peaks computed from batch size");
277
278        let mut ancestor_diffs = Vec::new();
279        let mut ancestors = Vec::new();
280        for batch in
281            batch_chain::parent_and_ancestors(self.parent.as_ref(), |parent| parent.ancestors())
282        {
283            ancestor_diffs.push(Arc::clone(&batch.diff));
284            ancestors.push(batch_chain::AncestorBounds {
285                floor: batch.bounds.inactivity_floor,
286                end: batch.bounds.total_size,
287            });
288        }
289
290        Arc::new(MerkleizedBatch {
291            journal_batch: journal_merkleized,
292            root,
293            diff: Arc::new(diff),
294            parent: self.parent.as_ref().map(Arc::downgrade),
295            ancestor_diffs,
296            bounds: batch_chain::Bounds {
297                base_size: self.base_size,
298                db_size: self.db_size,
299                total_size,
300                ancestors,
301                inactivity_floor,
302            },
303        })
304    }
305}
306
307impl<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy> MerkleizedBatch<F, D, K, V, S>
308where
309    Operation<F, K, V>: EncodeShared,
310{
311    /// Return the speculative root.
312    pub const fn root(&self) -> D {
313        self.root
314    }
315
316    /// Return the [`Bounds`] of the batch.
317    pub const fn bounds(&self) -> &Bounds<F> {
318        &self.bounds
319    }
320
321    /// Iterate over ancestor batches (parent first, then grandparent, etc.).
322    pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
323        batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
324    }
325
326    /// Read through: local diff -> ancestor diffs -> committed DB.
327    pub async fn get<E, C, H, T>(
328        &self,
329        key: &K,
330        db: &Immutable<F, E, K, V, C, H, T, S>,
331    ) -> Result<Option<V::Value>, Error<F>>
332    where
333        E: Context,
334        C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
335        C::Item: EncodeShared,
336        H: CHasher<Digest = D>,
337        T: Translator,
338    {
339        if let Some(entry) = lookup_sorted(self.diff.as_slice(), key) {
340            return Ok(Some(entry.value.clone()));
341        }
342        for batch in self.ancestors() {
343            if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
344                return Ok(Some(entry.value.clone()));
345            }
346        }
347        db.get(key).await
348    }
349
350    /// Batch read multiple keys.
351    ///
352    /// Returns results in the same order as the input keys.
353    pub async fn get_many<E, C, H, T>(
354        &self,
355        keys: &[&K],
356        db: &Immutable<F, E, K, V, C, H, T, S>,
357    ) -> Result<Vec<Option<V::Value>>, Error<F>>
358    where
359        E: Context,
360        C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
361        C::Item: EncodeShared,
362        H: CHasher<Digest = D>,
363        T: Translator,
364    {
365        if keys.is_empty() {
366            return Ok(Vec::new());
367        }
368
369        let mut results: Vec<Option<V::Value>> = Vec::with_capacity(keys.len());
370        let mut db_indices = Vec::new();
371        let mut db_keys = Vec::new();
372
373        for (i, key) in keys.iter().enumerate() {
374            // Check local diff.
375            if let Some(entry) = lookup_sorted(self.diff.as_slice(), *key) {
376                results.push(Some(entry.value.clone()));
377                continue;
378            }
379
380            // Walk parent chain.
381            let mut found = false;
382            for batch in self.ancestors() {
383                if let Some(entry) = lookup_sorted(batch.diff.as_slice(), *key) {
384                    results.push(Some(entry.value.clone()));
385                    found = true;
386                    break;
387                }
388            }
389
390            if found {
391                continue;
392            }
393
394            // Need DB fallthrough.
395            db_indices.push(i);
396            db_keys.push(*key);
397            results.push(None);
398        }
399
400        if !db_keys.is_empty() {
401            let db_results = db.get_many(&db_keys).await?;
402            for (slot, value) in db_indices.into_iter().zip(db_results) {
403                results[slot] = value;
404            }
405        }
406
407        Ok(results)
408    }
409
410    /// Create a new speculative batch of operations with this batch as its parent.
411    ///
412    /// All uncommitted ancestors in the chain must be kept alive until the child (or any
413    /// descendant) is merkleized. Dropping an uncommitted ancestor causes data
414    /// loss detected at `apply_batch` time.
415    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V, S>
416    where
417        H: CHasher<Digest = D>,
418    {
419        UnmerkleizedBatch {
420            journal_batch: self.journal_batch.new_batch::<H>(),
421            mutations: BTreeMap::new(),
422            parent: Some(Arc::clone(self)),
423            base_size: self.bounds.total_size,
424            db_size: self.bounds.db_size,
425        }
426    }
427}
428
429impl<F, E, K, V, C, H, T, S> Immutable<F, E, K, V, C, H, T, S>
430where
431    F: Family,
432    E: Context,
433    K: Key,
434    V: ValueEncoding,
435    C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
436    C::Item: EncodeShared,
437    H: CHasher,
438    T: Translator,
439    S: Strategy,
440{
441    /// Create an initial [`MerkleizedBatch`] from the committed DB state.
442    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>> {
443        let journal_size = *self.last_commit_loc + 1;
444        Arc::new(MerkleizedBatch {
445            journal_batch: self.journal.to_merkleized_batch(),
446            root: self.root,
447            diff: Arc::new(Vec::new()),
448            parent: None,
449            ancestor_diffs: Vec::new(),
450            bounds: batch_chain::Bounds {
451                base_size: journal_size,
452                db_size: journal_size,
453                total_size: journal_size,
454                ancestors: Vec::new(),
455                inactivity_floor: self.inactivity_floor_loc,
456            },
457        })
458    }
459}