commonware_storage/adb/keyless.rs

1//! The [Keyless] adb allows for append-only storage of arbitrary variable-length data that can
2//! later be retrieved by its location.
3//!
4//! The implementation consists of an `mmr` over the operations applied to the database, an
5//! operations `log` storing these operations, and a `locations` journal storing, for each
6//! operation, its offset within its section of the operations log.
7//!
8//! The state of the operations log up until the last commit point is the "source of truth". In the
9//! event of unclean shutdown, the mmr and locations structures will be brought back into alignment
10//! with the log on startup.
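//!
//! # Example
//!
//! A minimal usage sketch (not compiled here), assuming a runtime `context` and a fully populated
//! [Config] whose partition names and codec configuration are placeholders:
//!
//! ```ignore
//! use commonware_cryptography::Sha256;
//!
//! // Initialize the db (any uncommitted operations from a prior run are discarded).
//! let mut db = Keyless::<_, Vec<u8>, Sha256>::init(context, config).await?;
//!
//! // Append values, then commit to make them durable.
//! let loc = db.append(vec![1u8; 8]).await?;
//! db.commit(None).await?;
//!
//! // Retrieve a value by the location returned from `append`.
//! assert_eq!(db.get(loc).await?, Some(vec![1u8; 8]));
//!
//! // Optionally prune everything up to the last commit, then shut down cleanly.
//! db.prune(db.last_commit_loc().unwrap()).await?;
//! db.close().await?;
//! ```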
11
12use crate::{
13    adb::{align_mmr_and_locations, Error},
14    journal::{
15        fixed::{Config as FConfig, Journal as FJournal},
16        variable::{Config as VConfig, Journal as VJournal},
17    },
18    mmr::{
19        hasher::Standard,
20        iterator::leaf_num_to_pos,
21        journaled::{Config as MmrConfig, Mmr},
22        verification::Proof,
23    },
24    store::operation::Keyless as Operation,
25};
26use commonware_codec::{Codec, Encode as _};
27use commonware_cryptography::Hasher as CHasher;
28use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
29use commonware_utils::NZUsize;
30use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt as _};
31use std::num::{NonZeroU64, NonZeroUsize};
32use tracing::{debug, warn};
33
34/// The size of the read buffer to use for replaying the operations log during recovery.
35const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 14);
36
37/// Configuration for a [Keyless] authenticated db.
38#[derive(Clone)]
39pub struct Config<C> {
40    /// The name of the [Storage] partition used for the MMR's backing journal.
41    pub mmr_journal_partition: String,
42
43    /// The items per blob configuration value used by the MMR journal.
44    pub mmr_items_per_blob: NonZeroU64,
45
46    /// The size of the write buffer to use for each blob in the MMR journal.
47    pub mmr_write_buffer: NonZeroUsize,
48
49    /// The name of the [Storage] partition used for the MMR's metadata.
50    pub mmr_metadata_partition: String,
51
52    /// The name of the [Storage] partition used to persist the operations log.
53    pub log_journal_partition: String,
54
55    /// The size of the write buffer to use with the log journal.
56    pub log_write_buffer: NonZeroUsize,
57
58    /// Optional compression level (using `zstd`) to apply to log data before storing.
59    pub log_compression: Option<u8>,
60
61    /// The codec configuration to use for encoding and decoding the operations log.
62    pub log_codec_config: C,
63
64    /// The max number of operations to put in each section of the operations log.
65    pub log_items_per_section: NonZeroU64,
66
67    /// The name of the [Storage] partition used for the location map.
68    pub locations_journal_partition: String,
69
70    /// The number of items to put in each blob in the locations journal.
71    pub locations_items_per_blob: NonZeroU64,
72
73    /// The size of the write buffer to use with the locations journal.
74    pub locations_write_buffer: NonZeroUsize,
75
76    /// An optional thread pool to use for parallelizing batch MMR operations.
77    pub thread_pool: Option<ThreadPool>,
78
79    /// The buffer pool to use for caching data.
80    pub buffer_pool: PoolRef,
81}
82
83/// A keyless ADB for variable-length data.
84pub struct Keyless<E: Storage + Clock + Metrics, V: Codec, H: CHasher> {
85    /// An MMR over digests of the operations applied to the db.
86    ///
87    /// # Invariant
88    ///
89    /// The number of leaves in this MMR always equals the number of operations in the unpruned
90    /// `locations` journal.
91    mmr: Mmr<E, H>,
92
93    /// A journal of all operations ever applied to the db.
94    log: VJournal<E, Operation<V>>,
95
96    /// The total number of operations applied to the db (including any that have been pruned). The
97    /// next operation will be assigned this value as its location.
98    size: u64,
99
100    /// The number of operations to put in each section of the operations log.
101    log_items_per_section: u64,
102
103    /// A fixed-length journal that maps an operation's location to its offset within its
104    /// respective section of the log journal. (The section number is `location / log_items_per_section`.)
105    ///
106    /// The locations structure provides the "source of truth" for the db's pruning boundaries and
107    /// overall size, should there be any discrepancies.
108    locations: FJournal<E, u32>,
109
110    /// Cryptographic hasher to re-use within mutable operations requiring digest computation.
111    hasher: Standard<H>,
112
113    /// The location of the last commit, if any.
114    last_commit_loc: Option<u64>,
115}
116
117impl<E: Storage + Clock + Metrics, V: Codec, H: CHasher> Keyless<E, V, H> {
118    /// Find the last operation in the locations journal that can still be read from the log.
119    ///
120    /// Accepts the `aligned_size` of the MMR and locations journal and the `log_items_per_section` of a [Keyless] instance.
121    /// Returns the number of operations that remain valid, plus the section and offset of the last one (if any).
122    async fn find_last_operation(
123        locations: &FJournal<E, u32>,
124        log: &VJournal<E, Operation<V>>,
125        aligned_size: u64,
126        log_items_per_section: u64,
127    ) -> Result<(u64, Option<(u64, u32)>), Error> {
128        let mut valid_size = aligned_size;
129        let mut section_offset = None;
130
131        while valid_size > 0 {
132            let loc = valid_size - 1;
133            let offset = locations.read(loc).await?;
134            let section = loc / log_items_per_section;
135            match log.get(section, offset).await {
136                Ok(_) => {
137                    section_offset = Some((section, offset));
138                    break;
139                }
140                Err(e) => {
141                    warn!(loc, err=?e, "log operation missing");
142                }
143            };
144            warn!(loc, offset, section, "walking back locations");
145            valid_size -= 1;
146        }
147
148        Ok((valid_size, section_offset))
149    }
150
151    /// Replay log operations from a given position and sync MMR and locations.
152    ///
153    /// Returns `None` if the log is empty (for initial replay); otherwise returns the offset of
154    /// the last operation processed along with the operation itself.
155    async fn replay_operations(
156        mmr: &mut Mmr<E, H>,
157        hasher: &mut Standard<H>,
158        locations: &mut FJournal<E, u32>,
159        log: &VJournal<E, Operation<V>>,
160        section_offset: Option<(u64, u32)>,
161    ) -> Result<Option<(u32, Operation<V>)>, Error> {
162        // Initialize stream from section_offset
163        let (section, offset, skip_first) = match section_offset {
164            Some((s, o)) => (s, o, true),
165            None => (0, 0, false),
166        };
167        let stream = log.replay(section, offset, REPLAY_BUFFER_SIZE).await?;
168        pin_mut!(stream);
169
170        // Get first operation and handle empty log case
171        let first_op = stream.next().await;
172        let (mut last_offset, mut last_op) = if skip_first {
173            // We expect the first operation to exist (already processed)
174            let first_op = first_op.expect("operation known to exist")?;
175            (offset, first_op.3)
176        } else {
177            // Check if log is empty
178            let Some(first_op) = first_op else {
179                debug!("no starting log operation found, returning empty db");
180                return Ok(None);
181            };
182            let first_op = first_op?;
183            let encoded_op = first_op.3.encode();
184
185            // Add first operation to mmr and locations
186            mmr.add_batched(hasher, &encoded_op).await?;
187            locations.append(first_op.1).await?;
188            (first_op.1, first_op.3)
189        };
190
191        // Process remaining operations
192        while let Some(result) = stream.next().await {
193            let (section, offset, _, next_op) = result?;
194            let encoded_op = next_op.encode();
195            last_offset = offset;
196            last_op = next_op;
197            warn!(
198                location = mmr.leaves(),
199                section, offset, "adding missing operation to MMR/location map"
200            );
201            mmr.add_batched(hasher, &encoded_op).await?;
202            locations.append(offset).await?;
203        }
204
205        // Sync if needed
206        if mmr.is_dirty() {
207            mmr.sync(hasher).await?;
208            locations.sync().await?;
209        }
210
211        Ok(Some((last_offset, last_op)))
212    }
213
214    /// Find the last commit point and rewind to it if necessary.
215    ///
216    /// Accepts the `op_count` of the MMR, the `last_offset` of the last log operation, and the `log_items_per_section` of a [Keyless] instance.
217    /// Returns the number of operations (op count) remaining after rewinding.
218    async fn rewind_to_last_commit(
219        mmr: &mut Mmr<E, H>,
220        locations: &mut FJournal<E, u32>,
221        log: &mut VJournal<E, Operation<V>>,
222        last_log_op: Operation<V>,
223        op_count: u64,
224        last_offset: u32,
225        log_items_per_section: u64,
226    ) -> Result<u64, Error> {
227        let mut first_uncommitted: Option<(u64, u32)> = None;
228        let mut op_index = op_count - 1;
229        let mut op = last_log_op;
230        let mut offset = last_offset;
231        let oldest_retained_loc = locations
232            .oldest_retained_pos()
233            .await?
234            .expect("location should be nonempty");
235
236        // Walk backwards through the log until we find the last commit point.
237        loop {
238            match op {
239                Operation::Commit(_) => {
240                    break;
241                }
242                Operation::Append(_) => {
243                    // Track the earliest uncommitted append (index, offset) encountered while
244                    // walking backwards. If none is found before we hit a Commit, there is
245                    // nothing to rewind.
246                    first_uncommitted = Some((op_index, offset));
247                }
248            }
249            if op_index == oldest_retained_loc {
250                assert_eq!(op_index, 0, "no commit operation found");
251                break;
252            }
253            op_index -= 1;
254            offset = locations.read(op_index).await?;
255            let section = op_index / log_items_per_section;
256            op = log.get(section, offset).await?;
257        }
258
259        // If there are no uncommitted operations, exit early.
260        let Some((rewind_size, rewind_offset)) = first_uncommitted else {
261            return Ok(op_index + 1);
262        };
263
264        // Rewind the log and MMR to the last commit point.
265        let ops_to_rewind = (op_count - rewind_size) as usize;
266        warn!(ops_to_rewind, rewind_size, "rewinding log to last commit");
267        locations.rewind(rewind_size).await?;
268        locations.sync().await?;
269        mmr.pop(ops_to_rewind).await?; // sync is handled by pop
270        let section = rewind_size / log_items_per_section;
271        log.rewind_to_offset(section, rewind_offset).await?;
272        log.sync(section).await?;
273
274        Ok(rewind_size)
275    }
276
277    /// Returns a [Keyless] adb initialized from `cfg`. Any uncommitted operations will be discarded
278    /// and the state of the db will be as of the last committed operation.
279    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
280        let mut hasher = Standard::<H>::new();
281
282        let mut mmr = Mmr::init(
283            context.with_label("mmr"),
284            &mut hasher,
285            MmrConfig {
286                journal_partition: cfg.mmr_journal_partition,
287                metadata_partition: cfg.mmr_metadata_partition,
288                items_per_blob: cfg.mmr_items_per_blob,
289                write_buffer: cfg.mmr_write_buffer,
290                thread_pool: cfg.thread_pool,
291                buffer_pool: cfg.buffer_pool.clone(),
292            },
293        )
294        .await?;
295
296        let mut locations = FJournal::init(
297            context.with_label("locations"),
298            FConfig {
299                partition: cfg.locations_journal_partition,
300                items_per_blob: cfg.locations_items_per_blob,
301                write_buffer: cfg.locations_write_buffer,
302                buffer_pool: cfg.buffer_pool.clone(),
303            },
304        )
305        .await?;
306
307        // Align the sizes of locations and mmr.
308        let aligned_size = align_mmr_and_locations(&mut mmr, &mut locations).await?;
309
310        let mut log = VJournal::<E, Operation<V>>::init(
311            context.with_label("log"),
312            VConfig {
313                partition: cfg.log_journal_partition,
314                compression: cfg.log_compression,
315                codec_config: cfg.log_codec_config,
316                buffer_pool: cfg.buffer_pool,
317                write_buffer: cfg.log_write_buffer,
318            },
319        )
320        .await?;
321
322        // Find the location of the most recent log operation at an index less than `aligned_size`.
323        let log_items_per_section = cfg.log_items_per_section.get();
324        let (valid_size, section_offset) =
325            Self::find_last_operation(&locations, &log, aligned_size, log_items_per_section)
326                .await?;
327
328        // Trim any locations/mmr elements that do not have corresponding operations in the log.
329        if aligned_size != valid_size {
330            warn!(
331                size = aligned_size,
332                new_size = valid_size,
333                "trimming locations & mmr elements ahead of log"
334            );
335            locations.rewind(valid_size).await?;
336            locations.sync().await?;
337            mmr.pop((aligned_size - valid_size) as usize).await?;
338        }
339        assert_eq!(mmr.leaves(), locations.size().await?);
340
341        // Apply any log operations not yet reflected in the MMR and locations journal.
342        //
343        // Because we don't sync the MMR and locations journal during commit, it is possible that they are (far) behind
344        // the log.
345        let replay_result =
346            Self::replay_operations(&mut mmr, &mut hasher, &mut locations, &log, section_offset)
347                .await?;
348        let Some((last_offset, last_op)) = replay_result else {
349            // Empty database
350            return Ok(Self {
351                mmr,
352                log,
353                size: 0,
354                locations,
355                log_items_per_section,
356                hasher,
357                last_commit_loc: None,
358            });
359        };
360
361        // Find the last commit point and rewind to it (if necessary).
362        let op_count = mmr.leaves();
363        let size = Self::rewind_to_last_commit(
364            &mut mmr,
365            &mut locations,
366            &mut log,
367            last_op,
368            op_count,
369            last_offset,
370            log_items_per_section,
371        )
372        .await?;
373        assert_eq!(size, mmr.leaves());
374        assert_eq!(size, locations.size().await?);
375
376        Ok(Self {
377            mmr,
378            log,
379            size,
380            locations,
381            log_items_per_section,
382            hasher,
383            last_commit_loc: size.checked_sub(1),
384        })
385    }
386
387    /// Get the value at location `loc`. For a commit operation, its metadata (if any) is returned.
388    pub async fn get(&self, loc: u64) -> Result<Option<V>, Error> {
389        assert!(loc < self.size);
390        let offset = self.locations.read(loc).await?;
391
392        let section = loc / self.log_items_per_section;
393        let op = self.log.get(section, offset).await?;
394
395        Ok(op.into_value())
396    }
397
398    /// Get the number of operations (appends + commits) that have been applied to the db since
399    /// inception.
400    pub fn op_count(&self) -> u64 {
401        self.size
402    }
403
404    /// Returns the location of the last commit, if any.
405    pub fn last_commit_loc(&self) -> Option<u64> {
406        self.last_commit_loc
407    }
408
409    /// Returns the section of the operations log where we are currently writing new operations.
410    fn current_section(&self) -> u64 {
411        self.size / self.log_items_per_section
412    }
413
414    /// Return the oldest location that remains retrievable.
415    pub async fn oldest_retained_loc(&self) -> Result<Option<u64>, Error> {
416        if let Some(oldest_section) = self.log.oldest_section() {
417            Ok(Some(oldest_section * self.log_items_per_section))
418        } else {
419            Ok(None)
420        }
421    }
422
423    /// Prunes up to all operations with locations less than `loc` from the db. The actual number
424    /// pruned may be fewer than requested due to blob boundaries in the underlying journals.
425    ///
426    /// # Panics
427    ///
428    /// Panics if `loc` is beyond the last commit point.
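    ///
    /// # Example
    ///
    /// An illustrative sketch, assuming `db` has at least one commit:
    ///
    /// ```ignore
    /// // Prune as much as possible up to the last commit. Fewer operations may actually be
    /// // removed because pruning only occurs at blob boundaries.
    /// let last_commit = db.last_commit_loc().expect("db has a commit");
    /// db.prune(last_commit).await?;
    /// ```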
429    pub async fn prune(&mut self, loc: u64) -> Result<(), Error> {
430        assert!(loc <= self.last_commit_loc.unwrap_or(0));
431
432        // Sync the mmr before pruning the log, otherwise the MMR tip could end up behind the log's
433        // pruning boundary on restart from an unclean shutdown, and there would be no way to replay
434        // the operations between the MMR tip and the log pruning boundary.
435        // TODO(https://github.com/commonwarexyz/monorepo/issues/1554): We currently sync locations
436        // as well, but this could be avoided by extending recovery.
437        try_join!(
438            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
439            self.locations.sync().map_err(Error::Journal),
440        )?;
441
442        // Prune the log first since it's always the source of truth.
443        let section = loc / self.log_items_per_section;
444        if !self.log.prune(section).await? {
445            return Ok(());
446        }
447
448        let prune_loc = section * self.log_items_per_section;
449        debug!(size = self.size, loc = prune_loc, "pruned log");
450
451        // Prune locations and the MMR to the corresponding positions.
452        try_join!(
453            self.mmr
454                .prune_to_pos(&mut self.hasher, leaf_num_to_pos(prune_loc))
455                .map_err(Error::Mmr),
456            self.locations.prune(prune_loc).map_err(Error::Journal),
457        )?;
458
459        Ok(())
460    }
461
462    /// Append a value to the db, returning its location, which can later be used to retrieve it.
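    ///
    /// # Example
    ///
    /// An illustrative sketch, assuming `value` is of the db's value type `V`:
    ///
    /// ```ignore
    /// let loc = db.append(value.clone()).await?;
    /// assert_eq!(db.get(loc).await?, Some(value));
    /// // The append is not durable until the next `commit`.
    /// ```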
463    pub async fn append(&mut self, value: V) -> Result<u64, Error> {
464        let loc = self.size;
465        let section = self.current_section();
466        let operation = Operation::Append(value);
467        let encoded_operation = operation.encode();
468
469        // Create a future that appends the operation to the log and updates locations with the
470        // resulting offset.
471        let log_loc_fut = async {
472            let (offset, _) = self.log.append(section, operation).await?;
473            self.locations.append(offset).await?;
474            Ok::<(), Error>(())
475        };
476
477        // Create a future that updates the MMR.
478        let mmr_fut = async {
479            self.mmr
480                .add_batched(&mut self.hasher, &encoded_operation)
481                .await?;
482            Ok::<(), Error>(())
483        };
484
485        // Run the 2 futures in parallel.
486        try_join!(log_loc_fut, mmr_fut)?;
487        self.size += 1;
488
489        // Maintain invariant that all filled sections are synced and immutable.
490        if section != self.current_section() {
491            self.log.sync(section).await?;
492        }
493
494        Ok(loc)
495    }
496
497    /// Commit any pending operations to the database, ensuring their durability upon return from
498    /// this function. Caller can associate an arbitrary `metadata` value with the commit.
499    ///
500    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
501    /// recover the database on restart.
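    ///
    /// # Example
    ///
    /// An illustrative sketch, assuming `metadata` is a value of type `V`; the location of the
    /// commit operation is returned:
    ///
    /// ```ignore
    /// let commit_loc = db.commit(Some(metadata.clone())).await?;
    /// assert_eq!(db.get_metadata().await?, Some((commit_loc, Some(metadata))));
    /// ```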
502    pub async fn commit(&mut self, metadata: Option<V>) -> Result<u64, Error> {
503        let loc = self.size;
504        let section = self.current_section();
505        self.last_commit_loc = Some(loc);
506
507        let operation = Operation::Commit(metadata);
508        let encoded_operation = operation.encode();
509
510        // Create a future that updates and syncs the log, and updates locations with the resulting
511        // offset.
512        let log_loc_fut = async {
513            let (offset, _) = self.log.append(section, operation).await?;
514            // Sync the log and update locations in parallel.
515            try_join!(
516                self.log.sync(section).map_err(Error::Journal),
517                self.locations.append(offset).map_err(Error::Journal),
518            )?;
519
520            Ok::<(), Error>(())
521        };
522
523        // Create a future that adds the commit operation to the MMR and processes all updates.
524        let mmr_fut = async {
525            self.mmr
526                .add_batched(&mut self.hasher, &encoded_operation)
527                .await?;
528            self.mmr.process_updates(&mut self.hasher);
529
530            Ok::<(), Error>(())
531        };
532
533        // Run the 2 futures in parallel.
534        try_join!(log_loc_fut, mmr_fut)?;
535        self.size += 1;
536
537        debug!(size = self.size, "committed db");
538
539        Ok(loc)
540    }
541
542    /// Sync all database state to disk. While this isn't necessary to ensure durability of
543    /// committed operations, periodic invocation may reduce memory usage and the time required to
544    /// recover the database on restart.
545    pub async fn sync(&mut self) -> Result<(), Error> {
546        let section = self.current_section();
547        try_join!(
548            self.locations.sync().map_err(Error::Journal),
549            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
550            self.log.sync(section).map_err(Error::Journal),
551        )?;
552
553        Ok(())
554    }
555
556    /// Get the location and metadata associated with the last commit, or None if no commit has been
557    /// made.
558    pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
559        let Some(loc) = self.last_commit_loc else {
560            return Ok(None);
561        };
562        let offset = self.locations.read(loc).await?;
563        let section = loc / self.log_items_per_section;
564        let op = self.log.get(section, offset).await?;
565        let Operation::Commit(metadata) = op else {
566            return Ok(None);
567        };
568
569        Ok(Some((loc, metadata)))
570    }
571
572    /// Return the root of the db.
573    ///
574    /// # Warning
575    ///
576    /// Panics if there are uncommitted operations.
577    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
578        self.mmr.root(hasher)
579    }
580
581    /// Generate and return:
582    ///  1. a proof of all operations applied to the db in the range starting at (and including)
583    ///     location `start_loc`, and ending at the first of either:
584    ///     - the last operation performed, or
585    ///     - the operation `max_ops` from the start.
586    ///  2. the operations corresponding to the leaves in this range.
587    ///
588    /// # Warning
589    ///
590    /// Panics if there are uncommitted operations.
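    ///
    /// # Example
    ///
    /// An illustrative sketch, assuming a committed `db`, a `Standard` hasher, and the crate's
    /// `verify_proof` helper (as used in the tests below); `start_loc` and `max_ops` are
    /// placeholders:
    ///
    /// ```ignore
    /// let root = db.root(&mut hasher);
    /// let (proof, ops) = db.proof(start_loc, max_ops).await?;
    /// assert!(verify_proof(&mut hasher, &proof, start_loc, &ops, &root));
    /// ```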
591    pub async fn proof(
592        &self,
593        start_loc: u64,
594        max_ops: NonZeroU64,
595    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
596        self.historical_proof(self.size, start_loc, max_ops).await
597    }
598
599    /// Analogous to `proof`, but with respect to the state of the db when it contained `size` operations.
600    pub async fn historical_proof(
601        &self,
602        size: u64,
603        start_loc: u64,
604        max_ops: NonZeroU64,
605    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
606        let start_pos = leaf_num_to_pos(start_loc);
607        let end_index = std::cmp::min(size - 1, start_loc + max_ops.get() - 1);
608        let end_pos = leaf_num_to_pos(end_index);
609        let mmr_size = leaf_num_to_pos(size);
610
611        let proof = self
612            .mmr
613            .historical_range_proof(mmr_size, start_pos, end_pos)
614            .await?;
615        let mut ops = Vec::with_capacity((end_index - start_loc + 1) as usize);
616        for loc in start_loc..=end_index {
617            let offset = self.locations.read(loc).await?;
618            let section = loc / self.log_items_per_section;
619            let value = self.log.get(section, offset).await?;
620            ops.push(value);
621        }
622
623        Ok((proof, ops))
624    }
625
626    /// Close the db. Operations that have not been committed will be lost.
627    pub async fn close(mut self) -> Result<(), Error> {
628        // Close the locations journal first to make sure it's synced first (see `sync` for why this
629        // is important).
630        self.locations.close().await?;
631
632        try_join!(
633            self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
634            self.log.close().map_err(Error::Journal),
635        )?;
636
637        Ok(())
638    }
639
640    /// Destroy the db, removing all data from disk.
641    pub async fn destroy(self) -> Result<(), Error> {
642        try_join!(
643            self.mmr.destroy().map_err(Error::Mmr),
644            self.log.destroy().map_err(Error::Journal),
645            self.locations.destroy().map_err(Error::Journal),
646        )?;
647
648        Ok(())
649    }
650
651    #[cfg(test)]
652    /// Simulate failure by consuming the db without closing it, optionally syncing a subset of its structures first.
653    pub(super) async fn simulate_failure(
654        mut self,
655        sync_log: bool,
656        sync_locations: bool,
657        sync_mmr: bool,
658    ) -> Result<(), Error> {
659        if sync_log {
660            let section = self.current_section();
661            self.log.sync(section).await?;
662        }
663        if sync_locations {
664            self.locations.sync().await?;
665        }
666        if sync_mmr {
667            self.mmr.sync(&mut self.hasher).await?;
668        }
669
670        Ok(())
671    }
672
673    #[cfg(test)]
674    /// Simulate pruning failure by consuming the db and abandoning the pruning operation mid-flight.
675    pub(super) async fn simulate_prune_failure(mut self, loc: u64) -> Result<(), Error> {
676        assert!(loc <= self.last_commit_loc.unwrap_or(0));
677        // Perform the same steps as pruning except "crash" right after the log is pruned.
678        try_join!(
679            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
680            self.locations.sync().map_err(Error::Journal),
681        )?;
682        let section = loc / self.log_items_per_section;
683        assert!(
684            self.log.prune(section).await?,
685            "nothing was pruned, so could not simulate failure"
686        );
687
688        // "fail" before mmr/locations are pruned.
689        Ok(())
690    }
691}
692
693#[cfg(test)]
694mod test {
695    use super::*;
696    use crate::{
697        adb::verify_proof,
698        mmr::{hasher::Standard, mem::Mmr as MemMmr},
699    };
700    use commonware_cryptography::Sha256;
701    use commonware_macros::test_traced;
702    use commonware_runtime::{deterministic, Runner as _};
703    use commonware_utils::{NZUsize, NZU64};
704
705    // Use some weird sizes here to test boundary conditions.
706    const PAGE_SIZE: usize = 101;
707    const PAGE_CACHE_SIZE: usize = 11;
708
709    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg, ())> {
710        Config {
711            mmr_journal_partition: format!("journal_{suffix}"),
712            mmr_metadata_partition: format!("metadata_{suffix}"),
713            mmr_items_per_blob: NZU64!(11),
714            mmr_write_buffer: NZUsize!(1024),
715            log_journal_partition: format!("log_journal_{suffix}"),
716            log_write_buffer: NZUsize!(1024),
717            log_compression: None,
718            log_codec_config: ((0..=10000).into(), ()),
719            log_items_per_section: NZU64!(7),
720            locations_journal_partition: format!("locations_journal_{suffix}"),
721            locations_items_per_blob: NZU64!(13),
722            locations_write_buffer: NZUsize!(1024),
723            thread_pool: None,
724            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
725        }
726    }
727
728    /// A type alias for the concrete [Keyless] type used in these unit tests.
729    type Db = Keyless<deterministic::Context, Vec<u8>, Sha256>;
730
731    /// Return a [Keyless] database initialized with a fixed config.
732    async fn open_db(context: deterministic::Context) -> Db {
733        Db::init(context, db_config("partition")).await.unwrap()
734    }
735
736    #[test_traced("INFO")]
737    pub fn test_keyless_db_empty() {
738        let executor = deterministic::Runner::default();
739        executor.start(|context| async move {
740            let mut db = open_db(context.clone()).await;
741            let mut hasher = Standard::<Sha256>::new();
742            assert_eq!(db.op_count(), 0);
743            assert_eq!(db.oldest_retained_loc().await.unwrap(), None);
744            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
745            assert_eq!(db.get_metadata().await.unwrap(), None);
746            assert_eq!(db.last_commit_loc(), None);
747
748            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
749            let v1 = vec![1u8; 8];
750            let root = db.root(&mut hasher);
751            db.append(v1).await.unwrap();
752            db.close().await.unwrap();
753            let mut db = open_db(context.clone()).await;
754            assert_eq!(db.root(&mut hasher), root);
755            assert_eq!(db.op_count(), 0);
756            assert_eq!(db.get_metadata().await.unwrap(), None);
757
758            // Test calling commit on an empty db which should make it (durably) non-empty.
759            let metadata = Some(vec![3u8; 10]);
760            db.commit(metadata.clone()).await.unwrap();
761            assert_eq!(db.op_count(), 1); // commit op
762            assert_eq!(
763                db.get_metadata().await.unwrap(),
764                Some((0, metadata.clone()))
765            );
766            assert_eq!(db.get(0).await.unwrap(), metadata); // the commit op
767            let root = db.root(&mut hasher);
768
769            // Commit op should remain after reopen even without clean shutdown.
770            let db = open_db(context.clone()).await;
771            assert_eq!(db.op_count(), 1); // commit op should remain after re-open.
772            assert_eq!(db.get_metadata().await.unwrap(), Some((0, metadata)));
773            assert_eq!(db.root(&mut hasher), root);
774            assert_eq!(db.last_commit_loc(), Some(0));
775
776            db.destroy().await.unwrap();
777        });
778    }
779
780    #[test_traced("WARN")]
781    pub fn test_keyless_db_build_basic() {
782        let executor = deterministic::Runner::default();
783        executor.start(|context| async move {
784            // Build a db with 2 values and make sure we can get them back.
785            let mut hasher = Standard::<Sha256>::new();
786            let mut db = open_db(context.clone()).await;
787
788            let v1 = vec![1u8; 8];
789            let v2 = vec![2u8; 20];
790
791            let loc1 = db.append(v1.clone()).await.unwrap();
792            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
793
794            let loc2 = db.append(v2.clone()).await.unwrap();
795            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);
796
797            // Make sure closing/reopening gets us back to the same state.
798            db.commit(None).await.unwrap();
799            assert_eq!(db.op_count(), 3); // 2 appends, 1 commit
800            assert_eq!(db.get_metadata().await.unwrap(), Some((2, None)));
801            assert_eq!(db.get(2).await.unwrap(), None); // the commit op
802            let root = db.root(&mut hasher);
803            db.close().await.unwrap();
804            let mut db = open_db(context.clone()).await;
805            assert_eq!(db.op_count(), 3);
806            assert_eq!(db.root(&mut hasher), root);
807
808            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
809            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);
810
811            db.append(v2).await.unwrap();
812            db.append(v1).await.unwrap();
813
814            // Make sure uncommitted items get rolled back.
815            db.close().await.unwrap();
816            let db = open_db(context.clone()).await;
817            assert_eq!(db.op_count(), 3);
818            assert_eq!(db.root(&mut hasher), root);
819
820            // Make sure commit operation remains after close/reopen.
821            db.close().await.unwrap();
822            let db = open_db(context.clone()).await;
823            assert_eq!(db.op_count(), 3);
824            assert_eq!(db.root(&mut hasher), root);
825
826            db.destroy().await.unwrap();
827        });
828    }
829
830    #[test_traced("WARN")]
831    pub fn test_keyless_db_recovery() {
832        let executor = deterministic::Runner::default();
833        const ELEMENTS: u64 = 1000;
834        executor.start(|context| async move {
835            let mut hasher = Standard::<Sha256>::new();
836            let mut db = open_db(context.clone()).await;
837            let root = db.root(&mut hasher);
838
839            for i in 0u64..ELEMENTS {
840                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
841                db.append(v.clone()).await.unwrap();
842            }
843
844            // Simulate a failure before committing and test that we roll back to the previous root.
845            db.simulate_failure(false, false, false).await.unwrap();
846            let mut db = open_db(context.clone()).await;
847            assert_eq!(root, db.root(&mut hasher));
848
849            // Re-apply the updates and commit them this time.
850            for i in 0u64..ELEMENTS {
851                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
852                db.append(v.clone()).await.unwrap();
853            }
854            db.commit(None).await.unwrap();
855            let root = db.root(&mut hasher);
856
857            // Append even more values.
858            for i in ELEMENTS..2 * ELEMENTS {
859                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
860                db.append(v.clone()).await.unwrap();
861            }
862
863            // Simulate a failure (mode 1) and test that we roll back to the previous root.
864            db.simulate_failure(false, false, false).await.unwrap();
865            let mut db = open_db(context.clone()).await;
866            assert_eq!(root, db.root(&mut hasher));
867
868            // Re-apply the updates and simulate different failure mode (mode 2).
869            for i in ELEMENTS..2 * ELEMENTS {
870                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
871                db.append(v.clone()).await.unwrap();
872            }
873            db.simulate_failure(true, false, false).await.unwrap();
874            let mut db = open_db(context.clone()).await;
875            assert_eq!(root, db.root(&mut hasher));
876
877            // Re-apply the updates and simulate different failure mode (mode 3).
878            for i in ELEMENTS..2 * ELEMENTS {
879                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
880                db.append(v.clone()).await.unwrap();
881            }
882            db.simulate_failure(true, true, false).await.unwrap();
883            let mut db = open_db(context.clone()).await;
884            assert_eq!(root, db.root(&mut hasher));
885
886            // Re-apply the updates and simulate different failure mode (mode 4).
887            for i in ELEMENTS..2 * ELEMENTS {
888                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
889                db.append(v.clone()).await.unwrap();
890            }
891            db.simulate_failure(true, false, true).await.unwrap();
892            let mut db = open_db(context.clone()).await;
893            assert_eq!(root, db.root(&mut hasher));
894
895            // Re-apply the updates and simulate different failure mode (mode 5).
896            for i in ELEMENTS..2 * ELEMENTS {
897                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
898                db.append(v.clone()).await.unwrap();
899            }
900            db.simulate_failure(false, true, false).await.unwrap();
901            let mut db = open_db(context.clone()).await;
902            assert_eq!(root, db.root(&mut hasher));
903
904            // Re-apply the updates and simulate different failure mode (mode 6).
905            for i in ELEMENTS..2 * ELEMENTS {
906                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
907                db.append(v.clone()).await.unwrap();
908            }
909            db.simulate_failure(false, false, true).await.unwrap();
910            let mut db = open_db(context.clone()).await;
911            assert_eq!(root, db.root(&mut hasher));
912
913            // Re-apply the updates and simulate different failure mode (mode 7).
914            for i in ELEMENTS..2 * ELEMENTS {
915                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
916                db.append(v.clone()).await.unwrap();
917            }
918            db.simulate_failure(false, true, true).await.unwrap();
919            let mut db = open_db(context.clone()).await;
920            assert_eq!(root, db.root(&mut hasher));
921
922            // Re-apply the updates and commit them this time.
923            for i in ELEMENTS..2 * ELEMENTS {
924                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
925                db.append(v.clone()).await.unwrap();
926            }
927            db.commit(None).await.unwrap();
928            let root = db.root(&mut hasher);
929
930            // Make sure we can close/reopen and get back to the same state.
931            db.close().await.unwrap();
932            let db = open_db(context.clone()).await;
933            assert_eq!(db.op_count(), 2 * ELEMENTS + 2);
934            assert_eq!(db.root(&mut hasher), root);
935
936            db.destroy().await.unwrap();
937        });
938    }
939
940    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
941    /// last committed state on re-open.
942    #[test_traced("WARN")]
943    fn test_keyless_db_non_empty_db_recovery() {
944        let executor = deterministic::Runner::default();
945        executor.start(|context| async move {
946            let mut hasher = Standard::<Sha256>::new();
947            let mut db = open_db(context.clone()).await;
948
949            // Append many values then commit.
950            const ELEMENTS: u64 = 200;
951            for i in 0u64..ELEMENTS {
952                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
953                db.append(v).await.unwrap();
954            }
955            db.commit(None).await.unwrap();
956            db.prune(10).await.unwrap();
957            let root = db.root(&mut hasher);
958            let op_count = db.op_count();
959
960            // Reopen DB without clean shutdown and make sure the state is the same.
961            let db = open_db(context.clone()).await;
962            assert_eq!(db.op_count(), op_count);
963            assert_eq!(db.root(&mut hasher), root);
964            assert_eq!(db.last_commit_loc(), Some(op_count - 1));
965            db.close().await.unwrap();
966
967            async fn apply_more_ops(db: &mut Db) {
968                for i in 0..ELEMENTS {
969                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
970                    db.append(v).await.unwrap();
971                }
972            }
973
974            // Insert many operations without commit, then simulate various types of failures.
975            async fn recover_from_failure(
976                context: deterministic::Context,
977                root: <Sha256 as CHasher>::Digest,
978                hasher: &mut Standard<Sha256>,
979                op_count: u64,
980            ) {
981                let mut db = open_db(context.clone()).await;
982                apply_more_ops(&mut db).await;
983                db.simulate_failure(false, false, false).await.unwrap();
984                let mut db = open_db(context.clone()).await;
985                assert_eq!(db.op_count(), op_count);
986                assert_eq!(db.root(hasher), root);
987
988                apply_more_ops(&mut db).await;
989                db.simulate_failure(true, false, false).await.unwrap();
990                let mut db = open_db(context.clone()).await;
991                assert_eq!(db.op_count(), op_count);
992                assert_eq!(db.root(hasher), root);
993
994                apply_more_ops(&mut db).await;
995                db.simulate_failure(false, true, false).await.unwrap();
996                let mut db = open_db(context.clone()).await;
997                assert_eq!(db.op_count(), op_count);
998                assert_eq!(db.root(hasher), root);
999
1000                apply_more_ops(&mut db).await;
1001                db.simulate_failure(false, false, true).await.unwrap();
1002                let mut db = open_db(context.clone()).await;
1003                assert_eq!(db.op_count(), op_count);
1004                assert_eq!(db.root(hasher), root);
1005
1006                apply_more_ops(&mut db).await;
1007                db.simulate_failure(true, true, false).await.unwrap();
1008                let mut db = open_db(context.clone()).await;
1009                assert_eq!(db.op_count(), op_count);
1010                assert_eq!(db.root(hasher), root);
1011
1012                apply_more_ops(&mut db).await;
1013                db.simulate_failure(true, false, true).await.unwrap();
1014                let mut db = open_db(context.clone()).await;
1015                assert_eq!(db.op_count(), op_count);
1016                assert_eq!(db.root(hasher), root);
1017
1018                apply_more_ops(&mut db).await;
1019                db.simulate_failure(false, true, true).await.unwrap();
1020                let db = open_db(context.clone()).await;
1021                assert_eq!(db.op_count(), op_count);
1022                assert_eq!(db.root(hasher), root);
1023                assert_eq!(db.last_commit_loc(), Some(op_count - 1));
1024            }
1025
1026            recover_from_failure(context.clone(), root, &mut hasher, op_count).await;
1027
1028            // Simulate a failure during pruning and ensure we recover.
1029            let db = open_db(context.clone()).await;
1030            let last_commit_loc = db.last_commit_loc().unwrap();
1031            db.simulate_prune_failure(last_commit_loc).await.unwrap();
1032            let db = open_db(context.clone()).await;
1033            assert_eq!(db.op_count(), op_count);
1034            assert_eq!(db.root(&mut hasher), root);
1035            db.close().await.unwrap();
1036
1037            // Repeat recover_from_failure tests after successfully pruning to the last commit.
1038            let mut db = open_db(context.clone()).await;
1039            db.prune(db.last_commit_loc().unwrap()).await.unwrap();
1040            assert_eq!(db.op_count(), op_count);
1041            assert_eq!(db.root(&mut hasher), root);
1042            db.close().await.unwrap();
1043
1044            recover_from_failure(context.clone(), root, &mut hasher, op_count).await;
1045
1046            // Apply the ops one last time but fully commit them this time, then clean up.
1047            let mut db = open_db(context.clone()).await;
1048            apply_more_ops(&mut db).await;
1049            db.commit(None).await.unwrap();
1050            let db = open_db(context.clone()).await;
1051            assert!(db.op_count() > op_count);
1052            assert_ne!(db.root(&mut hasher), root);
1053            assert_eq!(db.last_commit_loc(), Some(db.op_count() - 1));
1054
1055            db.destroy().await.unwrap();
1056        });
1057    }
1058
1059    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
1060    /// DB on re-open.
1061    #[test_traced("WARN")]
1062    fn test_keyless_db_empty_db_recovery() {
1063        const ELEMENTS: u64 = 1000;
1064        let executor = deterministic::Runner::default();
1065        executor.start(|context| async move {
1066            let mut hasher = Standard::<Sha256>::new();
1067            let db = open_db(context.clone()).await;
1068            let root = db.root(&mut hasher);
1069
1070            // Reopen DB without clean shutdown and make sure the state is the same.
1071            let mut db = open_db(context.clone()).await;
1072            assert_eq!(db.op_count(), 0);
1073            assert_eq!(db.root(&mut hasher), root);
1074
1075            async fn apply_ops(db: &mut Db) {
1076                for i in 0..ELEMENTS {
1077                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
1078                    db.append(v).await.unwrap();
1079                }
1080            }
1081
1082            // Simulate various failure types after inserting operations without a commit.
1083            apply_ops(&mut db).await;
1084            db.simulate_failure(false, false, false).await.unwrap();
1085            let mut db = open_db(context.clone()).await;
1086            assert_eq!(db.op_count(), 0);
1087            assert_eq!(db.root(&mut hasher), root);
1088
1089            apply_ops(&mut db).await;
1090            db.simulate_failure(true, false, false).await.unwrap();
1091            let mut db = open_db(context.clone()).await;
1092            assert_eq!(db.op_count(), 0);
1093            assert_eq!(db.root(&mut hasher), root);
1094
1095            apply_ops(&mut db).await;
1096            db.simulate_failure(false, true, false).await.unwrap();
1097            let mut db = open_db(context.clone()).await;
1098            assert_eq!(db.op_count(), 0);
1099            assert_eq!(db.root(&mut hasher), root);
1100
1101            apply_ops(&mut db).await;
1102            db.simulate_failure(false, false, true).await.unwrap();
1103            let mut db = open_db(context.clone()).await;
1104            assert_eq!(db.op_count(), 0);
1105            assert_eq!(db.root(&mut hasher), root);
1106
1107            apply_ops(&mut db).await;
1108            db.simulate_failure(true, true, false).await.unwrap();
1109            let mut db = open_db(context.clone()).await;
1110            assert_eq!(db.op_count(), 0);
1111            assert_eq!(db.root(&mut hasher), root);
1112
1113            apply_ops(&mut db).await;
1114            db.simulate_failure(true, false, true).await.unwrap();
1115            let mut db = open_db(context.clone()).await;
1116            assert_eq!(db.op_count(), 0);
1117            assert_eq!(db.root(&mut hasher), root);
1118
1119            apply_ops(&mut db).await;
1120            db.simulate_failure(false, true, true).await.unwrap();
1121            let mut db = open_db(context.clone()).await;
1122            assert_eq!(db.op_count(), 0);
1123            assert_eq!(db.root(&mut hasher), root);
1124
1125            // One last check that re-open without proper shutdown still recovers the correct state.
1126            apply_ops(&mut db).await;
1127            apply_ops(&mut db).await;
1128            apply_ops(&mut db).await;
1129            let mut db = open_db(context.clone()).await;
1130            assert_eq!(db.op_count(), 0);
1131            assert_eq!(db.root(&mut hasher), root);
1132            assert_eq!(db.last_commit_loc(), None);
1133
1134            // Apply the ops one last time but fully commit them this time, then clean up.
1135            apply_ops(&mut db).await;
1136            db.commit(None).await.unwrap();
1137            let db = open_db(context.clone()).await;
1138            assert!(db.op_count() > 0);
1139            assert_ne!(db.root(&mut hasher), root);
1140
1141            db.destroy().await.unwrap();
1142        });
1143    }
1144
1145    #[test_traced("INFO")]
1146    pub fn test_keyless_db_proof_generation_and_verification() {
1147        let executor = deterministic::Runner::default();
1148        executor.start(|context| async move {
1149            let mut hasher = Standard::<Sha256>::new();
1150            let mut db = open_db(context.clone()).await;
1151
1152            // Build a db with some values
1153            const ELEMENTS: u64 = 100;
1154            let mut values = Vec::new();
1155            for i in 0u64..ELEMENTS {
1156                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
1157                values.push(v.clone());
1158                db.append(v).await.unwrap();
1159            }
1160            db.commit(None).await.unwrap();
1161            let root = db.root(&mut hasher);
1162
1163            // Test proof generation for various ranges
1164            let test_cases = vec![
1165                (0, 10),           // First 10 operations
1166                (10, 5),           // Middle range
1167                (50, 20),          // Larger range
1168                (90, 15),          // Range that extends beyond end (should be limited)
1169                (0, 1),            // Single operation
1170                (ELEMENTS - 1, 1), // Last append operation
1171                (ELEMENTS, 1),     // The commit operation
1172            ];
1173
1174            for (start_loc, max_ops) in test_cases {
1175                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
1176
1177                // Verify the proof
1178                assert!(
1179                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
1180                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
1181                );
1182
1183                // Check that we got the expected number of operations
1184                let expected_ops = std::cmp::min(max_ops, db.op_count() - start_loc);
1185                assert_eq!(
1186                    ops.len() as u64,
1187                    expected_ops,
1188                    "Expected {expected_ops} operations, got {}",
1189                    ops.len(),
1190                );
1191
1192                // Verify operation types
1193                for (i, op) in ops.iter().enumerate() {
1194                    let loc = start_loc + i as u64;
1195                    if loc < ELEMENTS {
1196                        // Should be an Append operation
1197                        assert!(
1198                            matches!(op, Operation::Append(_)),
1199                            "Expected Append operation at location {loc}, got {op:?}",
1200                        );
1201                    } else if loc == ELEMENTS {
1202                        // Should be a Commit operation
1203                        assert!(
1204                            matches!(op, Operation::Commit(_)),
1205                            "Expected Commit operation at location {loc}, got {op:?}",
1206                        );
1207                    }
1208                }
1209
1210                // Verify that proof fails with wrong root
1211                let wrong_root = Sha256::hash(&[0xFF; 32]);
1212                assert!(
1213                    !verify_proof(&mut hasher, &proof, start_loc, &ops, &wrong_root),
1214                    "Proof should fail with wrong root"
1215                );
1216
1217                // Verify that proof fails with wrong start location
1218                if start_loc > 0 {
1219                    assert!(
1220                        !verify_proof(&mut hasher, &proof, start_loc - 1, &ops, &root),
1221                        "Proof should fail with wrong start location"
1222                    );
1223                }
1224            }
1225
1226            db.destroy().await.unwrap();
1227        });
1228    }
1229
1230    #[test_traced("INFO")]
1231    pub fn test_keyless_db_proof_with_pruning() {
1232        let executor = deterministic::Runner::default();
1233        executor.start(|context| async move {
1234            let mut hasher = Standard::<Sha256>::new();
1235            let mut db = open_db(context.clone()).await;
1236
1237            // Build a db with some values
1238            const ELEMENTS: u64 = 100;
1239            let mut values = Vec::new();
1240            for i in 0u64..ELEMENTS {
1241                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
1242                values.push(v.clone());
1243                db.append(v).await.unwrap();
1244            }
1245            db.commit(None).await.unwrap();
1246
1247            // Add more elements and commit again
1248            for i in ELEMENTS..ELEMENTS * 2 {
1249                let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
1250                values.push(v.clone());
1251                db.append(v).await.unwrap();
1252            }
1253            db.commit(None).await.unwrap();
1254            let root = db.root(&mut hasher);
1255
1256            println!("last commit loc: {}", db.last_commit_loc.unwrap());
1257
1258            // Prune the first 30 operations
1259            const PRUNE_LOC: u64 = 30;
1260            db.prune(PRUNE_LOC).await.unwrap();
1261
1262            // Verify pruning worked
1263            let oldest_retained = db.oldest_retained_loc().await.unwrap();
1264            assert!(
1265                oldest_retained.is_some(),
1266                "Should have oldest retained location after pruning"
1267            );
1268
1269            // Root should remain the same after pruning
1270            assert_eq!(
1271                db.root(&mut hasher),
1272                root,
1273                "Root should not change after pruning"
1274            );
1275
1276            db.close().await.unwrap();
1277            let mut db = open_db(context.clone()).await;
1278            assert_eq!(db.root(&mut hasher), root);
1279            assert_eq!(db.op_count(), 2 * ELEMENTS + 2);
1280            assert!(db.oldest_retained_loc().await.unwrap().unwrap() <= PRUNE_LOC);
1281
1282            // Test that we can't get pruned values
1283            for i in 0..oldest_retained.unwrap() {
1284                let result = db.get(i).await;
1285                // Should either return None (for commit ops) or encounter pruned data
1286                match result {
1287                    Ok(None) => {} // Commit operation or pruned
1288                    Ok(Some(_)) => {
1289                        panic!("Should not be able to get pruned value at location {i}")
1290                    }
1291                    Err(_) => {} // Expected error for pruned data
1292                }
1293            }
1294
1295            // Test proof generation after pruning - should work for non-pruned ranges
1296            let test_cases = vec![
1297                (oldest_retained.unwrap(), 10), // Starting from oldest retained
1298                (50, 20),                       // Middle range (if not pruned)
1299                (150, 10),                      // Later range
1300                (190, 15),                      // Near the end
1301            ];
1302
1303            for (start_loc, max_ops) in test_cases {
1304                // Skip if start_loc is before oldest retained
1305                if start_loc < oldest_retained.unwrap() {
1306                    continue;
1307                }
1308
1309                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
1310
1311                // Verify the proof still works
1312                assert!(
1313                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
1314                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
1315                );
1316
1317                // Check that we got operations
1318                let expected_ops = std::cmp::min(max_ops, db.op_count() - start_loc);
1319                assert_eq!(
1320                    ops.len() as u64,
1321                    expected_ops,
1322                    "Expected {expected_ops} operations, got {}",
1323                    ops.len(),
1324                );
1325            }
1326
1327            // Test pruning more aggressively
1328            const AGGRESSIVE_PRUNE: u64 = 150;
1329            db.prune(AGGRESSIVE_PRUNE).await.unwrap();
1330
1331            let new_oldest = db.oldest_retained_loc().await.unwrap().unwrap();
1332            assert!(new_oldest <= AGGRESSIVE_PRUNE);
1333
1334            // Can still generate proofs for the remaining data
1335            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
1336            assert!(
1337                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
1338                "Proof should still verify after aggressive pruning"
1339            );
1340
1341            // Test edge case: prune everything except the last few operations
1342            let almost_all = db.op_count() - 5;
1343            db.prune(almost_all).await.unwrap();
1344
1345            let final_oldest = db.oldest_retained_loc().await.unwrap().unwrap();
1346
1347            // Should still be able to prove the remaining operations
1348            if final_oldest < db.op_count() {
1349                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
1350                assert!(
1351                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
1352                    "Should be able to prove remaining operations after extensive pruning"
1353                );
1354            }
1355
1356            db.destroy().await.unwrap();
1357        });
1358    }
1359
1360    #[test_traced("WARN")]
1361    fn test_keyless_db_replay_with_trailing_appends() {
1362        let executor = deterministic::Runner::default();
1363        executor.start(|context| async move {
1364            let mut hasher = Standard::<Sha256>::new();
1365
1366            // Create initial database with committed data
1367            let mut db = open_db(context.clone()).await;
1368
1369            // Add some initial operations and commit
1370            for i in 0..10 {
1371                let v = vec![i as u8; 10];
1372                db.append(v).await.unwrap();
1373            }
1374            db.commit(None).await.unwrap();
1375            let committed_root = db.root(&mut hasher);
1376            let committed_size = db.op_count();
1377
1378            // Add exactly one more append (uncommitted)
1379            let uncommitted_value = vec![99u8; 20];
1380            db.append(uncommitted_value.clone()).await.unwrap();
1381
1382            // Sync only the log (not MMR or locations)
1383            db.simulate_failure(true, false, false).await.unwrap();
1384
1385            // Reopen database
1386            let mut db = open_db(context.clone()).await;
1387
1388            // Verify correct recovery
1389            assert_eq!(
1390                db.op_count(),
1391                committed_size,
1392                "Should rewind to last commit"
1393            );
1394            assert_eq!(
1395                db.root(&mut hasher),
1396                committed_root,
1397                "Root should match last commit"
1398            );
1399            assert_eq!(
1400                db.last_commit_loc(),
1401                Some(committed_size - 1),
1402                "Last commit location should be correct"
1403            );
1404
1405            // Verify the uncommitted append was properly discarded
1406            // We should be able to append new data without issues
1407            let new_value = vec![77u8; 15];
1408            let loc = db.append(new_value.clone()).await.unwrap();
1409            assert_eq!(
1410                loc, committed_size,
1411                "New append should get the expected location"
1412            );
1413
1414            // Verify we can read the new value
1415            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));
1416
1417            // Test with multiple trailing appends to ensure robustness
1418            db.commit(None).await.unwrap();
1419            let new_committed_root = db.root(&mut hasher);
1420            let new_committed_size = db.op_count();
1421
1422            // Add multiple uncommitted appends
1423            for i in 0..5 {
1424                let v = vec![(200 + i) as u8; 10];
1425                db.append(v).await.unwrap();
1426            }
1427
1428            // Simulate the same partial failure scenario
1429            db.simulate_failure(true, false, false).await.unwrap();
1430
1431            // Reopen and verify correct recovery
1432            let db = open_db(context.clone()).await;
1433            assert_eq!(
1434                db.op_count(),
1435                new_committed_size,
1436                "Should rewind to last commit with multiple trailing appends"
1437            );
1438            assert_eq!(
1439                db.root(&mut hasher),
1440                new_committed_root,
1441                "Root should match last commit after multiple appends"
1442            );
1443            assert_eq!(
1444                db.last_commit_loc(),
1445                Some(new_committed_size - 1),
1446                "Last commit location should be correct after multiple appends"
1447            );
1448
1449            db.destroy().await.unwrap();
1450        });
1451    }
1452}