Skip to main content

commonware_storage/qmdb/
batch_chain.rs

1//! Shared validation for QMDB batch chains.
2//!
3//! A batch chain is a linked sequence of in-memory batches built on top of a DB state. Each
4//! batch records its position via [`Bounds`] (where its operations sit in the log) and the
5//! inactivity floor declared by its commit. Older batches in the chain are tracked as
6//! [`AncestorBounds`] in newest-first order; some may already be on disk and others may not.
7//!
8//! Before applying a batch to the DB, the internal validation checks two things shared across QMDB
9//! variants (any, immutable, keyless):
10//!
11//! - The batch is not stale: the current DB size must match either the batch's recorded
12//!   `db_size`, its `base_size`, or one of its ancestor boundaries.
13//! - Commit floors are monotonically non-decreasing along the chain, and no floor exceeds
14//!   its own commit location. Ancestors already on disk are skipped (their floors were
15//!   validated when they were first applied); the rest of the chain and the tip are checked.
16//!
17//! Internal helpers walk the chain via weak parent references and snapshot ancestor bounds into a
18//! `Vec` for storage on a merkleized batch.
19
20use crate::{
21    merkle::{Family, Location},
22    qmdb::Error,
23};
24use core::iter;
25use std::sync::{Arc, Weak};
26
/// Bounds declared by an ancestor batch's commit.
///
/// One entry is recorded per ancestor in a batch chain. The staleness check compares `end`
/// against the current DB size, and the floor check treats `end - 1` as the ancestor's commit
/// location.
#[derive(Clone)]
pub struct AncestorBounds<F: Family> {
    /// Inactivity floor declared by the ancestor commit.
    pub floor: Location<F>,
    /// Total operations after the ancestor batch.
    ///
    /// An ancestor whose `end` is at or below the current DB size is skipped by the floor check.
    pub end: u64,
}
35
/// Position and inactivity-floor state for a merkleized QMDB batch.
///
/// `validate_apply_to` reads every field: the three sizes and `ancestors` drive the staleness
/// check, while `ancestors`, `inactivity_floor` and `total_size` drive the commit-floor check.
#[derive(Clone)]
pub struct Bounds<F: Family> {
    /// Total operations before this batch's own operations.
    ///
    /// A DB size equal to this value is accepted by the staleness check.
    pub base_size: u64,
    /// Boundary between committed DB operations and operations kept in this batch chain.
    ///
    /// Usually this is the DB size when the batch was created. If older ancestors were
    /// dropped, the boundary moves forward to the oldest ancestor still kept in memory.
    pub db_size: u64,
    /// Total operations after this batch.
    ///
    /// `total_size - 1` is used as the location of this batch's commit, so a value of zero makes
    /// `validate_apply_to` panic once the staleness check has passed.
    pub total_size: u64,
    /// Ancestor bounds in newest-first order.
    pub ancestors: Vec<AncestorBounds<F>>,
    /// Inactivity floor declared by this batch's commit.
    pub inactivity_floor: Location<F>,
}
53
54impl<F: Family> Bounds<F> {
55    /// Validate that this batch can be applied to the current database state.
56    pub(crate) fn validate_apply_to(
57        &self,
58        current_size: u64,
59        current_floor: Location<F>,
60    ) -> Result<(), Error<F>> {
61        validate_batch_applicable(current_size, self.db_size, self.base_size, &self.ancestors)?;
62        validate_commit_floors(
63            current_floor,
64            current_size,
65            &self.ancestors,
66            self.inactivity_floor,
67            Location::new(
68                self.total_size
69                    .checked_sub(1)
70                    .expect("merkleized batch includes a commit"),
71            ),
72        )
73    }
74}
75
/// Walk a batch's live ancestors: `parent` first, then each progressively older batch.
///
/// `parent_of` returns the weak parent link stored on a batch, or `None` when the batch has no
/// parent. The walk ends at the first link that is absent or whose target has been dropped.
///
/// Links are upgraded one step ahead: `parent` is upgraded when this function is called, and
/// each later link is upgraded at the moment the batch before it is yielded.
pub(crate) fn ancestors<T, P>(
    parent: Option<Weak<T>>,
    mut parent_of: P,
) -> impl Iterator<Item = Arc<T>>
where
    P: for<'a> FnMut(&'a T) -> Option<&'a Weak<T>>,
{
    let first = parent.as_ref().and_then(Weak::upgrade);
    // `successors` yields the current batch and immediately computes the following one from it.
    iter::successors(first, move |batch| parent_of(&**batch).and_then(Weak::upgrade))
}
93
/// Yield a strongly-held parent and then whatever `ancestors_of` produces for it.
///
/// With no parent the iterator is empty and `ancestors_of` is never called. Otherwise
/// `ancestors_of` is called once, lazily, when the iterator is first advanced.
pub(crate) fn parent_and_ancestors<T, P, I>(
    parent: Option<&Arc<T>>,
    mut ancestors_of: P,
) -> impl Iterator<Item = Arc<T>>
where
    P: FnMut(&Arc<T>) -> I,
    I: IntoIterator<Item = Arc<T>>,
{
    // Take an owned handle up front so the returned iterator does not borrow from the caller.
    let head = parent.cloned();
    head.into_iter().flat_map(move |head| {
        // Resolve the older batches before `head` is moved into the leading `once`.
        let older = ancestors_of(&head);
        iter::once(head).chain(older)
    })
}
108
109/// Collect ancestor bounds in newest-first order.
110pub(crate) fn collect_ancestor_bounds<T, F, I, E, L>(
111    ancestors: I,
112    floor: L,
113    end: E,
114) -> Vec<AncestorBounds<F>>
115where
116    F: Family,
117    I: IntoIterator<Item = Arc<T>>,
118    E: Fn(&T) -> u64,
119    L: Fn(&T) -> Location<F>,
120{
121    let mut bounds = Vec::new();
122
123    for batch in ancestors {
124        bounds.push(AncestorBounds {
125            floor: floor(&batch),
126            end: end(&batch),
127        });
128    }
129
130    bounds
131}
132
133/// Validate that a batch can be applied to a database with `db_size` committed operations.
134///
135/// A batch is applicable if the database has not advanced since the batch was created
136/// (`batch_db_size`), if all ancestors are already committed (`batch_base_size`), or if the
137/// database has advanced to one of the batch's ancestor boundaries.
138pub(crate) fn validate_batch_applicable<F: Family>(
139    db_size: u64,
140    batch_db_size: u64,
141    batch_base_size: u64,
142    ancestors: &[AncestorBounds<F>],
143) -> Result<(), Error<F>> {
144    if db_size == batch_db_size
145        || db_size == batch_base_size
146        || ancestors.iter().any(|ancestor| ancestor.end == db_size)
147    {
148        return Ok(());
149    }
150
151    Err(Error::StaleBatch {
152        db_size,
153        batch_db_size,
154        batch_base_size,
155    })
156}
157
158/// Validate commit-floor monotonicity for a batch chain.
159///
160/// Ancestors are stored newest-first. Validation walks them in reverse so unapplied ancestors are
161/// checked oldest-to-newest, then checks the tip. Ancestors at or below `db_size` are already
162/// committed locally and are skipped.
163pub(crate) fn validate_commit_floors<F: Family>(
164    starting_floor: Location<F>,
165    db_size: u64,
166    ancestors: &[AncestorBounds<F>],
167    tip_floor: Location<F>,
168    tip_commit_loc: Location<F>,
169) -> Result<(), Error<F>> {
170    let mut prev_floor = starting_floor;
171    for ancestor in ancestors.iter().rev() {
172        if ancestor.end <= db_size {
173            continue;
174        }
175
176        let ancestor_commit_loc = Location::new(ancestor.end - 1);
177        if ancestor.floor < prev_floor {
178            return Err(Error::FloorRegressed(ancestor.floor, prev_floor));
179        }
180        if ancestor.floor > ancestor_commit_loc {
181            return Err(Error::FloorBeyondSize(ancestor.floor, ancestor_commit_loc));
182        }
183        prev_floor = ancestor.floor;
184    }
185
186    if tip_floor < prev_floor {
187        return Err(Error::FloorRegressed(tip_floor, prev_floor));
188    }
189    if tip_floor > tip_commit_loc {
190        return Err(Error::FloorBeyondSize(tip_floor, tip_commit_loc));
191    }
192    Ok(())
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::merkle::mmr;
    use std::sync::{Arc, Weak};

    // All tests run against the MMR family.
    type F = mmr::Family;

    // Minimal stand-in for a batch: an id to identify it in assertions, its bounds, and a weak
    // link to its parent.
    struct TestBatch {
        id: u8,
        bounds: Bounds<F>,
        parent: Option<Weak<Self>>,
    }

    // Shorthand for building a `Location<F>`.
    const fn loc(n: u64) -> Location<F> {
        Location::new(n)
    }

    // Shorthand for building an `AncestorBounds<F>`.
    const fn ancestor(floor: Location<F>, end: u64) -> AncestorBounds<F> {
        AncestorBounds { floor, end }
    }

    // A DB size equal to the batch's `db_size` (10), its `base_size` (20), or an ancestor end
    // (16) is accepted.
    #[test]
    fn validate_batch_applicable_accepts_valid_boundaries() {
        let ancestors = vec![ancestor(loc(10), 12), ancestor(loc(14), 16)];
        assert!(validate_batch_applicable::<F>(10, 10, 20, &ancestors).is_ok());
        assert!(validate_batch_applicable::<F>(20, 10, 20, &ancestors).is_ok());
        assert!(validate_batch_applicable::<F>(16, 10, 20, &ancestors).is_ok());
    }

    // A DB size of 18 matches none of 10, 20, 12 or 16, so the batch is reported stale with the
    // three sizes echoed back.
    #[test]
    fn validate_batch_applicable_rejects_stale_batch() {
        let ancestors = vec![ancestor(loc(10), 12), ancestor(loc(14), 16)];
        let result = validate_batch_applicable::<F>(18, 10, 20, &ancestors);
        assert!(matches!(
            result,
            Err(Error::StaleBatch {
                db_size: 18,
                batch_db_size: 10,
                batch_base_size: 20,
            })
        ));
    }

    // Starting from a weak link to `parent`, the walk yields `parent` (id 2) and then
    // `grandparent` (id 1).
    #[test]
    fn ancestors_iterates_parent_first() {
        let grandparent = Arc::new(TestBatch {
            id: 1,
            bounds: Bounds {
                base_size: 0,
                db_size: 0,
                total_size: 5,
                ancestors: Vec::new(),
                inactivity_floor: loc(3),
            },
            parent: None,
        });
        let parent = Arc::new(TestBatch {
            id: 2,
            bounds: Bounds {
                base_size: 5,
                db_size: 0,
                total_size: 7,
                ancestors: vec![ancestor(loc(3), 5)],
                inactivity_floor: loc(6),
            },
            parent: Some(Arc::downgrade(&grandparent)),
        });

        let ids: Vec<_> = ancestors(Some(Arc::downgrade(&parent)), |batch| batch.parent.as_ref())
            .map(|batch| batch.id)
            .collect();

        assert_eq!(ids, vec![2, 1]);
    }

    // Each output entry pairs the floor and end of the same input batch, and entries come out in
    // the order the batches were supplied.
    #[test]
    fn collect_ancestor_bounds_preserves_pairing_and_order() {
        let parent = Arc::new(TestBatch {
            id: 1,
            bounds: Bounds {
                base_size: 0,
                db_size: 0,
                total_size: 12,
                ancestors: Vec::new(),
                inactivity_floor: loc(10),
            },
            parent: None,
        });
        let grandparent = Arc::new(TestBatch {
            id: 2,
            bounds: Bounds {
                base_size: 0,
                db_size: 0,
                total_size: 8,
                ancestors: Vec::new(),
                inactivity_floor: loc(6),
            },
            parent: None,
        });

        let bounds = collect_ancestor_bounds(
            vec![Arc::clone(&parent), Arc::clone(&grandparent)],
            |batch| batch.bounds.inactivity_floor,
            |batch| batch.bounds.total_size,
        );

        assert_eq!(bounds.len(), 2);
        assert_eq!((bounds[0].floor, bounds[0].end), (loc(10), 12));
        assert_eq!((bounds[1].floor, bounds[1].end), (loc(6), 8));
    }

    // End-to-end through `Bounds::validate_apply_to`: a DB size of 10 matches `db_size` and the
    // floors 9 -> 10 -> 11 are non-decreasing; a DB size of 11 matches no boundary and is stale.
    #[test]
    fn bounds_validates_apply_to_current_state() {
        let bounds = Bounds::<F> {
            base_size: 10,
            db_size: 10,
            total_size: 14,
            ancestors: vec![ancestor(loc(10), 12)],
            inactivity_floor: loc(11),
        };
        assert!(bounds.validate_apply_to(10, loc(9)).is_ok());

        let result = bounds.validate_apply_to(11, loc(9));
        assert!(matches!(
            result,
            Err(Error::StaleBatch {
                db_size: 11,
                batch_db_size: 10,
                batch_base_size: 10,
            })
        ));
    }

    // Oldest to newest the floors are 2 (start), 4, 6, then 8 at the tip, and each floor is at
    // or below its own commit location.
    #[test]
    fn validate_commit_floors_accepts_monotonic_chain() {
        let ancestors = vec![ancestor(loc(6), 7), ancestor(loc(4), 5)];
        assert!(validate_commit_floors::<F>(loc(2), 1, &ancestors, loc(8), loc(9),).is_ok());
    }

    // Both ancestors end at or below the DB size of 7, so their floors of 1 (below the starting
    // floor of 6) are never inspected; only the tip is checked.
    #[test]
    fn validate_commit_floors_skips_committed_ancestors() {
        let ancestors = vec![ancestor(loc(1), 7), ancestor(loc(1), 5)];
        assert!(validate_commit_floors::<F>(loc(6), 7, &ancestors, loc(8), loc(9),).is_ok());
    }

    // The oldest unapplied ancestor declares floor 3, below the starting floor of 4.
    #[test]
    fn validate_commit_floors_rejects_ancestor_regression() {
        let ancestors = vec![ancestor(loc(6), 7), ancestor(loc(3), 5)];
        let result = validate_commit_floors::<F>(loc(4), 1, &ancestors, loc(8), loc(9));
        assert!(matches!(
            result,
            Err(Error::FloorRegressed(floor, previous)) if floor == loc(3) && previous == loc(4)
        ));
    }

    // The newer ancestor ends at 7, so its commit sits at location 6; its floor of 8 exceeds it.
    #[test]
    fn validate_commit_floors_rejects_ancestor_floor_beyond_commit() {
        let ancestors = vec![ancestor(loc(8), 7), ancestor(loc(4), 5)];
        let result = validate_commit_floors::<F>(loc(2), 1, &ancestors, loc(9), loc(9));
        assert!(matches!(
            result,
            Err(Error::FloorBeyondSize(floor, commit)) if floor == loc(8) && commit == loc(6)
        ));
    }

    // The tip declares floor 3, below the floor of 4 carried over from its ancestor.
    #[test]
    fn validate_commit_floors_rejects_tip_regression() {
        let ancestors = vec![ancestor(loc(4), 5)];
        let result = validate_commit_floors::<F>(loc(2), 1, &ancestors, loc(3), loc(9));
        assert!(matches!(
            result,
            Err(Error::FloorRegressed(floor, previous)) if floor == loc(3) && previous == loc(4)
        ));
    }

    // The tip declares floor 10, above its own commit location of 9.
    #[test]
    fn validate_commit_floors_rejects_tip_floor_beyond_commit() {
        let ancestors = vec![ancestor(loc(4), 5)];
        let result = validate_commit_floors::<F>(loc(2), 1, &ancestors, loc(10), loc(9));
        assert!(matches!(
            result,
            Err(Error::FloorBeyondSize(floor, commit)) if floor == loc(10) && commit == loc(9)
        ));
    }
}