//! Database types and lifecycle management for `shodh_redb` (`db.rs`).
1use crate::blob_store::{BlobCompactionPolicy, BlobCompactionReport, BlobDedupConfig, BlobStats};
2use crate::cdc::CdcConfig;
3use crate::error::{BackendError, TransactionError};
4#[cfg(feature = "std")]
5use crate::group_commit::{GroupCommitError, GroupCommitter, WriteBatch};
6#[cfg(feature = "metrics")]
7use crate::observer::DbMetrics;
8use crate::observer::{DatabaseObserver, default_observer};
9use crate::sealed::Sealed;
10use crate::transaction_tracker::{TransactionId, TransactionTracker};
11use crate::transactions::{
12    ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
13    DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
14    TransactionIdWithPagination,
15};
16#[cfg(feature = "std")]
17use crate::tree_store::ReadOnlyBackend;
18#[cfg(feature = "std")]
19use crate::tree_store::salvage_tree_leaves;
20use crate::tree_store::{
21    Btree, BtreeHeader, CompressionConfig, InternalTableDefinition, PAGE_SIZE, PageHint,
22    PageNumber, ShrinkPolicy, TableTree, TableType, TransactionalMemory,
23};
24use crate::types::{Key, Value};
25use crate::{
26    CommitError, CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError,
27    StorageError, TableError,
28};
29use crate::{ReadTransaction, Result, WriteTransaction};
30use alloc::boxed::Box;
31use alloc::format;
32use alloc::string::{String, ToString};
33use alloc::sync::Arc;
34#[cfg(feature = "std")]
35use alloc::vec;
36use alloc::vec::Vec;
37use core::fmt::{Debug, Display, Formatter};
38use core::marker::PhantomData;
39
40#[cfg(feature = "std")]
41use std::fs::{File, OpenOptions};
42#[cfg(feature = "std")]
43use std::path::Path;
44#[cfg(feature = "std")]
45use std::time::{Duration, Instant};
46
47#[cfg(feature = "std")]
48use crate::tree_store::file_backend::FileBackend;
49#[cfg(feature = "logging")]
50use log::{debug, info, warn};
51
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// All methods take `&self`, so an implementation must be safe to call from
/// multiple threads concurrently (the trait requires `Send + Sync`).
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> core::result::Result<u64, BackendError>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `out.len()` + `offset` exceeds the length of the storage an appropriate `Error` must be returned.
    fn read(&self, offset: u64, out: &mut [u8]) -> core::result::Result<(), BackendError>;

    /// Sets the length of the storage.
    ///
    /// New positions in the storage must be initialized to zero.
    fn set_len(&self, len: u64) -> core::result::Result<(), BackendError>;

    /// Syncs all buffered data with the persistent storage.
    fn sync_data(&self) -> core::result::Result<(), BackendError>;

    /// Writes the specified array to the storage.
    fn write(&self, offset: u64, data: &[u8]) -> core::result::Result<(), BackendError>;

    /// Release any resources held by the backend
    ///
    /// Note: redb will not access the backend after calling this method and will call it exactly
    /// once when the [`Database`] is dropped
    ///
    /// The default implementation is a no-op.
    fn close(&self) -> core::result::Result<(), BackendError> {
        Ok(())
    }
}
82
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}

/// A [`TableHandle`] that carries only a table name, with no compile-time
/// key/value type information.
#[derive(Clone)]
pub struct UntypedTableHandle {
    // Owned table name; returned by `name()`.
    name: String,
}

impl UntypedTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedTableHandle {}
106
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}

/// A [`MultimapTableHandle`] that carries only a table name, with no
/// compile-time key/value type information.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    // Owned table name; returned by `name()`.
    name: String,
}

impl UntypedMultimapTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedMultimapTableHandle {}
130
/// Const-compatible byte-level prefix check for use in `const fn` table name validation.
///
/// Returns `true` iff `needle` is a prefix of `haystack`. Written with a
/// `while` loop because iterators are not usable in `const` contexts.
const fn const_starts_with(haystack: &[u8], needle: &[u8]) -> bool {
    let prefix_len = needle.len();
    if prefix_len > haystack.len() {
        return false;
    }
    // Compare the candidate prefix byte-by-byte, back to front.
    let mut idx = prefix_len;
    while idx > 0 {
        idx -= 1;
        if haystack[idx] != needle[idx] {
            return false;
        }
    }
    true
}
145
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    // Borrowed table name; definitions are typically `const`s with 'static names.
    name: &'a str,
    // Zero-sized markers tying K/V to the definition without storing values.
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
157
impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty and must not use the reserved `__ivfpq:` prefix.
    ///
    /// # Panics
    ///
    /// Panics (at compile time when used in a `const` context) if `name` is
    /// empty or starts with the reserved `__ivfpq:` prefix.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        assert!(
            !const_starts_with(name.as_bytes(), b"__ivfpq:"),
            "table names starting with \"__ivfpq:\" are reserved for internal use"
        );
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }

    /// Internal constructor that permits reserved prefixes.
    /// Only for use by IVF-PQ and other internal subsystems.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub(crate) const fn new_internal(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
188
189impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
190    fn name(&self) -> &str {
191        self.name
192    }
193}
194
195impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
196
197impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
198    fn clone(&self) -> Self {
199        *self
200    }
201}
202
203impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
204
205impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
206    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
207        write!(
208            f,
209            "{}<{}, {}>",
210            self.name,
211            K::type_name().name(),
212            V::type_name().name()
213        )
214    }
215}
216
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    // Borrowed table name; definitions are typically `const`s with 'static names.
    name: &'a str,
    // Zero-sized markers tying K/V to the definition without storing values.
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
230
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with the given `name`
    ///
    /// # Panics
    ///
    /// Panics (at compile time when used in a `const` context) if `name` is
    /// empty or starts with the reserved `__ivfpq:` prefix.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        assert!(
            !const_starts_with(name.as_bytes(), b"__ivfpq:"),
            "table names starting with \"__ivfpq:\" are reserved for internal use"
        );
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
245
246impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
247    fn name(&self) -> &str {
248        self.name
249    }
250}
251
252impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
253
254impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
255    fn clone(&self) -> Self {
256        *self
257    }
258}
259
260impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
261
262impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
263    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
264        write!(
265            f,
266            "{}<{}, {}>",
267            self.name,
268            K::type_name().name(),
269            V::type_name().name()
270        )
271    }
272}
273
/// Controls the depth of integrity verification.
///
/// Each level is a superset of the previous one: `Pages` includes the
/// `Header` checks, and `Full` includes the `Pages` checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyLevel {
    /// Header and commit slot checksums only (fast: reads header once)
    Header,
    /// Header + per-page XXH3-128 checksums (medium: walks all B-tree pages)
    Pages,
    /// Full: checksums + B-tree structural integrity -- key ordering, valid child
    /// pointers, consistent tree depth (slow: walks and decodes entire tree)
    Full,
}
285
/// Details about a single corrupt page found during verification.
///
/// Collected in the `corrupt_details` field of [`VerifyReport`] and
/// [`SalvageReport`].
#[derive(Debug, Clone)]
pub struct CorruptPageInfo {
    /// Page number within the database file
    pub page_number: u64,
    /// Which table the page belongs to, if determinable
    pub table_name: Option<String>,
    /// Human-readable description of the corruption
    pub description: String,
}
296
/// Results of a database integrity verification.
///
/// Returned by [`Database::verify_backup`]; which fields are populated
/// depends on the requested [`VerifyLevel`].
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct VerifyReport {
    /// Overall pass/fail
    pub valid: bool,
    /// Whether header magic number and slot checksums are valid
    pub header_valid: bool,
    /// Number of B-tree pages checked (0 for Header level)
    pub pages_checked: u64,
    /// Number of pages with checksum mismatches
    pub pages_corrupt: u64,
    /// Whether B-tree structural invariants hold (only checked at Full level)
    pub structural_valid: Option<bool>,
    /// Detailed corruption info for each corrupt page
    pub corrupt_details: Vec<CorruptPageInfo>,
    /// Time taken
    pub duration: Duration,
}
316
/// Results of a best-effort database recovery (salvage) operation.
///
/// Returned by [`Database::salvage`].
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct SalvageReport {
    /// Number of tables discovered in the corrupted database
    pub tables_found: u64,
    /// Number of tables from which at least one row was recovered
    pub tables_recovered: u64,
    /// Total number of key/value rows successfully recovered
    pub rows_recovered: u64,
    /// Estimated number of rows lost due to corruption
    pub rows_lost: u64,
    /// Detailed corruption info for each corrupt page encountered
    pub corrupt_details: Vec<CorruptPageInfo>,
    /// Time taken
    pub duration: Duration,
}
334
/// Progress report from a single compaction step.
///
/// A value with `complete == true` indicates no further compaction steps
/// are needed.
#[derive(Debug, Clone, Copy)]
pub struct CompactionProgress {
    /// Number of pages relocated in this step
    pub pages_relocated: u64,
    /// Whether compaction is complete (no more pages to relocate)
    pub complete: bool,
}
343
/// Information regarding the usage of the in-memory cache
///
/// Note: hit/miss/eviction metrics are only collected when the "`cache_metrics`" feature is enabled.
/// `used_bytes` and `budget_bytes` are always available.
#[derive(Debug, Clone, Copy)]
pub struct CacheStats {
    // Count of entries evicted because the cache was full.
    pub(crate) evictions: u64,
    // Read-path hit/miss counters (unmodified data).
    pub(crate) read_hits: u64,
    pub(crate) read_misses: u64,
    // Write-path hit/miss counters (data modified in a transaction).
    pub(crate) write_hits: u64,
    pub(crate) write_misses: u64,
    // Current cache usage in bytes (always tracked).
    pub(crate) used_bytes: usize,
    // Configured memory budget; `None` when no budget is set.
    pub(crate) budget_bytes: Option<usize>,
}
358
impl CacheStats {
    // Plain read accessors. The hit/miss/eviction counters are only
    // populated when the "cache_metrics" feature is enabled (see struct doc).

    /// Number of times that data has been evicted, due to the cache being full
    ///
    /// To increase the cache size use [`Builder::set_cache_size`]
    pub fn evictions(&self) -> u64 {
        self.evictions
    }

    /// Number of times that unmodified data has been read from the cache
    pub fn read_hits(&self) -> u64 {
        self.read_hits
    }

    /// Number of times that unmodified data was not in the cache and was read from storage
    pub fn read_misses(&self) -> u64 {
        self.read_misses
    }

    /// Number of times that data modified in a transaction has been read from the cache
    pub fn write_hits(&self) -> u64 {
        self.write_hits
    }

    /// Number of times that data modified in a transaction was not in the cache and was read from storage
    pub fn write_misses(&self) -> u64 {
        self.write_misses
    }

    /// Number of bytes in the cache
    pub fn used_bytes(&self) -> usize {
        self.used_bytes
    }

    /// The configured memory budget, if any.
    ///
    /// Returns `None` when no budget is set (default), meaning the cache sizes
    /// are controlled only by [`Builder::set_cache_size`].
    pub fn budget_bytes(&self) -> Option<usize> {
        self.budget_bytes
    }
}
400
/// Guard that ties a live transaction to the [`TransactionTracker`]; the
/// tracked slot is released when the guard is dropped (see the `Drop` impl).
pub(crate) enum TransactionGuard {
    Active {
        // Tracker that issued the id; notified on drop.
        transaction_tracker: Arc<TransactionTracker>,
        // `None` after `leak()` has detached the id from this guard.
        transaction_id: Option<TransactionId>,
        // Selects which tracker release path runs on drop.
        write_transaction: bool,
    },
    /// No-op guard for verification and repair page traversal.
    Verification,
}
410
411impl TransactionGuard {
412    pub(crate) fn new_read(
413        transaction_id: TransactionId,
414        tracker: Arc<TransactionTracker>,
415    ) -> Self {
416        Self::Active {
417            transaction_tracker: tracker,
418            transaction_id: Some(transaction_id),
419            write_transaction: false,
420        }
421    }
422
423    pub(crate) fn new_write(
424        transaction_id: TransactionId,
425        tracker: Arc<TransactionTracker>,
426    ) -> Self {
427        Self::Active {
428            transaction_tracker: tracker,
429            transaction_id: Some(transaction_id),
430            write_transaction: true,
431        }
432    }
433
434    pub(crate) fn id(&self) -> Result<TransactionId, StorageError> {
435        match self {
436            Self::Active { transaction_id, .. } => transaction_id.ok_or_else(|| {
437                StorageError::Internal(String::from("TransactionGuard::id() called after leak()"))
438            }),
439            Self::Verification => Err(StorageError::Internal(String::from(
440                "TransactionGuard::id() called on Verification guard",
441            ))),
442        }
443    }
444
445    pub(crate) fn leak(&mut self) -> Result<TransactionId, StorageError> {
446        match self {
447            Self::Active { transaction_id, .. } => transaction_id.take().ok_or_else(|| {
448                StorageError::Internal(String::from(
449                    "TransactionGuard::leak() called after prior leak()",
450                ))
451            }),
452            Self::Verification => Err(StorageError::Internal(String::from(
453                "TransactionGuard::leak() called on Verification guard",
454            ))),
455        }
456    }
457}
458
459impl Drop for TransactionGuard {
460    fn drop(&mut self) {
461        if let Self::Active {
462            transaction_tracker,
463            transaction_id: Some(transaction_id),
464            write_transaction,
465        } = self
466        {
467            if *write_transaction {
468                let _ = transaction_tracker.end_write_transaction(*transaction_id);
469            } else {
470                let _ = transaction_tracker.deallocate_read_transaction(*transaction_id);
471            }
472        }
473    }
474}
475
/// Read-side operations shared by [`Database`] and [`ReadOnlyDatabase`].
pub trait ReadableDatabase {
    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    /// Information regarding the usage of the in-memory cache
    ///
    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
    fn cache_stats(&self) -> CacheStats;
}
491
/// A redb database opened in read-only mode
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
///
/// Multiple processes may open a [`ReadOnlyDatabase`], but it may not be opened concurrently
/// with a [`Database`].
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use shodh_redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let txn = db.begin_write()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// txn.commit()?;
/// drop(db);
///
/// let db = ReadOnlyDatabase::open(filename)?;
/// let txn = db.begin_read()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     println!("{}", table.get(&0)?.unwrap().value());
/// }
/// # Ok(())
/// # }
/// ```
pub struct ReadOnlyDatabase {
    // Shared transactional page store (constructed over a `ReadOnlyBackend`).
    mem: Arc<TransactionalMemory>,
    // Issues read transaction ids; guards release them on drop.
    transaction_tracker: Arc<TransactionTracker>,
}
536
impl ReadableDatabase for ReadOnlyDatabase {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        // Register this read transaction with the tracker; it is released
        // again when the guard below is dropped.
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={id:?}");

        let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());

        // Read-only databases keep no configured observer or shared metrics,
        // so fresh defaults are supplied per transaction.
        ReadTransaction::new(
            self.mem.clone(),
            guard,
            default_observer(),
            #[cfg(feature = "metrics")]
            Arc::new(DbMetrics::new()),
        )
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}
560
impl ReadOnlyDatabase {
    /// Opens an existing redb database.
    ///
    /// Equivalent to [`Builder::new`] followed by `open_read_only` with
    /// default settings.
    #[cfg(feature = "std")]
    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
        Builder::new().open_read_only(path)
    }

    /// Constructs a read-only database over `file`.
    ///
    /// Fails with [`DatabaseError::RepairAborted`] if the database was not
    /// shut down cleanly (repair is not possible without write access), and
    /// with a corruption error if B-tree checksum verification fails.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        compression: CompressionConfig,
        memory_budget: Option<usize>,
        read_verification: ReadVerification,
        read_verification_callback: Option<Arc<ReadVerificationCallback>>,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database in read-only {:?}", &file_path);
        // Wrap the backend in `ReadOnlyBackend` before handing it to the
        // transactional memory layer.
        let mem = TransactionalMemory::new(
            Box::new(ReadOnlyBackend::new(file)),
            false,
            page_size,
            region_size,
            read_cache_size_bytes,
            0,
            true,
            compression,
            memory_budget,
            read_verification,
            read_verification_callback,
        )?;
        let mem = Arc::new(mem);
        // If the last transaction used 2-phase commit and updated the allocator state table, then
        // we can just load the allocator state from there. Otherwise, we need a full repair
        if let Some(tree) = Database::get_allocator_state_table(&mem)? {
            mem.load_allocator_state(&tree)?;
        } else {
            // A repair is required but cannot be run here; abort opening.
            #[cfg(feature = "logging")]
            warn!(
                "Database {:?} not shutdown cleanly. Repair required",
                &file_path
            );
            return Err(DatabaseError::RepairAborted);
        }

        // Verify B-tree checksums to catch corruption that the header check alone misses
        if !Database::verify_primary_checksums(mem.clone())? {
            return Err(DatabaseError::Storage(StorageError::Corrupted(
                "B-tree checksum verification failed".to_string(),
            )));
        }

        // Seed the tracker with the id following the last committed txn.
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
        let db = Self {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        Ok(db)
    }
}
627
/// Metadata about a retained transaction snapshot, returned by
/// [`Database::transaction_history()`].
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    /// The unique transaction ID.
    pub transaction_id: u64,
    /// Wall-clock timestamp in milliseconds since UNIX epoch.
    /// Always `0` under `no_std`.
    pub timestamp_ms: u64,
}

/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use shodh_redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    mem: Arc<TransactionalMemory>,
    transaction_tracker: Arc<TransactionTracker>,
    blob_dedup_config: BlobDedupConfig,
    cdc_config: CdcConfig,
    history_retention: u64,
    blob_compaction_policy: BlobCompactionPolicy,
    observer: Arc<dyn DatabaseObserver>,
    #[cfg(feature = "metrics")]
    db_metrics: Arc<DbMetrics>,
    #[cfg(feature = "std")]
    group_committer: GroupCommitter,
}
685
impl ReadableDatabase for Database {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        // The guard releases the read transaction slot when dropped.
        let guard = self.allocate_read_transaction()?;
        let txn_id = guard.id().ok();
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={txn_id:?}");
        let txn = ReadTransaction::new(
            self.get_memory(),
            guard,
            Arc::clone(&self.observer),
            #[cfg(feature = "metrics")]
            Arc::clone(&self.db_metrics),
        )?;
        // Notify the observer (and metrics, when enabled) only after the
        // transaction was successfully created.
        if let Some(id) = txn_id {
            self.observer.on_read_begin(id.raw_id());
            #[cfg(feature = "metrics")]
            self.db_metrics
                .read_txn_opened
                .fetch_add(1, portable_atomic::Ordering::Relaxed);
        }
        Ok(txn)
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}
713
714impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    #[cfg(feature = "std")]
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    /// Opens an existing redb database.
    #[cfg(feature = "std")]
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    /// Returns a shared handle to the underlying transactional page store.
    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
        self.mem.clone()
    }
733
734    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
735        let table_tree = TableTree::new(
736            mem.get_data_root(),
737            PageHint::None,
738            Arc::new(TransactionGuard::Verification),
739            mem.clone(),
740        )?;
741        if !table_tree.verify_checksums()? {
742            return Ok(false);
743        }
744        let system_table_tree = TableTree::new(
745            mem.get_system_root(),
746            PageHint::None,
747            Arc::new(TransactionGuard::Verification),
748            mem.clone(),
749        )?;
750        if !system_table_tree.verify_checksums()? {
751            return Ok(false);
752        }
753
754        Ok(true)
755    }
756
757    /// Like `verify_primary_checksums` but collects per-page corruption details.
758    #[cfg(feature = "std")]
759    pub(crate) fn verify_primary_checksums_detailed(
760        mem: Arc<TransactionalMemory>,
761    ) -> Result<(u64, Vec<CorruptPageInfo>)> {
762        let mut total_pages = 0u64;
763        let mut all_corruptions = Vec::new();
764
765        let table_tree = TableTree::new(
766            mem.get_data_root(),
767            PageHint::None,
768            Arc::new(TransactionGuard::Verification),
769            mem.clone(),
770        )?;
771        let (pages, corruptions) = table_tree.verify_checksums_detailed()?;
772        total_pages += pages;
773        all_corruptions.extend(corruptions);
774
775        let system_table_tree = TableTree::new(
776            mem.get_system_root(),
777            PageHint::None,
778            Arc::new(TransactionGuard::Verification),
779            mem.clone(),
780        )?;
781        let (pages, corruptions) = system_table_tree.verify_checksums_detailed()?;
782        total_pages += pages;
783        all_corruptions.extend(corruptions);
784
785        Ok((total_pages, all_corruptions))
786    }
787
788    /// Like `verify_primary_checksums` but verifies B-tree structural invariants.
789    #[cfg(feature = "std")]
790    pub(crate) fn verify_primary_structure(
791        mem: Arc<TransactionalMemory>,
792    ) -> Result<Vec<CorruptPageInfo>> {
793        let mut all_corruptions = Vec::new();
794
795        let table_tree = TableTree::new(
796            mem.get_data_root(),
797            PageHint::None,
798            Arc::new(TransactionGuard::Verification),
799            mem.clone(),
800        )?;
801        all_corruptions.extend(table_tree.verify_structure_detailed()?);
802
803        let system_table_tree = TableTree::new(
804            mem.get_system_root(),
805            PageHint::None,
806            Arc::new(TransactionGuard::Verification),
807            mem.clone(),
808        )?;
809        all_corruptions.extend(system_table_tree.verify_structure_detailed()?);
810
811        Ok(all_corruptions)
812    }
813
    /// Creates a consistent backup of the database at the given path.
    ///
    /// The backup captures a snapshot of the last committed transaction. This method
    /// can be called while other read or write transactions are active -- it will not
    /// block writers and will not include uncommitted data.
    ///
    /// The resulting file is a valid redb database. Open it with [`Database::open`]
    /// (recommended, handles any needed repair) or [`Builder::open`].
    ///
    /// # Errors
    ///
    /// Returns a [`StorageError`] if the snapshot cannot be pinned, the source
    /// cannot be read, or the destination cannot be created, written, or synced.
    #[cfg(feature = "std")]
    #[allow(clippy::cast_possible_truncation)]
    pub fn backup(&self, path: impl AsRef<Path>) -> Result<(), StorageError> {
        use std::io::Write;

        const CHUNK_SIZE: usize = 1024 * 1024; // 1 MB

        // Pin a consistent snapshot so pages aren't reclaimed during the copy
        let _read_txn = self.begin_read().map_err(|e| e.into_storage_error())?;

        // Flush pending writes to ensure we copy a complete state
        self.mem.flush_data()?;

        // Copy the raw file contents into the destination in fixed-size chunks.
        let file_len = self.mem.raw_len()?;
        let mut dest =
            File::create(path.as_ref()).map_err(|e| StorageError::Io(BackendError::Io(e)))?;
        let mut buf = vec![0u8; CHUNK_SIZE];
        let mut offset = 0u64;

        while offset < file_len {
            // The final chunk may be shorter than CHUNK_SIZE.
            let remaining = (file_len - offset) as usize;
            let to_read = remaining.min(CHUNK_SIZE);
            let chunk = &mut buf[..to_read];
            self.mem.read_raw(offset, chunk)?;
            dest.write_all(chunk)
                .map_err(|e| StorageError::Io(BackendError::Io(e)))?;
            offset += to_read as u64;
        }

        // Durably sync the destination before reporting success.
        dest.sync_all()
            .map_err(|e| StorageError::Io(BackendError::Io(e)))?;

        Ok(())
    }
856
    /// Verifies the integrity of a backup (or any redb database file) without modifying it.
    ///
    /// This is a standalone function that does not require an open [`Database`].
    /// The file is opened read-only and is never modified, making it safe to run on
    /// backup files, read-only media, or files in use by another process.
    ///
    /// # Verification levels
    /// - [`VerifyLevel::Header`]: Verifies magic number and commit slot checksums (~instant)
    /// - [`VerifyLevel::Pages`]: Header + walks all B-tree pages verifying XXH3-128 checksums
    /// - [`VerifyLevel::Full`]: Pages + verifies B-tree structural invariants (key ordering,
    ///   valid child pointers, consistent tree depth)
    #[cfg(feature = "std")]
    pub fn verify_backup(
        path: impl AsRef<Path>,
        level: VerifyLevel,
    ) -> core::result::Result<VerifyReport, DatabaseError> {
        let start = Instant::now();
        let file = OpenOptions::new().read(true).open(path.as_ref())?;
        let backend: Box<dyn StorageBackend> = Box::new(
            crate::tree_store::file_backend::FileBackend::new_internal(file, true)?,
        );

        // Header validity is determined while opening; an invalid header does
        // not abort here, so deeper levels can still collect page details.
        let (mem, header_valid) =
            TransactionalMemory::new_for_verify(backend, PAGE_SIZE, None, CompressionConfig::None)?;

        // Header level: no page walk required.
        if level == VerifyLevel::Header {
            return Ok(VerifyReport {
                valid: header_valid,
                header_valid,
                pages_checked: 0,
                pages_corrupt: 0,
                structural_valid: None,
                corrupt_details: Vec::new(),
                duration: start.elapsed(),
            });
        }

        // Pages level and above: walk all B-tree pages verifying checksums.
        let mem = Arc::new(mem);
        let (pages_checked, mut corrupt_details) =
            Self::verify_primary_checksums_detailed(mem.clone())?;
        // Count checksum corruptions before structural findings are appended.
        let pages_corrupt = corrupt_details.len() as u64;

        // Full level additionally validates B-tree structural invariants.
        let structural_valid = if level == VerifyLevel::Full {
            let structural_corruptions = Self::verify_primary_structure(mem)?;
            if !structural_corruptions.is_empty() {
                corrupt_details.extend(structural_corruptions);
                Some(false)
            } else {
                Some(true)
            }
        } else {
            None
        };

        // Overall verdict: every check requested by `level` must pass.
        let valid = header_valid && pages_corrupt == 0 && structural_valid.unwrap_or(true);

        Ok(VerifyReport {
            valid,
            header_valid,
            pages_checked,
            pages_corrupt,
            structural_valid,
            corrupt_details,
            duration: start.elapsed(),
        })
    }
923
    /// Best-effort recovery of data from a corrupted database file.
    ///
    /// Opens `corrupted_path` read-only, walks every discoverable table's B-tree
    /// (skipping corrupted subtrees), and writes all recoverable key/value pairs
    /// into a fresh database at `output_path`.
    ///
    /// Recovered tables use raw `&[u8]` key/value types. If the original table
    /// used typed keys (e.g. `&str`, `u64`), the caller must re-interpret the raw
    /// bytes after recovery.
    ///
    /// Returns a [`SalvageReport`] summarising what was recovered and what was lost.
    #[cfg(feature = "std")]
    pub fn salvage(
        corrupted_path: impl AsRef<Path>,
        output_path: impl AsRef<Path>,
    ) -> core::result::Result<SalvageReport, DatabaseError> {
        let start = Instant::now();
        let mut corrupt_details: Vec<CorruptPageInfo> = Vec::new();
        let mut tables_recovered = 0u64;
        let mut rows_recovered = 0u64;
        let mut rows_lost = 0u64;

        // 1. Open corrupted file read-only
        let file = OpenOptions::new()
            .read(true)
            .open(corrupted_path.as_ref())?;
        let backend: Box<dyn crate::StorageBackend> = Box::new(
            crate::tree_store::file_backend::FileBackend::new_internal(file, true)?,
        );

        // Header validity is ignored (note the `_header_valid` binding): salvage
        // proceeds regardless and recovers whatever is still reachable.
        let (mem, _header_valid) =
            TransactionalMemory::new_for_verify(backend, PAGE_SIZE, None, CompressionConfig::None)?;
        let mem = Arc::new(mem);

        // 2. Discover tables from the data root (user table tree)
        let data_root = mem.get_data_root();
        let table_entries =
            Self::salvage_discover_tables(mem.clone(), data_root, &mut corrupt_details);
        let tables_found = table_entries.len() as u64;

        // 3. Create output database
        let output_db = Database::builder().create(output_path.as_ref())?;

        // 4. For each table, extract k/v pairs and write to output
        for (table_name, definition) in &table_entries {
            // Both table flavors carry the same three fields we need here.
            let (table_root, fixed_key_size, fixed_value_size) = match definition {
                InternalTableDefinition::Normal {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                }
                | InternalTableDefinition::Multimap {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => (*table_root, *fixed_key_size, *fixed_value_size),
            };

            // No root means the table is empty -- nothing to recover.
            let Some(root) = table_root else {
                continue;
            };

            // With compression enabled, on-disk value sizes presumably differ from
            // the logical fixed size, so values are treated as variable-length.
            let effective_value_size = if mem.compression().is_enabled() {
                None
            } else {
                fixed_value_size
            };

            let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
            let mut table_corruptions: Vec<CorruptPageInfo> = Vec::new();

            let table_rows = salvage_tree_leaves(
                root,
                &mem,
                fixed_key_size,
                effective_value_size,
                &mut pairs,
                &mut table_corruptions,
            );

            // Annotate corruption entries with the table name
            for c in &mut table_corruptions {
                c.table_name = Some(table_name.clone());
            }

            if !pairs.is_empty() {
                // Write recovered pairs to output database.
                // SAFETY: We leak the table name to get a &'static str required by
                // TableDefinition. This is acceptable because salvage is a one-shot
                // recovery operation, not a long-running server loop.
                let leaked_name: &'static str = Box::leak(table_name.clone().into_boxed_str());
                let raw_def: TableDefinition<&[u8], &[u8]> = TableDefinition::new(leaked_name);
                let write_txn = output_db
                    .begin_write()
                    .map_err(|e| DatabaseError::Storage(e.into_storage_error()))?;
                {
                    let mut table = write_txn.open_table(raw_def).map_err(|e| {
                        DatabaseError::Storage(
                            e.into_storage_error_or_internal("salvage: open_table"),
                        )
                    })?;
                    // Best-effort: individual insert failures are swallowed so one
                    // bad pair cannot abort recovery of the rest of the table.
                    for (key, value) in &pairs {
                        let _ = table.insert(key.as_slice(), value.as_slice());
                    }
                }
                write_txn
                    .commit()
                    .map_err(|e| DatabaseError::Storage(e.into_storage_error()))?;
                tables_recovered += 1;
            }

            rows_recovered += table_rows;
            // Estimate lost rows from corruption count (each corrupt page likely held some rows)
            rows_lost += table_corruptions.len() as u64;
            corrupt_details.extend(table_corruptions);
        }

        Ok(SalvageReport {
            tables_found,
            tables_recovered,
            rows_recovered,
            rows_lost,
            corrupt_details,
            duration: start.elapsed(),
        })
    }
1052
1053    /// Discover tables from the system root by tolerantly walking the master table tree.
1054    #[cfg(feature = "std")]
1055    fn salvage_discover_tables(
1056        mem: Arc<TransactionalMemory>,
1057        system_root: Option<BtreeHeader>,
1058        corruptions: &mut Vec<CorruptPageInfo>,
1059    ) -> Vec<(String, InternalTableDefinition)> {
1060        let Some(root) = system_root else {
1061            return Vec::new();
1062        };
1063
1064        // The master table tree stores (&str -> InternalTableDefinition) pairs.
1065        // Key size: variable (None), Value size: variable (None).
1066        let mut raw_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1067        salvage_tree_leaves(root, &mem, None, None, &mut raw_pairs, corruptions);
1068
1069        let mut tables = Vec::new();
1070        for (key_bytes, value_bytes) in &raw_pairs {
1071            let name = match core::str::from_utf8(key_bytes) {
1072                Ok(s) => s.to_string(),
1073                Err(_) => continue,
1074            };
1075            // Skip internal system tables (names starting with NUL)
1076            if name.starts_with('\0') {
1077                continue;
1078            }
1079            // Parse table definition tolerantly -- corrupt bytes may cause panics
1080            let vb = value_bytes.clone();
1081            let parsed = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1082                <InternalTableDefinition as crate::types::Value>::from_bytes(&vb)
1083            }));
1084            match parsed {
1085                Ok(definition) => tables.push((name, definition)),
1086                Err(_) => {
1087                    corruptions.push(CorruptPageInfo {
1088                        page_number: 0,
1089                        table_name: Some(name),
1090                        description: "corrupt table definition".to_string(),
1091                    });
1092                }
1093            }
1094        }
1095
1096        tables
1097    }
1098
1099    /// Verifies the integrity of an open database without modifying it.
1100    ///
1101    /// Unlike [`check_integrity`](Self::check_integrity) which repairs the database and
1102    /// commits changes, this method is purely read-only and returns a detailed report.
1103    /// It can be called while read or write transactions are active.
1104    ///
1105    /// # Verification levels
1106    /// - [`VerifyLevel::Header`]: Verifies commit slot checksums (~instant)
1107    /// - [`VerifyLevel::Pages`]: Header + walks all B-tree pages verifying XXH3-128 checksums
1108    /// - [`VerifyLevel::Full`]: Pages + verifies B-tree structural invariants
1109    #[cfg(feature = "std")]
1110    pub fn verify_integrity(&self, level: VerifyLevel) -> Result<VerifyReport> {
1111        let start = Instant::now();
1112
1113        // Header is always valid for an open database (it was validated on open)
1114        let header_valid = true;
1115
1116        if level == VerifyLevel::Header {
1117            return Ok(VerifyReport {
1118                valid: true,
1119                header_valid,
1120                pages_checked: 0,
1121                pages_corrupt: 0,
1122                structural_valid: None,
1123                corrupt_details: Vec::new(),
1124                duration: start.elapsed(),
1125            });
1126        }
1127
1128        let (pages_checked, mut corrupt_details) =
1129            Self::verify_primary_checksums_detailed(self.mem.clone())?;
1130        let pages_corrupt = corrupt_details.len() as u64;
1131
1132        let structural_valid = if level == VerifyLevel::Full {
1133            let structural_corruptions = Self::verify_primary_structure(self.mem.clone())?;
1134            if !structural_corruptions.is_empty() {
1135                corrupt_details.extend(structural_corruptions);
1136                Some(false)
1137            } else {
1138                Some(true)
1139            }
1140        } else {
1141            None
1142        };
1143
1144        let valid = pages_corrupt == 0 && structural_valid.unwrap_or(true);
1145
1146        Ok(VerifyReport {
1147            valid,
1148            header_valid,
1149            pages_checked,
1150            pages_corrupt,
1151            structural_valid,
1152            corrupt_details,
1153            duration: start.elapsed(),
1154        })
1155    }
1156
    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator state hash so any repair-induced change is detectable below.
        let allocator_hash = self.mem.allocator_hash();
        // Arc::get_mut succeeds only when no other clones of the memory handle exist:
        // dropping caches and reloading from disk requires exclusive access.
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .ok_or_else(|| {
                DatabaseError::Storage(StorageError::invalid_config(
                    "check_integrity() requires exclusive database access, but other references to the memory exist",
                ))
            })?
            .clear_cache_and_reload()?;

        let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];

        // Run the repair pass; a non-storage error here is mapped to Internal
        // because repair is only expected to fail with storage errors.
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => StorageError::Internal(
                "unexpected non-storage error during integrity check repair".to_string(),
            ),
        })?;

        // If repair moved either root, or changed the allocator state, the file
        // was not clean and the repaired state must be committed.
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        if !was_clean {
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next()?;
            let [data_root, system_root] = new_roots;
            // Persist the repaired roots. NOTE(review): the `true` flag presumably
            // selects a durable/two-phase commit -- confirm against
            // TransactionalMemory::commit.
            self.mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        // Re-arm the memory for normal write transactions after verification.
        self.mem.begin_writable()?;

        Ok(was_clean)
    }
1205
    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Live readers pin old page versions, which would prevent relocation.
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_err(CompactionError::Storage)?
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            // NOTE(review): txn is dropped here without an explicit abort();
            // presumably dropping a WriteTransaction aborts it -- confirm.
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .any_savepoint_exists()
            .map_err(CompactionError::Storage)?
        {
            // NOTE(review): same implicit-drop abort as above.
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            // Relocate a batch of pages toward the start of the file.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Triple commit to free up the relocated pages for reuse
            // TODO: this really shouldn't be necessary, but the data freed tree is a system table
            // and so free'ing up its pages causes more deletes from the system tree
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Sanity check: after the commits above nothing should remain pending.
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
1285
1286    /// Compacts the blob region, removing dead space left by deleted blobs.
1287    ///
1288    /// Uses a two-pass crash-safe algorithm:
1289    /// 1. Appends live blobs after the current region end, updates all offsets, commits.
1290    /// 2. Shifts live data to the start of the region, updates offsets, commits.
1291    /// 3. Truncates the file.
1292    ///
1293    /// Same constraints as [`compact()`](Self::compact): no active read transactions
1294    /// or persistent/ephemeral savepoints.
1295    pub fn compact_blobs(&mut self) -> core::result::Result<BlobCompactionReport, CompactionError> {
1296        if self
1297            .transaction_tracker
1298            .oldest_live_read_transaction()
1299            .map_err(CompactionError::Storage)?
1300            .is_some()
1301        {
1302            return Err(CompactionError::TransactionInProgress);
1303        }
1304
1305        // Check savepoint constraints (same as compact())
1306        {
1307            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1308            if txn.list_persistent_savepoints()?.next().is_some() {
1309                txn.abort().map_err(CompactionError::Storage)?;
1310                return Err(CompactionError::PersistentSavepointExists);
1311            }
1312            if self
1313                .transaction_tracker
1314                .any_savepoint_exists()
1315                .map_err(CompactionError::Storage)?
1316            {
1317                txn.abort().map_err(CompactionError::Storage)?;
1318                return Err(CompactionError::EphemeralSavepointExists);
1319            }
1320            txn.abort().map_err(CompactionError::Storage)?;
1321        }
1322
1323        // Gather stats to check if compaction is needed
1324        let stats = {
1325            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1326            let s = txn.blob_stats().map_err(CompactionError::Storage)?;
1327            txn.abort().map_err(CompactionError::Storage)?;
1328            s
1329        };
1330
1331        if stats.dead_bytes == 0 {
1332            return Ok(BlobCompactionReport {
1333                blobs_relocated: 0,
1334                live_bytes: stats.live_bytes,
1335                bytes_reclaimed: 0,
1336                was_noop: true,
1337            });
1338        }
1339
1340        let old_region_length = stats.region_bytes;
1341
1342        // -- Pass 1: Append live blobs after current region end ----------------
1343        let (blobs_relocated, total_live_size) = {
1344            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1345            let result = txn.compact_blobs_pass(false);
1346            match result {
1347                Ok(r) => {
1348                    txn.set_two_phase_commit(true);
1349                    txn.commit().map_err(|e| e.into_storage_error())?;
1350                    r
1351                }
1352                Err(e) => {
1353                    txn.abort().map_err(CompactionError::Storage)?;
1354                    return Err(CompactionError::Storage(e));
1355                }
1356            }
1357        };
1358
1359        // -- Pass 2: Shift data to region start -------------------------------
1360        {
1361            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1362            let result = txn.compact_blobs_pass(true);
1363            match result {
1364                Ok(_) => {
1365                    txn.set_two_phase_commit(true);
1366                    txn.commit().map_err(|e| e.into_storage_error())?;
1367                }
1368                Err(e) => {
1369                    txn.abort().map_err(CompactionError::Storage)?;
1370                    return Err(CompactionError::Storage(e));
1371                }
1372            }
1373        }
1374
1375        // -- Truncate the file ------------------------------------------------
1376        // IMPORTANT: Use committed state only. get_blob_state() may return
1377        // non-durable pending state (if a prior Durability::None blob write
1378        // set pending_blob_state.next_sequence != 0), which would cause
1379        // truncation to a wrong length and permanent database corruption.
1380        let blob_state = self.mem.get_committed_blob_state();
1381        let target_len = blob_state.region_offset + blob_state.region_length;
1382        if target_len > 0 {
1383            self.mem
1384                .truncate_to(target_len)
1385                .map_err(CompactionError::Storage)?;
1386        }
1387
1388        Ok(BlobCompactionReport {
1389            blobs_relocated,
1390            live_bytes: total_live_size,
1391            bytes_reclaimed: old_region_length - total_live_size,
1392            was_noop: false,
1393        })
1394    }
1395
1396    /// Checks blob region statistics against the configured
1397    /// [`BlobCompactionPolicy`] and returns `Some(stats)` if compaction is
1398    /// recommended, or `None` if the region is healthy.
1399    ///
1400    /// This is purely advisory -- the database never auto-compacts.
1401    pub fn should_compact_blobs(&self) -> Result<Option<BlobStats>, TransactionError> {
1402        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1403        let stats = txn.blob_stats().map_err(TransactionError::Storage)?;
1404        txn.abort().map_err(TransactionError::Storage)?;
1405
1406        let policy = &self.blob_compaction_policy;
1407        if stats.dead_bytes >= policy.min_dead_bytes
1408            && stats.fragmentation_ratio >= policy.fragmentation_threshold
1409        {
1410            Ok(Some(stats))
1411        } else {
1412            Ok(None)
1413        }
1414    }
1415
1416    /// Like [`compact_blobs()`](Self::compact_blobs), but invokes `callback`
1417    /// after each pass with `(blobs_processed, total_blobs, bytes_processed,
1418    /// total_bytes)`. Return `false` from the callback to cancel compaction.
1419    ///
1420    /// Same constraints as `compact_blobs`: no active read transactions or
1421    /// persistent/ephemeral savepoints.
1422    pub fn compact_blobs_with_progress(
1423        &mut self,
1424        mut callback: impl FnMut(u64, u64, u64, u64) -> bool,
1425    ) -> core::result::Result<BlobCompactionReport, CompactionError> {
1426        if self
1427            .transaction_tracker
1428            .oldest_live_read_transaction()
1429            .map_err(CompactionError::Storage)?
1430            .is_some()
1431        {
1432            return Err(CompactionError::TransactionInProgress);
1433        }
1434
1435        // Check savepoint constraints
1436        {
1437            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1438            if txn.list_persistent_savepoints()?.next().is_some() {
1439                txn.abort().map_err(CompactionError::Storage)?;
1440                return Err(CompactionError::PersistentSavepointExists);
1441            }
1442            if self
1443                .transaction_tracker
1444                .any_savepoint_exists()
1445                .map_err(CompactionError::Storage)?
1446            {
1447                txn.abort().map_err(CompactionError::Storage)?;
1448                return Err(CompactionError::EphemeralSavepointExists);
1449            }
1450            txn.abort().map_err(CompactionError::Storage)?;
1451        }
1452
1453        // Gather stats
1454        let stats = {
1455            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1456            let s = txn.blob_stats().map_err(CompactionError::Storage)?;
1457            txn.abort().map_err(CompactionError::Storage)?;
1458            s
1459        };
1460
1461        if stats.dead_bytes == 0 {
1462            return Ok(BlobCompactionReport {
1463                blobs_relocated: 0,
1464                live_bytes: stats.live_bytes,
1465                bytes_reclaimed: 0,
1466                was_noop: true,
1467            });
1468        }
1469
1470        let old_region_length = stats.region_bytes;
1471        let total_blobs = stats.blob_count;
1472        let total_bytes = stats.live_bytes;
1473
1474        // Progress: 0% before pass 1
1475        if !callback(0, total_blobs, 0, total_bytes) {
1476            return Err(CompactionError::Cancelled);
1477        }
1478
1479        // -- Pass 1: Append live blobs after current region end ----------------
1480        let (blobs_relocated, total_live_size) = {
1481            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1482            let result = txn.compact_blobs_pass(false);
1483            match result {
1484                Ok(r) => {
1485                    txn.set_two_phase_commit(true);
1486                    txn.commit().map_err(|e| e.into_storage_error())?;
1487                    r
1488                }
1489                Err(e) => {
1490                    txn.abort().map_err(CompactionError::Storage)?;
1491                    return Err(CompactionError::Storage(e));
1492                }
1493            }
1494        };
1495
1496        // Progress: ~50% after pass 1
1497        if !callback(blobs_relocated, total_blobs, total_live_size, total_bytes) {
1498            // Pass 1 data is committed but pass 2 hasn't shifted yet.
1499            // The database is consistent -- blobs point to appended area.
1500            return Err(CompactionError::Cancelled);
1501        }
1502
1503        // -- Pass 2: Shift data to region start -------------------------------
1504        {
1505            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1506            let result = txn.compact_blobs_pass(true);
1507            match result {
1508                Ok(_) => {
1509                    txn.set_two_phase_commit(true);
1510                    txn.commit().map_err(|e| e.into_storage_error())?;
1511                }
1512                Err(e) => {
1513                    txn.abort().map_err(CompactionError::Storage)?;
1514                    return Err(CompactionError::Storage(e));
1515                }
1516            }
1517        }
1518
1519        // -- Truncate the file ------------------------------------------------
1520        let blob_state = self.mem.get_committed_blob_state();
1521        let target_len = blob_state.region_offset + blob_state.region_length;
1522        if target_len > 0 {
1523            self.mem
1524                .truncate_to(target_len)
1525                .map_err(CompactionError::Storage)?;
1526        }
1527
1528        // Final progress: 100%
1529        let _ = callback(blobs_relocated, total_blobs, total_live_size, total_bytes);
1530
1531        Ok(BlobCompactionReport {
1532            blobs_relocated,
1533            live_bytes: total_live_size,
1534            bytes_reclaimed: old_region_length - total_live_size,
1535            was_noop: false,
1536        })
1537    }
1538
1539    /// Starts an incremental online compaction that allows concurrent readers.
1540    ///
1541    /// Unlike [`compact()`](Self::compact) which requires `&mut self` and blocks
1542    /// all readers, this method takes `&self` and performs compaction in small steps.
1543    /// Each step briefly acquires a write lock, relocates a batch of pages, and releases
1544    /// it -- allowing read transactions to proceed between steps.
1545    ///
1546    /// Persistent and ephemeral savepoints are not allowed during compaction because
1547    /// they pin old page versions indefinitely.
1548    ///
1549    /// # Example
1550    ///
1551    /// ```no_run
1552    /// # use shodh_redb::Database;
1553    /// let db = Database::create("my.db").unwrap();
1554    /// let handle = db.start_compaction().unwrap();
1555    /// let total = handle.run().unwrap();
1556    /// println!("Relocated {} pages", total);
1557    /// ```
1558    pub fn start_compaction(&self) -> core::result::Result<CompactionHandle<'_>, CompactionError> {
1559        // Check for persistent savepoints (they pin old page versions forever)
1560        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1561        if txn.list_persistent_savepoints()?.next().is_some() {
1562            txn.abort().map_err(CompactionError::Storage)?;
1563            return Err(CompactionError::PersistentSavepointExists);
1564        }
1565        if self
1566            .transaction_tracker
1567            .any_savepoint_exists()
1568            .map_err(CompactionError::Storage)?
1569        {
1570            txn.abort().map_err(CompactionError::Storage)?;
1571            return Err(CompactionError::EphemeralSavepointExists);
1572        }
1573        txn.abort().map_err(CompactionError::Storage)?;
1574
1575        Ok(CompactionHandle { db: self })
1576    }
1577
1578    /// Starts online blob compaction that allows concurrent readers between
1579    /// phases.
1580    ///
1581    /// Unlike [`compact_blobs()`](Self::compact_blobs), this takes `&self`
1582    /// and splits the work into two phases. Between phases, read transactions
1583    /// can proceed normally.
1584    ///
1585    /// No active read transactions may exist when the handle is created.
1586    /// Persistent and ephemeral savepoints are not allowed.
1587    pub fn start_blob_compaction(
1588        &self,
1589    ) -> core::result::Result<BlobCompactionHandle<'_>, CompactionError> {
1590        if self
1591            .transaction_tracker
1592            .oldest_live_read_transaction()
1593            .map_err(CompactionError::Storage)?
1594            .is_some()
1595        {
1596            return Err(CompactionError::TransactionInProgress);
1597        }
1598
1599        // Check savepoint constraints
1600        {
1601            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1602            if txn.list_persistent_savepoints()?.next().is_some() {
1603                txn.abort().map_err(CompactionError::Storage)?;
1604                return Err(CompactionError::PersistentSavepointExists);
1605            }
1606            if self
1607                .transaction_tracker
1608                .any_savepoint_exists()
1609                .map_err(CompactionError::Storage)?
1610            {
1611                txn.abort().map_err(CompactionError::Storage)?;
1612                return Err(CompactionError::EphemeralSavepointExists);
1613            }
1614            txn.abort().map_err(CompactionError::Storage)?;
1615        }
1616
1617        // Gather stats
1618        let stats = {
1619            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1620            let s = txn.blob_stats().map_err(CompactionError::Storage)?;
1621            txn.abort().map_err(CompactionError::Storage)?;
1622            s
1623        };
1624
1625        if stats.dead_bytes == 0 {
1626            // No dead space -- return a handle that reports complete immediately
1627            return Ok(BlobCompactionHandle {
1628                db: self,
1629                stats,
1630                phase: 2,
1631                blobs_relocated: 0,
1632                live_bytes: stats.live_bytes,
1633            });
1634        }
1635
1636        Ok(BlobCompactionHandle {
1637            db: self,
1638            stats,
1639            phase: 0,
1640            blobs_relocated: 0,
1641            live_bytes: 0,
1642        })
1643    }
1644
1645    /// Starts a background integrity scanner that periodically walks all
1646    /// B-tree pages and checks xxh3-128 checksums.
1647    ///
1648    /// The scanner runs on a dedicated thread and never blocks normal
1649    /// read/write traffic. Results are available via the returned handle.
1650    ///
1651    /// The thread is automatically stopped when the handle is dropped.
1652    #[cfg(feature = "std")]
1653    pub fn start_integrity_scanner(
1654        &self,
1655        config: crate::integrity_scanner::IntegrityScannerConfig,
1656    ) -> Result<crate::integrity_scanner::IntegrityScannerHandle, DatabaseError> {
1657        crate::integrity_scanner::IntegrityScannerHandle::start(self.mem.clone(), config)
1658            .map_err(DatabaseError::from)
1659    }
1660
1661    /// Export a logical incremental snapshot of all key/value changes since
1662    /// `since_txn`.
1663    ///
1664    /// Requires [`Builder::set_history_retention()`] > 0 and `since_txn` must
1665    /// still be within the retention window.
1666    #[cfg(feature = "std")]
1667    pub fn export_incremental(
1668        &self,
1669        since_txn: u64,
1670    ) -> core::result::Result<crate::incremental::IncrementalSnapshot, StorageError> {
1671        crate::incremental::export_incremental(self, since_txn)
1672    }
1673
1674    /// Apply an incremental snapshot to this database.
1675    ///
1676    /// Performs a logical replay: upserted entries are inserted, deleted
1677    /// entries are removed, and dropped tables are deleted. Executes as a
1678    /// single atomic write transaction.
1679    #[cfg(feature = "std")]
1680    pub fn import_incremental(
1681        &self,
1682        snapshot: &crate::incremental::IncrementalSnapshot,
1683    ) -> core::result::Result<crate::incremental::IncrementalImportReport, StorageError> {
1684        crate::incremental::import_incremental(self, snapshot)
1685    }
1686
1687    /// Write an incremental delta file containing only changes since `since_txn`.
1688    ///
1689    /// The destination file is a portable delta (not a valid database file)
1690    /// that can be applied with [`apply_incremental_backup()`](Self::apply_incremental_backup).
1691    #[cfg(feature = "std")]
1692    pub fn backup_incremental(
1693        &self,
1694        dest: impl AsRef<std::path::Path>,
1695        since_txn: u64,
1696    ) -> core::result::Result<crate::incremental::IncrementalBackupReport, StorageError> {
1697        crate::incremental::backup_incremental(self, dest.as_ref(), since_txn)
1698    }
1699
1700    /// Apply an incremental delta file produced by [`backup_incremental()`](Self::backup_incremental).
1701    ///
1702    /// Reads the file, verifies its SHA-256 integrity, and performs a logical
1703    /// import identical to [`import_incremental()`](Self::import_incremental).
1704    #[cfg(feature = "std")]
1705    pub fn apply_incremental_backup(
1706        &self,
1707        path: impl AsRef<std::path::Path>,
1708    ) -> core::result::Result<crate::incremental::IncrementalImportReport, StorageError> {
1709        crate::incremental::apply_incremental_backup(self, path.as_ref())
1710    }
1711
1712    #[cfg_attr(not(debug_assertions), expect(dead_code))]
1713    fn check_repaired_allocated_pages_table(
1714        system_root: Option<BtreeHeader>,
1715        mem: Arc<TransactionalMemory>,
1716    ) -> Result {
1717        let table_tree = TableTree::new(
1718            system_root,
1719            PageHint::None,
1720            Arc::new(TransactionGuard::Verification),
1721            mem.clone(),
1722        )?;
1723        if let Some(table_def) = table_tree
1724            .get_table::<TransactionIdWithPagination, PageList>(
1725                DATA_ALLOCATED_TABLE.name(),
1726                TableType::Normal,
1727            )
1728            .map_err(|e| e.into_storage_error_or_internal("Allocated pages table corrupted"))?
1729        {
1730            let InternalTableDefinition::Normal { table_root, .. } = table_def else {
1731                return Err(StorageError::Internal(
1732                    "unexpected non-normal table type for allocated pages table".to_string(),
1733                ));
1734            };
1735            let table: ReadOnlyTable<TransactionIdWithPagination, PageList> =
1736                ReadOnlyTable::new_uncompressed(
1737                    DATA_ALLOCATED_TABLE.name().to_string(),
1738                    table_root,
1739                    PageHint::None,
1740                    Arc::new(TransactionGuard::Verification),
1741                    mem.clone(),
1742                )?;
1743            for result in table.range::<TransactionIdWithPagination>(..)? {
1744                let (_, pages) = result?;
1745                for i in 0..pages.value().len() {
1746                    assert!(mem.is_allocated(pages.value().get(i)));
1747                }
1748            }
1749        }
1750
1751        Ok(())
1752    }
1753
1754    fn visit_freed_tree<K: Key, V: Value, F>(
1755        system_root: Option<BtreeHeader>,
1756        table_def: SystemTableDefinition<K, V>,
1757        mem: Arc<TransactionalMemory>,
1758        mut visitor: F,
1759    ) -> Result
1760    where
1761        F: FnMut(PageNumber) -> Result,
1762    {
1763        let fake_guard = Arc::new(TransactionGuard::Verification);
1764        let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
1765        let table_name = table_def.name();
1766        let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
1767            Ok(result) => result,
1768            Err(TableError::Storage(err)) => {
1769                return Err(err);
1770            }
1771            Err(TableError::TableDoesNotExist(_)) => {
1772                return Ok(());
1773            }
1774            Err(_) => {
1775                return Err(StorageError::Corrupted(format!(
1776                    "Unable to open {table_name}"
1777                )));
1778            }
1779        };
1780
1781        if let Some(definition) = result {
1782            let table_root = match definition {
1783                InternalTableDefinition::Normal { table_root, .. } => table_root,
1784                InternalTableDefinition::Multimap { .. } => {
1785                    return Err(StorageError::Corrupted(
1786                        "unexpected multimap table type in freed tree lookup".to_string(),
1787                    ));
1788                }
1789            };
1790            let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
1791                ReadOnlyTable::new_uncompressed(
1792                    table_name.to_string(),
1793                    table_root,
1794                    PageHint::None,
1795                    Arc::new(TransactionGuard::Verification),
1796                    mem.clone(),
1797                )?;
1798            for result in table.range::<TransactionIdWithPagination>(..)? {
1799                let (_, page_list) = result?;
1800                for i in 0..page_list.value().len() {
1801                    visitor(page_list.value().get(i))?;
1802                }
1803            }
1804        }
1805
1806        Ok(())
1807    }
1808
1809    #[cfg(debug_assertions)]
1810    fn mark_allocated_page_for_debug(
1811        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
1812    ) -> Result {
1813        let data_root = mem.get_data_root();
1814        {
1815            let fake = Arc::new(TransactionGuard::Verification);
1816            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
1817            tables.visit_all_pages(|path| {
1818                mem.mark_debug_allocated_page(path.page_number());
1819                Ok(())
1820            })?;
1821        }
1822
1823        let system_root = mem.get_system_root();
1824        {
1825            let fake = Arc::new(TransactionGuard::Verification);
1826            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
1827            system_tables.visit_all_pages(|path| {
1828                mem.mark_debug_allocated_page(path.page_number());
1829                Ok(())
1830            })?;
1831        }
1832
1833        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
1834            mem.mark_debug_allocated_page(page);
1835            Ok(())
1836        })?;
1837        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
1838            mem.mark_debug_allocated_page(page);
1839            Ok(())
1840        })?;
1841
1842        Ok(())
1843    }
1844
    /// Rebuilds database consistency after an unclean shutdown.
    ///
    /// Performs up to three full scans of the file (progress reported to
    /// `repair_callback` at 0.3, 0.6, and 0.9):
    /// 1. verify primary checksums, rolling back to the secondary if needed,
    /// 2. walk the data tree, marking every reachable page allocated,
    /// 3. walk the system tree and both freed tables, marking their pages.
    ///
    /// Returns `[data_root, system_root]` for the caller to commit, or
    /// `DatabaseError::RepairAborted` if the callback requested an abort at
    /// any checkpoint.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            // A 2-phase commit is expected to always leave a valid primary;
            // corruption in that case is unrecoverable here.
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            // Re-verify after the rollback; if this still fails, both roots
            // are bad and the database cannot be repaired.
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Mark every page reachable from the data tree as allocated.
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::Verification);
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Mark every page reachable from the system tree as allocated.
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::Verification);
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Pages recorded in the pending-free tables must also be treated as
        // allocated until they are actually reclaimed.
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root])
    }
1931
    #[allow(clippy::too_many_arguments)]
    /// Constructs a [`Database`] over the given storage backend.
    ///
    /// Opens the backing storage (initializing it when `allow_initialize` is
    /// set), then either loads the persisted allocator state (fast path,
    /// only valid after a clean 2-phase-commit shutdown) or performs a full
    /// repair driven by `repair_callback`. Finally restores transaction
    /// tracker bookkeeping for persistent savepoints and retained history
    /// snapshots.
    ///
    /// Returns `DatabaseError::RepairAborted` if the callback aborts a
    /// required repair.
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
        compression: CompressionConfig,
        blob_dedup_config: BlobDedupConfig,
        memory_budget: Option<usize>,
        cdc_config: CdcConfig,
        history_retention: u64,
        read_verification: ReadVerification,
        read_verification_callback: Option<Arc<ReadVerificationCallback>>,
        blob_compaction_policy: BlobCompactionPolicy,
        observer: Arc<dyn DatabaseObserver>,
        #[cfg(feature = "metrics")] db_metrics: Arc<DbMetrics>,
    ) -> Result<Self, DatabaseError> {
        // Capture a printable identifier before `file` is moved below.
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            false,
            compression,
            memory_budget,
            read_verification,
            read_verification_callback,
        )?;
        let mut mem = Arc::new(mem);
        // If the last transaction used 2-phase commit and updated the allocator state table, then
        // we can just load the allocator state from there. Otherwise, we need a full repair
        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
            #[cfg(feature = "logging")]
            info!("Found valid allocator state, full repair not needed");
            mem.load_allocator_state(&tree)?;
            // In debug builds, cross-check the loaded state against a full walk.
            #[cfg(debug_assertions)]
            Self::mark_allocated_page_for_debug(&mut mem)?;
        } else {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            // Give the callback a chance to abort before the expensive scans.
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
            // Persist the repaired roots in a fresh commit.
            let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
            mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
            blob_dedup_config: blob_dedup_config.clone(),
            cdc_config,
            history_retention,
            blob_compaction_policy,
            observer,
            #[cfg(feature = "metrics")]
            db_metrics,
            #[cfg(feature = "std")]
            group_committer: GroupCommitter::new(),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id)?;
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // A listed savepoint that cannot be loaded indicates
                    // on-disk corruption rather than a caller error.
                    SavepointError::InvalidSavepoint => {
                        return Err(StorageError::Corrupted(
                            "invalid savepoint encountered during database initialization"
                                .to_string(),
                        )
                        .into());
                    }
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint)?;
        }
        // Restore history holds for retained snapshots
        let history_ids = txn.list_history_snapshot_ids()?;
        if history_retention > 0 {
            for id in &history_ids {
                db.transaction_tracker
                    .register_history_hold(TransactionId::new(*id))?;
            }
        }
        txn.abort()?;

        // If retention is disabled but leftover history entries exist, purge them
        // in a committed transaction so the pages can be freed.
        if history_retention == 0 && !history_ids.is_empty() {
            let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
            txn.purge_all_history_snapshots()?;
            txn.commit().map_err(|e| match e {
                CommitError::Storage(s) => DatabaseError::Storage(s),
            })?;
        }

        Ok(db)
    }
2061
2062    fn get_allocator_state_table(
2063        mem: &Arc<TransactionalMemory>,
2064    ) -> Result<Option<AllocatorStateTree>> {
2065        // The allocator state table is only valid if the primary was written using 2-phase commit
2066        if !mem.used_two_phase_commit() {
2067            return Ok(None);
2068        }
2069
2070        // See if it's present in the system table tree
2071        let system_table_tree = TableTree::new(
2072            mem.get_system_root(),
2073            PageHint::None,
2074            Arc::new(TransactionGuard::Verification),
2075            mem.clone(),
2076        )?;
2077        let Some(allocator_state_table) = system_table_tree
2078            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
2079            .map_err(|e| e.into_storage_error_or_internal("Unexpected TableError"))?
2080        else {
2081            return Ok(None);
2082        };
2083
2084        // Load the allocator state table
2085        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
2086            return Err(StorageError::Corrupted(
2087                "unexpected non-normal table type for allocator state table".to_string(),
2088            ));
2089        };
2090        let tree = Btree::new_uncompressed(
2091            table_root,
2092            PageHint::None,
2093            Arc::new(TransactionGuard::Verification),
2094            mem.clone(),
2095        )?;
2096
2097        // Make sure this isn't stale allocator state left over from a previous transaction
2098        if !mem.is_valid_allocator_state(&tree)? {
2099            return Ok(None);
2100        }
2101
2102        Ok(Some(tree))
2103    }
2104
2105    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
2106        let id = self
2107            .transaction_tracker
2108            .register_read_transaction(&self.mem)?;
2109
2110        Ok(TransactionGuard::new_read(
2111            id,
2112            self.transaction_tracker.clone(),
2113        ))
2114    }
2115
    /// Convenience method for [`Builder::new`]
    ///
    /// Equivalent to calling [`Builder::new()`] directly.
    pub fn builder() -> Builder {
        Builder::new()
    }
2120
2121    /// Begins a write transaction
2122    ///
2123    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
2124    /// write may be in progress at a time. If a write is in progress, this function will block
2125    /// until it completes.
2126    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
2127        // Fail early if there has been an I/O error -- nothing can be committed in that case
2128        self.mem.check_io_errors()?;
2129        let guard = TransactionGuard::new_write(
2130            self.transaction_tracker.start_write_transaction()?,
2131            self.transaction_tracker.clone(),
2132        );
2133        WriteTransaction::new(
2134            guard,
2135            self.transaction_tracker.clone(),
2136            self.mem.clone(),
2137            self.blob_dedup_config.clone(),
2138            self.cdc_config.clone(),
2139            self.history_retention,
2140            Arc::clone(&self.observer),
2141            #[cfg(feature = "metrics")]
2142            Arc::clone(&self.db_metrics),
2143        )
2144        .map_err(|e| e.into())
2145    }
2146
    /// Returns the observer registered with this database.
    ///
    /// This is the same [`DatabaseObserver`] instance that was stored at
    /// construction time.
    pub fn observer(&self) -> &Arc<dyn DatabaseObserver> {
        &self.observer
    }
2151
    /// Returns the database metrics counters.
    ///
    /// Only available when the `metrics` feature is enabled.
    #[cfg(feature = "metrics")]
    pub fn metrics(&self) -> &DbMetrics {
        &self.db_metrics
    }
2157
2158    /// Begin a read transaction at a specific historical transaction ID.
2159    ///
2160    /// The database must have been opened with `set_history_retention()` > 0 and the
2161    /// requested transaction must still be within the retention window.
2162    ///
2163    /// Returns a [`ReadTransaction`] that sees the user-table state as of the
2164    /// requested commit.
2165    pub fn begin_read_at(&self, transaction_id: u64) -> Result<ReadTransaction, TransactionError> {
2166        let lookup_txn = self.begin_read()?;
2167        let snapshot = lookup_txn
2168            .get_history_snapshot_ro(transaction_id)
2169            .map_err(TransactionError::Storage)?
2170            .ok_or(TransactionError::Storage(
2171                StorageError::HistorySnapshotNotFound(transaction_id),
2172            ))?;
2173        let user_root = snapshot.user_root();
2174        let guard = self.allocate_read_transaction()?;
2175        drop(lookup_txn);
2176        ReadTransaction::new_historical(
2177            self.mem.clone(),
2178            guard,
2179            user_root,
2180            Arc::clone(&self.observer),
2181            #[cfg(feature = "metrics")]
2182            Arc::clone(&self.db_metrics),
2183        )
2184    }
2185
2186    /// Begin a read transaction at the latest snapshot whose timestamp is <= the given
2187    /// epoch-millisecond value.
2188    ///
2189    /// Requires the `std` feature (timestamps are only recorded with `std`).
2190    #[cfg(feature = "std")]
2191    pub fn begin_read_at_time(
2192        &self,
2193        timestamp_ms: u64,
2194    ) -> Result<ReadTransaction, TransactionError> {
2195        let lookup_txn = self.begin_read()?;
2196        let ids = lookup_txn
2197            .list_history_snapshot_ids_ro()
2198            .map_err(TransactionError::Storage)?;
2199        let mut best: Option<Option<BtreeHeader>> = None;
2200        for id in ids {
2201            if let Some(snap) = lookup_txn
2202                .get_history_snapshot_ro(id)
2203                .map_err(TransactionError::Storage)?
2204                && snap.timestamp_ms() <= timestamp_ms
2205            {
2206                best = Some(snap.user_root());
2207            }
2208        }
2209        let best_root = best.ok_or(TransactionError::Storage(
2210            StorageError::HistorySnapshotNotFound(timestamp_ms),
2211        ))?;
2212        let guard = self.allocate_read_transaction()?;
2213        drop(lookup_txn);
2214        ReadTransaction::new_historical(
2215            self.mem.clone(),
2216            guard,
2217            best_root,
2218            Arc::clone(&self.observer),
2219            #[cfg(feature = "metrics")]
2220            Arc::clone(&self.db_metrics),
2221        )
2222    }
2223
2224    /// List all retained transaction snapshots.
2225    ///
2226    /// Returns entries ordered by transaction ID (ascending).
2227    pub fn transaction_history(&self) -> Result<Vec<TransactionInfo>, TransactionError> {
2228        let lookup_txn = self.begin_read()?;
2229        let ids = lookup_txn
2230            .list_history_snapshot_ids_ro()
2231            .map_err(TransactionError::Storage)?;
2232        let mut result = Vec::with_capacity(ids.len());
2233        for id in ids {
2234            if let Some(snap) = lookup_txn
2235                .get_history_snapshot_ro(id)
2236                .map_err(TransactionError::Storage)?
2237            {
2238                result.push(TransactionInfo {
2239                    transaction_id: id,
2240                    timestamp_ms: snap.timestamp_ms(),
2241                });
2242            }
2243        }
2244        Ok(result)
2245    }
2246
2247    /// Submit a write batch to the group commit pipeline.
2248    ///
2249    /// Multiple concurrent callers will have their batches combined into a single
2250    /// write transaction with a single fsync, amortizing the durability cost across
2251    /// all participants.
2252    ///
2253    /// The batch closure receives a `&WriteTransaction` and performs all desired
2254    /// mutations (open tables, insert, remove, etc.). Do not call `commit()` or
2255    /// `abort()` within the closure -- the group committer manages the transaction
2256    /// lifecycle.
2257    ///
2258    /// If any batch in a group fails, the entire group is rolled back. The failed
2259    /// batch receives its specific error; all others receive `GroupCommitError::PeerFailed`
2260    /// and may retry.
2261    #[cfg(feature = "std")]
2262    pub fn submit_write_batch(&self, batch: WriteBatch) -> Result<(), GroupCommitError> {
2263        let (should_lead, result_rx) = self.group_committer.enqueue(batch)?;
2264
2265        if should_lead {
2266            self.run_group_commit();
2267        }
2268
2269        result_rx.recv().unwrap_or(Err(GroupCommitError::Shutdown))
2270    }
2271
    /// Leader loop of the group-commit pipeline.
    ///
    /// The calling thread has been elected leader by `enqueue()`. It
    /// repeatedly drains pending batches, applies each group inside one
    /// write transaction, commits once per group, and reports per-batch
    /// results through the stored channels. Leadership is relinquished via
    /// `finish_leader()`, which atomically hands back any batches that
    /// arrived after the last drain so none are orphaned.
    #[cfg(feature = "std")]
    fn run_group_commit(&self) {
        // Initial drain -- the leader's own batch (and any that arrived concurrently)
        // are already in the pending queue.
        let Ok(mut batches) = self.group_committer.drain_pending() else {
            // Mutex poisoned -- relinquish leadership (best-effort).
            let _ = self.group_committer.finish_leader();
            return;
        };

        loop {
            if batches.is_empty() {
                // Nothing to process -- atomically relinquish leadership.
                // finish_leader returns any batches that arrived between our
                // last drain and now, preventing orphaned batches.
                match self.group_committer.finish_leader() {
                    Ok(remaining) if remaining.is_empty() => return,
                    Ok(remaining) => {
                        batches = remaining;
                        continue;
                    }
                    Err(_) => return,
                }
            }

            // Could not even open a transaction: fail every queued batch and
            // step down.
            let txn = match self.begin_write() {
                Ok(txn) => txn,
                Err(e) => {
                    let msg = e.into_storage_error().to_string();
                    for b in batches {
                        let _ = b.result_tx.send(Err(GroupCommitError::TransactionFailed(
                            StorageError::Corrupted(msg.clone()),
                        )));
                    }
                    let _ = self.group_committer.finish_leader();
                    return;
                }
            };

            // Senders for batches applied successfully so far; if a later
            // batch fails, these are notified with PeerFailed.
            let mut senders = Vec::with_capacity(batches.len());
            let mut failed = false;

            for pending in batches {
                if failed {
                    let _ = pending.result_tx.send(Err(GroupCommitError::PeerFailed));
                    continue;
                }

                match pending.batch.apply(&txn) {
                    Ok(()) => {
                        senders.push(pending.result_tx);
                    }
                    Err(e) => {
                        // First failure poisons the whole group: the failing
                        // batch gets its specific error, prior successes get
                        // PeerFailed.
                        failed = true;
                        let _ = pending
                            .result_tx
                            .send(Err(GroupCommitError::BatchFailed(e)));
                        for tx in senders.drain(..) {
                            let _ = tx.send(Err(GroupCommitError::PeerFailed));
                        }
                    }
                }
            }

            if failed {
                let _ = txn.abort();
                // Re-drain: new batches may have arrived while we were processing.
                let Ok(b) = self.group_committer.drain_pending() else {
                    let _ = self.group_committer.finish_leader();
                    return;
                };
                batches = b;
                continue;
            }

            match txn.commit() {
                Ok(()) => {
                    for tx in senders {
                        let _ = tx.send(Ok(()));
                    }
                }
                Err(e) => {
                    let msg = e.into_storage_error().to_string();
                    for tx in senders {
                        let _ = tx.send(Err(GroupCommitError::CommitFailed(
                            StorageError::Corrupted(msg.clone()),
                        )));
                    }
                }
            }

            // Check for batches that arrived while we were committing.
            let Ok(b) = self.group_committer.drain_pending() else {
                let _ = self.group_committer.finish_leader();
                return;
            };
            batches = b;
        }
    }
2371
2372    fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
2373        // Make a new quick-repair commit to update the allocator state table
2374        #[cfg(feature = "logging")]
2375        debug!("Writing allocator state table");
2376        let mut tx = self.begin_write()?;
2377        tx.set_quick_repair(true);
2378        tx.set_shrink_policy(ShrinkPolicy::Maximum);
2379        tx.commit()?;
2380
2381        Ok(())
2382    }
2383}
2384
2385impl Drop for Database {
2386    fn drop(&mut self) {
2387        #[cfg(feature = "std")]
2388        self.group_committer.shutdown();
2389
2390        let is_panicking = {
2391            #[cfg(feature = "std")]
2392            {
2393                std::thread::panicking()
2394            }
2395            #[cfg(not(feature = "std"))]
2396            {
2397                false
2398            }
2399        };
2400
2401        if !is_panicking && self.ensure_allocator_state_table_and_trim().is_err() {
2402            #[cfg(feature = "logging")]
2403            warn!("Failed to write allocator state table. Repair may be required at restart.");
2404        }
2405
2406        if self.mem.close().is_err() {
2407            #[cfg(feature = "logging")]
2408            warn!("Failed to flush database file. Repair may be required at restart.");
2409        }
2410    }
2411}
2412
/// Handle for an incremental online compaction.
///
/// Created by [`Database::start_compaction()`]. Each call to [`step()`](Self::step)
/// briefly acquires a write transaction, relocates a batch of pages from the highest
/// file offsets to lower ones, and commits -- allowing concurrent readers to proceed
/// between steps.
///
/// Use [`run()`](Self::run) to loop until compaction is complete.
pub struct CompactionHandle<'db> {
    /// Borrowed database handle; the compaction cannot outlive the database.
    db: &'db Database,
}
2424
impl CompactionHandle<'_> {
    /// Performs one compaction step: flushes pending frees, relocates a batch of pages,
    /// then reclaims the freed space.
    ///
    /// Returns [`CompactionProgress`] indicating how many pages were moved and whether
    /// compaction is complete.
    ///
    /// # Errors
    ///
    /// Returns [`CompactionError`] if any of the internal write transactions fail.
    pub fn step(&self) -> core::result::Result<CompactionProgress, CompactionError> {
        // Phase 1: Flush pending frees via a 2-phase commit
        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;

        // Phase 2: Relocate pages. If nothing could be relocated the file is
        // already as compact as it can get: abort the (empty) transaction and
        // report completion instead of committing.
        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        let relocated = txn.compact_pages()?;
        if relocated {
            txn.commit().map_err(|e| e.into_storage_error())?;
        } else {
            txn.abort()?;
            return Ok(CompactionProgress {
                pages_relocated: 0,
                complete: true,
            });
        }

        // Phase 3: Two more 2-phase commits to process freed pages and shrink the file.
        // The first commit reclaims relocated pages; the second handles pages freed by
        // the data-freed tree itself (which is a system table whose own pages may move).
        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.set_shrink_policy(ShrinkPolicy::Maximum);
        txn.commit().map_err(|e| e.into_storage_error())?;

        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.set_shrink_policy(ShrinkPolicy::Maximum);
        txn.commit().map_err(|e| e.into_storage_error())?;

        let progress = CompactionProgress {
            pages_relocated: 1, // at least one batch was relocated
            complete: false,
        };
        // Observer and metrics are only notified for steps that moved data;
        // the final no-op step above returns early without reporting.
        self.db.observer.on_compaction_step(&progress);
        #[cfg(feature = "metrics")]
        self.db
            .db_metrics
            .compaction_pages_relocated
            .fetch_add(1, portable_atomic::Ordering::Relaxed);
        Ok(progress)
    }

    /// Runs compaction to completion, returning the total number of steps performed.
    ///
    /// Note: the terminating step -- the one that finds nothing left to
    /// relocate and reports `complete` -- is not counted.
    pub fn run(&self) -> core::result::Result<u64, CompactionError> {
        let mut steps = 0u64;
        loop {
            let progress = self.step()?;
            if progress.complete {
                break;
            }
            steps += 1;
        }
        self.db.observer.on_compaction_complete(steps);
        Ok(steps)
    }
}
2490
/// Progress report from a blob compaction step.
///
/// Returned by [`BlobCompactionHandle::step()`].
#[derive(Debug, Clone, Copy)]
pub struct BlobCompactionProgress {
    /// Number of blobs relocated so far.
    pub blobs_relocated: u64,
    /// Total live bytes relocated so far.
    pub live_bytes: u64,
    /// Which phase just completed (1 = append, 2 = shift+truncate).
    pub phase: u8,
    /// Whether blob compaction is complete.
    pub complete: bool,
}
2503
/// Handle for online blob compaction that allows concurrent readers between
/// phases.
///
/// Created by [`Database::start_blob_compaction()`]. The compaction runs in
/// two phases:
/// 1. **Append**: Live blobs are appended after the current region end (crash-safe).
/// 2. **Shift + truncate**: Data is shifted to region start and the file is truncated.
///
/// Between phases, read transactions can proceed normally.
/// Use [`run()`](Self::run) for a simple all-at-once call.
pub struct BlobCompactionHandle<'db> {
    db: &'db Database,
    /// Blob statistics captured when the handle was created; used by `run()`
    /// to compute `bytes_reclaimed` in the final report.
    stats: BlobStats,
    /// 0 = not started, 1 = append phase done, 2 = complete.
    phase: u8,
    /// Blobs relocated during the append phase.
    blobs_relocated: u64,
    /// Live bytes relocated during the append phase.
    live_bytes: u64,
}
2521
impl BlobCompactionHandle<'_> {
    /// Performs one phase of blob compaction.
    ///
    /// Call repeatedly until `complete` is `true`. Each call acquires and
    /// releases a write transaction, allowing readers in between.
    pub fn step(&mut self) -> core::result::Result<BlobCompactionProgress, CompactionError> {
        if self.phase == 0 {
            // Phase 1: Append live blobs after current region end. Committed
            // with a 2-phase commit so a crash mid-compaction is recoverable.
            let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
            let result = txn.compact_blobs_pass(false);
            match result {
                Ok((relocated, live_size)) => {
                    txn.set_two_phase_commit(true);
                    txn.commit().map_err(|e| e.into_storage_error())?;
                    // Remember phase-1 results so later progress reports (and
                    // the final report from `run()`) can include them.
                    self.blobs_relocated = relocated;
                    self.live_bytes = live_size;
                    self.phase = 1;
                    Ok(BlobCompactionProgress {
                        blobs_relocated: relocated,
                        live_bytes: live_size,
                        phase: 1,
                        complete: false,
                    })
                }
                Err(e) => {
                    // Roll back the failed pass. NOTE(review): if the abort
                    // itself fails, its error takes precedence over `e`.
                    txn.abort().map_err(CompactionError::Storage)?;
                    Err(CompactionError::Storage(e))
                }
            }
        } else if self.phase == 1 {
            // Phase 2: Shift data to region start
            let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
            let result = txn.compact_blobs_pass(true);
            match result {
                Ok(_) => {
                    txn.set_two_phase_commit(true);
                    txn.commit().map_err(|e| e.into_storage_error())?;
                }
                Err(e) => {
                    txn.abort().map_err(CompactionError::Storage)?;
                    return Err(CompactionError::Storage(e));
                }
            }

            // Truncate the file to the end of the (now shifted) blob region.
            let blob_state = self.db.mem.get_committed_blob_state();
            let target_len = blob_state.region_offset + blob_state.region_length;
            if target_len > 0 {
                self.db
                    .mem
                    .truncate_to(target_len)
                    .map_err(CompactionError::Storage)?;
            }

            self.phase = 2;
            Ok(BlobCompactionProgress {
                blobs_relocated: self.blobs_relocated,
                live_bytes: self.live_bytes,
                phase: 2,
                complete: true,
            })
        } else {
            // Already complete: further calls are idempotent no-ops.
            Ok(BlobCompactionProgress {
                blobs_relocated: self.blobs_relocated,
                live_bytes: self.live_bytes,
                phase: 2,
                complete: true,
            })
        }
    }

    /// Runs both phases to completion, returning a compaction report.
    ///
    /// `bytes_reclaimed` is computed against the blob stats captured when the
    /// handle was created; this path always reports `was_noop: false`.
    pub fn run(&mut self) -> core::result::Result<BlobCompactionReport, CompactionError> {
        loop {
            let progress = self.step()?;
            if progress.complete {
                let bytes_reclaimed = self.stats.region_bytes.saturating_sub(self.live_bytes);
                return Ok(BlobCompactionReport {
                    blobs_relocated: self.blobs_relocated,
                    live_bytes: self.live_bytes,
                    bytes_reclaimed,
                    was_noop: false,
                });
            }
        }
    }
}
2610
/// Reports repair progress to the [`Builder`] repair callback and lets the
/// caller abort the repair.
pub struct RepairSession {
    /// Estimated fraction of the repair completed, in `[0.0, 1.0)`.
    progress: f64,
    /// Set when the caller has requested that the repair be abandoned.
    aborted: bool,
}

impl RepairSession {
    /// Creates a session reporting the given progress estimate.
    pub(crate) fn new(progress: f64) -> Self {
        Self {
            progress,
            aborted: false,
        }
    }

    /// Whether [`abort()`](Self::abort) has been called.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [`Builder::open`] or [`Builder::create`] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
2638
/// Controls inline checksum verification during B-tree reads.
///
/// shodh-redb stores XXH3-128 checksums in a merkle-tree structure -- each
/// parent branch contains the expected checksum for every child page, and
/// the root page checksum lives in `BtreeHeader`. This enum controls
/// whether (and how often) those checksums are verified on the read path.
///
/// # Modes
///
/// | Mode | Overhead | Use case |
/// |------|----------|----------|
/// | `None` | 0 | Trusted storage, maximum throughput |
/// | `Sampled { rate }` | ~rate x 5 % | Edge devices, cheap flash |
/// | `Full` | ~5 % | Safety-critical, after detected corruption |
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum ReadVerification {
    /// No verification on reads (current default behavior).
    #[default]
    None,
    /// Verify a random fraction of page reads. `rate` is clamped to `[0.0, 1.0]`.
    Sampled { rate: f32 },
    /// Verify every page read.
    Full,
}
2668
/// Action to take when a read verification failure is detected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadVerificationAction {
    /// Return a `StorageError::Corrupted` to the caller.
    ReturnError,
    /// Log/record the corruption but allow the read to proceed.
    ///
    /// NOTE(review): continuing past a failed checksum hands potentially
    /// corrupted bytes to the caller -- intended for observability use.
    Continue,
}

/// Callback signature for read verification failures.
///
/// Receives the internal page number (as `u64`) that failed checksum verification.
/// Returns the action the database should take.
pub type ReadVerificationCallback = dyn Fn(u64) -> ReadVerificationAction + Send + Sync + 'static;
2683
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Internal page size in bytes. Fixed to PAGE_SIZE except under
    // test/fuzzing (see `set_page_size`).
    page_size: usize,
    // Optional region-size override; `None` selects the default. Only
    // settable under test/fuzzing (see `set_region_size`).
    region_size: Option<u64>,
    // Bytes of memory for the read cache (see `set_cache_size`).
    read_cache_size_bytes: usize,
    // Bytes of memory for the write cache (see `set_cache_size`).
    write_cache_size_bytes: usize,
    // Page compression algorithm used when creating a new database.
    compression: CompressionConfig,
    // Invoked periodically while a database file is being repaired.
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
    // Content-addressable blob deduplication settings.
    blob_dedup_config: BlobDedupConfig,
    // Optional hard memory budget; when set, overrides the cache sizes.
    memory_budget: Option<usize>,
    // Change Data Capture configuration.
    cdc_config: CdcConfig,
    // Number of committed snapshots retained for time-travel reads (0 = off).
    history_retention: u64,
    // Checksum verification mode for the read path.
    read_verification: ReadVerification,
    // Optional callback deciding the action on a failed read verification.
    read_verification_callback: Option<Arc<ReadVerificationCallback>>,
    // Observer for lifecycle events; `None` falls back to the default observer.
    observer: Option<Arc<dyn DatabaseObserver>>,
    // Advisory policy for `Database::should_compact_blobs()`.
    blob_compaction_policy: BlobCompactionPolicy,
}
2701
2702impl Builder {
2703    /// Construct a new [Builder] with sensible defaults.
2704    ///
2705    /// ## Defaults
2706    ///
2707    /// - `cache_size_bytes`: 1GiB
2708    #[allow(clippy::new_without_default)]
2709    pub fn new() -> Self {
2710        let mut result = Self {
2711            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
2712            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
2713            // It is part of the file format, so can be enabled in the future.
2714            page_size: PAGE_SIZE,
2715            region_size: None,
2716            // TODO: Default should probably take into account the total system memory
2717            read_cache_size_bytes: 0,
2718            // TODO: Default should probably take into account the total system memory
2719            write_cache_size_bytes: 0,
2720            compression: CompressionConfig::None,
2721            repair_callback: Box::new(|_| {}),
2722            blob_dedup_config: BlobDedupConfig::default(),
2723            memory_budget: None,
2724            cdc_config: CdcConfig::default(),
2725            history_retention: 0,
2726            read_verification: ReadVerification::None,
2727            read_verification_callback: None,
2728            observer: None,
2729            blob_compaction_policy: BlobCompactionPolicy::default(),
2730        };
2731
2732        result.set_cache_size(1024 * 1024 * 1024);
2733        result
2734    }
2735
2736    /// Set a callback which will be invoked periodically in the event that the database file needs
2737    /// to be repaired.
2738    ///
2739    /// The [`RepairSession`] argument can be used to control the repair process.
2740    ///
2741    /// If the database file needs repair, the callback will be invoked at least once.
2742    /// There is no upper limit on the number of times it may be called.
2743    pub fn set_repair_callback(
2744        &mut self,
2745        callback: impl Fn(&mut RepairSession) + 'static,
2746    ) -> &mut Self {
2747        self.repair_callback = Box::new(callback);
2748        self
2749    }
2750
2751    /// Register an observer for database lifecycle events.
2752    ///
2753    /// The observer receives synchronous callbacks on the committing/reading
2754    /// thread. Implementations must not block, panic, or perform fallible I/O.
2755    ///
2756    /// See [`DatabaseObserver`] for the full list of events.
2757    pub fn set_observer(&mut self, observer: impl DatabaseObserver) -> &mut Self {
2758        self.observer = Some(Arc::new(observer));
2759        self
2760    }
2761
2762    /// Set the advisory policy used by
2763    /// [`Database::should_compact_blobs()`](crate::Database::should_compact_blobs).
2764    ///
2765    /// This only affects the advisory recommendation; the database never
2766    /// auto-compacts blobs.
2767    pub fn set_blob_compaction_policy(&mut self, policy: BlobCompactionPolicy) -> &mut Self {
2768        self.blob_compaction_policy = policy;
2769        self
2770    }
2771
2772    /// Set the internal page size of the database
2773    ///
2774    /// Valid values are powers of two, greater than or equal to 512
2775    ///
2776    /// ## Defaults
2777    ///
2778    /// Default to 4 Kib pages.
2779    #[cfg(any(fuzzing, test))]
2780    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
2781        assert!(size.is_power_of_two());
2782        self.page_size = std::cmp::max(size, 512);
2783        self
2784    }
2785
2786    /// Set the amount of memory (in bytes) used for caching data
2787    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
2788        // TODO: allow dynamic expansion of the read/write cache
2789        self.read_cache_size_bytes = bytes / 10 * 9;
2790        self.write_cache_size_bytes = bytes / 10;
2791        self
2792    }
2793
2794    /// Set a hard memory budget (in bytes) for the database.
2795    ///
2796    /// The budget is partitioned as follows:
2797    /// - 70% for the read cache
2798    /// - 20% for the write buffer
2799    /// - 10% reserved for internal overhead
2800    ///
2801    /// When the budget is set, the database will:
2802    /// - Perform cross-stripe cache eviction when memory pressure is high
2803    /// - Auto-flush the write buffer before it exceeds its partition
2804    /// - Skip caching read results when total usage exceeds the budget
2805    ///
2806    /// This overrides any prior call to [`set_cache_size`](Self::set_cache_size).
2807    ///
2808    /// # Example
2809    ///
2810    /// ```rust,ignore
2811    /// let db = Database::builder()
2812    ///     .set_memory_budget(50 * 1024 * 1024)  // 50 MiB hard cap
2813    ///     .create("constrained.redb")?;
2814    /// ```
2815    pub fn set_memory_budget(&mut self, bytes: usize) -> &mut Self {
2816        assert!(
2817            bytes >= 16384,
2818            "Memory budget must be at least 16 KiB (got {bytes} bytes). \
2819             Budgets below 4 page sizes cannot cache any data."
2820        );
2821        self.memory_budget = Some(bytes);
2822        self.read_cache_size_bytes = bytes / 100 * 70;
2823        self.write_cache_size_bytes = bytes / 100 * 20;
2824        self
2825    }
2826
2827    #[cfg(any(test, fuzzing))]
2828    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
2829        assert!(size.is_power_of_two());
2830        self.region_size = Some(size);
2831        self
2832    }
2833
2834    /// Set the page compression algorithm.
2835    ///
2836    /// When creating a new database, pages will be compressed using this algorithm
2837    /// before writing to disk. When opening an existing database, the compression
2838    /// setting stored in the database header takes precedence.
2839    ///
2840    /// Requires the corresponding feature flag (`compression_lz4` or `compression_zstd`).
2841    pub fn set_compression(&mut self, compression: CompressionConfig) -> &mut Self {
2842        self.compression = compression;
2843        self
2844    }
2845
2846    /// Enable or disable content-addressable blob deduplication using SHA-256.
2847    ///
2848    /// When enabled, identical blobs are stored once with reference counting.
2849    /// Disabled by default.
2850    pub fn set_blob_dedup(&mut self, enabled: bool) -> &mut Self {
2851        self.blob_dedup_config.enabled = enabled;
2852        self
2853    }
2854
2855    /// Set the minimum blob size (bytes) for dedup consideration.
2856    ///
2857    /// Blobs smaller than this threshold skip SHA-256 computation and are
2858    /// always stored as separate copies. Default: 4096 bytes.
2859    pub fn set_blob_dedup_min_size(&mut self, min_size: usize) -> &mut Self {
2860        self.blob_dedup_config.min_size = min_size;
2861        self
2862    }
2863
2864    /// Enable Change Data Capture (CDC) with the given configuration.
2865    ///
2866    /// When enabled, all table mutations (insert, update, delete) are recorded
2867    /// in an internal log that can be polled via
2868    /// [`ReadTransaction::read_cdc_since()`](crate::ReadTransaction::read_cdc_since).
2869    /// When disabled (the default), CDC has zero overhead.
2870    pub fn set_cdc(&mut self, config: CdcConfig) -> &mut Self {
2871        self.cdc_config = config;
2872        self
2873    }
2874
2875    /// Set the number of committed transactions to retain for time-travel reads.
2876    ///
2877    /// When greater than zero, each durable commit saves a snapshot of the user-table
2878    /// root so that past states can be read via
2879    /// [`Database::begin_read_at()`] or [`Database::begin_read_at_time()`].
2880    /// Old snapshots beyond this limit are pruned automatically.
2881    ///
2882    /// The default is `0` (disabled, zero overhead).
2883    pub fn set_history_retention(&mut self, max_snapshots: u64) -> &mut Self {
2884        self.history_retention = max_snapshots;
2885        self
2886    }
2887
2888    /// Set the read verification mode.
2889    ///
2890    /// Controls whether B-tree page checksums are verified during reads.
2891    /// See [`ReadVerification`] for details on each mode.
2892    ///
2893    /// Default: [`ReadVerification::None`].
2894    pub fn set_read_verification(&mut self, mode: ReadVerification) -> &mut Self {
2895        self.read_verification = mode;
2896        self
2897    }
2898
2899    /// Set a callback invoked when read verification detects a corrupted page.
2900    ///
2901    /// The callback receives the internal page number (as `u64`) that failed
2902    /// and returns a [`ReadVerificationAction`] controlling whether the read
2903    /// returns an error or continues.
2904    ///
2905    /// Without a callback, corrupted pages always return
2906    /// [`StorageError::Corrupted`].
2907    pub fn set_read_verification_callback(
2908        &mut self,
2909        callback: impl Fn(u64) -> ReadVerificationAction + Send + Sync + 'static,
2910    ) -> &mut Self {
2911        self.read_verification_callback = Some(Arc::new(callback));
2912        self
2913    }
2914
2915    /// Opens the specified file as a redb database.
2916    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
2917    /// * if the file is a valid redb database, it will be opened
2918    /// * otherwise this function will return an error
2919    #[cfg(feature = "std")]
2920    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
2921        let file = OpenOptions::new()
2922            .read(true)
2923            .write(true)
2924            .create(true)
2925            .truncate(false)
2926            .open(path)?;
2927
2928        Database::new(
2929            Box::new(FileBackend::new(file)?),
2930            true,
2931            self.page_size,
2932            self.region_size,
2933            self.read_cache_size_bytes,
2934            self.write_cache_size_bytes,
2935            &self.repair_callback,
2936            self.compression,
2937            self.blob_dedup_config.clone(),
2938            self.memory_budget,
2939            self.cdc_config.clone(),
2940            self.history_retention,
2941            self.read_verification,
2942            self.read_verification_callback.clone(),
2943            self.blob_compaction_policy,
2944            self.resolve_observer(),
2945            #[cfg(feature = "metrics")]
2946            Self::resolve_metrics(),
2947        )
2948    }
2949
2950    /// Opens an existing redb database.
2951    #[cfg(feature = "std")]
2952    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
2953        let file = OpenOptions::new().read(true).write(true).open(path)?;
2954
2955        Database::new(
2956            Box::new(FileBackend::new(file)?),
2957            false,
2958            self.page_size,
2959            None,
2960            self.read_cache_size_bytes,
2961            self.write_cache_size_bytes,
2962            &self.repair_callback,
2963            self.compression,
2964            self.blob_dedup_config.clone(),
2965            self.memory_budget,
2966            self.cdc_config.clone(),
2967            self.history_retention,
2968            self.read_verification,
2969            self.read_verification_callback.clone(),
2970            self.blob_compaction_policy,
2971            self.resolve_observer(),
2972            #[cfg(feature = "metrics")]
2973            Self::resolve_metrics(),
2974        )
2975    }
2976
2977    /// Opens an existing redb database.
2978    ///
2979    /// If the file has been opened for writing (i.e. as a [`Database`]) [`DatabaseError::DatabaseAlreadyOpen`]
2980    /// will be returned on platforms which support file locks (macOS, Windows, Linux). On other platforms,
2981    /// the caller MUST avoid calling this method when the database is open for writing.
2982    #[cfg(feature = "std")]
2983    pub fn open_read_only(
2984        &self,
2985        path: impl AsRef<Path>,
2986    ) -> Result<ReadOnlyDatabase, DatabaseError> {
2987        let file = OpenOptions::new().read(true).open(path)?;
2988
2989        ReadOnlyDatabase::new(
2990            Box::new(FileBackend::new_internal(file, true)?),
2991            self.page_size,
2992            None,
2993            self.read_cache_size_bytes,
2994            self.compression,
2995            self.memory_budget,
2996            self.read_verification,
2997            self.read_verification_callback.clone(),
2998        )
2999    }
3000
3001    /// Open an existing or create a new database in the given `file`.
3002    ///
3003    /// The file must be empty or contain a valid database.
3004    #[cfg(feature = "std")]
3005    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
3006        Database::new(
3007            Box::new(FileBackend::new(file)?),
3008            true,
3009            self.page_size,
3010            self.region_size,
3011            self.read_cache_size_bytes,
3012            self.write_cache_size_bytes,
3013            &self.repair_callback,
3014            self.compression,
3015            self.blob_dedup_config.clone(),
3016            self.memory_budget,
3017            self.cdc_config.clone(),
3018            self.history_retention,
3019            self.read_verification,
3020            self.read_verification_callback.clone(),
3021            self.blob_compaction_policy,
3022            self.resolve_observer(),
3023            #[cfg(feature = "metrics")]
3024            Self::resolve_metrics(),
3025        )
3026    }
3027
3028    /// Open an existing or create a new database with the given backend.
3029    pub fn create_with_backend(
3030        &self,
3031        backend: impl StorageBackend,
3032    ) -> Result<Database, DatabaseError> {
3033        Database::new(
3034            Box::new(backend),
3035            true,
3036            self.page_size,
3037            self.region_size,
3038            self.read_cache_size_bytes,
3039            self.write_cache_size_bytes,
3040            &self.repair_callback,
3041            self.compression,
3042            self.blob_dedup_config.clone(),
3043            self.memory_budget,
3044            self.cdc_config.clone(),
3045            self.history_retention,
3046            self.read_verification,
3047            self.read_verification_callback.clone(),
3048            self.blob_compaction_policy,
3049            self.resolve_observer(),
3050            #[cfg(feature = "metrics")]
3051            Self::resolve_metrics(),
3052        )
3053    }
3054
3055    fn resolve_observer(&self) -> Arc<dyn DatabaseObserver> {
3056        self.observer
3057            .as_ref()
3058            .map_or_else(default_observer, Arc::clone)
3059    }
3060
3061    #[cfg(feature = "metrics")]
3062    fn resolve_metrics() -> Arc<DbMetrics> {
3063        Arc::new(DbMetrics::new())
3064    }
3065}
3066
3067impl Debug for Database {
3068    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
3069        f.debug_struct("Database").finish()
3070    }
3071}
3072
3073#[cfg(test)]
3074mod test {
3075    use crate::backends::FileBackend;
3076    use crate::error::BackendError;
3077    use crate::{
3078        CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
3079        ReadableTableMetadata, StorageBackend, StorageError, TableDefinition, TransactionError,
3080    };
3081    use core::sync::atomic::Ordering;
3082    use portable_atomic::AtomicU64;
3083    use std::fs::File;
3084    use std::io::{ErrorKind, Read, Seek, SeekFrom};
3085    use std::sync::Arc;
3086
    /// Test backend that proxies a [`FileBackend`] and injects I/O errors:
    /// operations start failing once `countdown` reaches zero.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        /// Remaining writes before operations error out; shared (`Arc`) so a
        /// test can change it while the database holds the backend.
        countdown: Arc<AtomicU64>,
    }
3092
3093    impl FailingBackend {
3094        fn new(backend: FileBackend, countdown: u64) -> Self {
3095            Self {
3096                inner: backend,
3097                countdown: Arc::new(AtomicU64::new(countdown)),
3098            }
3099        }
3100
3101        fn check_countdown(&self) -> Result<(), BackendError> {
3102            if self.countdown.load(Ordering::SeqCst) == 0 {
3103                return Err(BackendError::from(std::io::Error::from(ErrorKind::Other)));
3104            }
3105
3106            Ok(())
3107        }
3108
3109        fn decrement_countdown(&self) -> Result<(), BackendError> {
3110            if self
3111                .countdown
3112                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
3113                    if x > 0 { Some(x - 1) } else { None }
3114                })
3115                .is_err()
3116            {
3117                return Err(BackendError::from(std::io::Error::from(ErrorKind::Other)));
3118            }
3119
3120            Ok(())
3121        }
3122    }
3123
    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, BackendError> {
            // Length queries never fail.
            self.inner.len()
        }

        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), BackendError> {
            // Reads fail once the countdown is exhausted, but do not consume it.
            self.check_countdown()?;
            self.inner.read(offset, out)
        }

        fn set_len(&self, len: u64) -> Result<(), BackendError> {
            // Resizing never fails.
            self.inner.set_len(len)
        }

        fn sync_data(&self) -> Result<(), BackendError> {
            // Syncs fail once the countdown is exhausted, but do not consume it.
            self.check_countdown()?;
            self.inner.sync_data()
        }

        fn write(&self, offset: u64, data: &[u8]) -> Result<(), BackendError> {
            // Each write consumes one countdown tick and fails at zero.
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }
3148
    /// Regression test: a commit that fails partway through its writes must
    /// not corrupt the file -- reopening the database afterwards must succeed.
    /// The specific sizes below presumably reproduce the original crash
    /// conditions; do not change them casually.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Backend allows exactly 20 writes before injecting I/O errors.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // First transaction (with both savepoint kinds) commits within the
        // write budget.
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        // Second transaction must hit the injected failure during commit.
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        let result = tx.commit();
        assert!(result.is_err());

        // Reopening (and repairing, if necessary) must succeed despite the
        // failed commit.
        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
3184
3185    #[test]
3186    fn transient_io_error() {
3187        let tmpfile = crate::create_tempfile();
3188        let (file, path) = tmpfile.into_parts();
3189
3190        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
3191        let countdown = backend.countdown.clone();
3192        let db = Database::builder()
3193            .set_cache_size(0)
3194            .create_with_backend(backend)
3195            .unwrap();
3196
3197        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
3198
3199        // Create some garbage
3200        let tx = db.begin_write().unwrap();
3201        {
3202            let mut table = tx.open_table(table_def).unwrap();
3203            table.insert(0, 0).unwrap();
3204        }
3205        tx.commit().unwrap();
3206        let tx = db.begin_write().unwrap();
3207        {
3208            let mut table = tx.open_table(table_def).unwrap();
3209            table.insert(0, 1).unwrap();
3210        }
3211        tx.commit().unwrap();
3212
3213        let tx = db.begin_write().unwrap();
3214        // Cause an error in the commit
3215        countdown.store(0, Ordering::SeqCst);
3216        let result = tx.commit().err().unwrap();
3217        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
3218        let result = db.begin_write().err().unwrap();
3219        assert!(matches!(
3220            result,
3221            TransactionError::Storage(StorageError::PreviousIo)
3222        ));
3223        // Simulate a transient error
3224        countdown.store(u64::MAX, Ordering::SeqCst);
3225        drop(db);
3226
3227        // Check that recovery flag is set, even though the error has "cleared"
3228        let mut file = File::open(&path).unwrap();
3229        file.seek(SeekFrom::Start(9)).unwrap();
3230        let mut god_byte = vec![0u8];
3231        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
3232        assert_ne!(god_byte[0] & 2, 0);
3233    }
3234
3235    #[test]
3236    fn small_pages() {
3237        let tmpfile = crate::create_tempfile();
3238
3239        let db = Database::builder()
3240            .set_page_size(512)
3241            .create(tmpfile.path())
3242            .unwrap();
3243
3244        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
3245        let txn = db.begin_write().unwrap();
3246        {
3247            txn.open_table(table_definition).unwrap();
3248        }
3249        txn.commit().unwrap();
3250    }
3251
    /// Likely fuzz-derived regression test: an intricate sequence of
    /// ephemeral-savepoint creation, restoration, and drops under two-phase
    /// commit with a 512 byte page size. The exact order of operations
    /// (which savepoint is dropped before which restore) is the point of the
    /// test — do not reorder.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Restore savepoint0 in a later transaction, then write non-durably.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        // Restore savepoint0 a second time.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Restoring a savepoint taken in the same transaction must work even
        // after an older savepoint has been dropped.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // This transaction aborts (rather than commits) after restoring
        // savepoint3; savepoint4 is taken inside it.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Restoring savepoint4 is expected to fail — presumably because it
        // was taken in a transaction that was subsequently aborted.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
3345
    /// Likely fuzz-derived regression test: savepoint creation plus aborted
    /// non-durable writes on a 1 KiB page size. The specific keys and value
    /// sizes reproduce a historical failure, so the sequence should not be
    /// reordered.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Take a savepoint, write non-durably, then abort.
        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        // Restore a savepoint taken within the same transaction, write a
        // multi-page value, then abort again.
        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
3378
    /// Likely fuzz-derived regression test: aborted transactions mixing
    /// lookups, inserts, and `insert_reserve` (including a value larger than
    /// the 1 KiB page size).
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Commit an empty table so it exists in the durable state.
        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Mix lookups with inserts (one reserved value of 3645 bytes spans
        // multiple 1 KiB pages), then roll everything back.
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        // A second aborted insert_reserve after the first rollback.
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
3415
3416    #[test]
3417    fn dynamic_shrink() {
3418        let tmpfile = crate::create_tempfile();
3419        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
3420        let big_value = vec![0u8; 1024];
3421
3422        let db = Database::builder()
3423            .set_region_size(1024 * 1024)
3424            .create(tmpfile.path())
3425            .unwrap();
3426
3427        let txn = db.begin_write().unwrap();
3428        {
3429            let mut table = txn.open_table(table_definition).unwrap();
3430            for i in 0..2048 {
3431                table.insert(&i, big_value.as_slice()).unwrap();
3432            }
3433        }
3434        txn.commit().unwrap();
3435
3436        let file_size = tmpfile.as_file().metadata().unwrap().len();
3437
3438        let txn = db.begin_write().unwrap();
3439        {
3440            let mut table = txn.open_table(table_definition).unwrap();
3441            for i in 0..2048 {
3442                table.remove(&i).unwrap();
3443            }
3444        }
3445        txn.commit().unwrap();
3446
3447        // Perform a couple more commits to be sure the database has a chance to compact
3448        let txn = db.begin_write().unwrap();
3449        {
3450            let mut table = txn.open_table(table_definition).unwrap();
3451            table.insert(0, [].as_slice()).unwrap();
3452        }
3453        txn.commit().unwrap();
3454        let txn = db.begin_write().unwrap();
3455        {
3456            let mut table = txn.open_table(table_definition).unwrap();
3457            table.remove(0).unwrap();
3458        }
3459        txn.commit().unwrap();
3460        let txn = db.begin_write().unwrap();
3461        txn.commit().unwrap();
3462
3463        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
3464        assert!(final_file_size < file_size);
3465    }
3466
3467    #[test]
3468    fn create_new_db_in_empty_file() {
3469        let tmpfile = crate::create_tempfile();
3470
3471        let _db = Database::builder()
3472            .create_file(tmpfile.into_file())
3473            .unwrap();
3474    }
3475
3476    #[test]
3477    fn open_missing_file() {
3478        let tmpfile = crate::create_tempfile();
3479
3480        let err = Database::builder()
3481            .open(tmpfile.path().with_extension("missing"))
3482            .unwrap_err();
3483
3484        match err {
3485            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
3486            err => panic!("Unexpected error for empty file: {err}"),
3487        }
3488    }
3489
3490    #[test]
3491    fn open_empty_file() {
3492        let tmpfile = crate::create_tempfile();
3493
3494        let err = Database::builder().open(tmpfile.path()).unwrap_err();
3495
3496        match err {
3497            DatabaseError::Storage(StorageError::FormatError { .. }) => {}
3498            err => panic!("Unexpected error for empty file: {err}"),
3499        }
3500    }
3501
3502    #[test]
3503    fn salvage_valid_database() {
3504        const T1: TableDefinition<&str, u64> = TableDefinition::new("users");
3505        const T2: TableDefinition<u64, &[u8]> = TableDefinition::new("blobs");
3506
3507        let src = crate::create_tempfile();
3508        let dst = crate::create_tempfile();
3509
3510        // Populate source database with two tables
3511        {
3512            let db = Database::create(src.path()).unwrap();
3513            let txn = db.begin_write().unwrap();
3514            {
3515                let mut t = txn.open_table(T1).unwrap();
3516                t.insert("alice", &1u64).unwrap();
3517                t.insert("bob", &2u64).unwrap();
3518                t.insert("charlie", &3u64).unwrap();
3519            }
3520            {
3521                let mut t = txn.open_table(T2).unwrap();
3522                t.insert(100u64, b"hello".as_slice()).unwrap();
3523                t.insert(200u64, b"world".as_slice()).unwrap();
3524            }
3525            txn.commit().unwrap();
3526        }
3527
3528        let report = Database::salvage(src.path(), dst.path()).unwrap();
3529
3530        assert_eq!(report.tables_found, 2);
3531        assert_eq!(report.tables_recovered, 2);
3532        assert!(
3533            report.rows_recovered >= 5,
3534            "expected >= 5 rows, got {rows}",
3535            rows = report.rows_recovered
3536        );
3537        assert_eq!(report.rows_lost, 0);
3538        assert!(report.corrupt_details.is_empty());
3539
3540        // Verify recovered data in the output database
3541        let db = Database::open(dst.path()).unwrap();
3542        let txn = db.begin_read().unwrap();
3543        {
3544            let raw: TableDefinition<&[u8], &[u8]> = TableDefinition::new("users");
3545            let t = txn.open_table(raw).unwrap();
3546            assert_eq!(t.len().unwrap(), 3);
3547        }
3548        {
3549            let raw: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blobs");
3550            let t = txn.open_table(raw).unwrap();
3551            assert_eq!(t.len().unwrap(), 2);
3552        }
3553    }
3554
3555    #[test]
3556    fn salvage_empty_file_returns_error() {
3557        let src = crate::create_tempfile();
3558        let dst = crate::create_tempfile();
3559
3560        // salvage on an empty/corrupted file should return an error
3561        let result = Database::salvage(src.path(), dst.path());
3562        assert!(result.is_err());
3563    }
3564
3565    #[test]
3566    fn salvage_with_data_corruption() {
3567        const TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("data");
3568
3569        let src = crate::create_tempfile();
3570        let dst = crate::create_tempfile();
3571
3572        // Write enough data to create multiple B-tree pages
3573        {
3574            let db = Database::create(src.path()).unwrap();
3575            let txn = db.begin_write().unwrap();
3576            {
3577                let mut t = txn.open_table(TABLE).unwrap();
3578                // Write many rows with large values to force B-tree splits
3579                let payload = [0xABu8; 200];
3580                for i in 0..500u64 {
3581                    t.insert(i, payload.as_slice()).unwrap();
3582                }
3583            }
3584            txn.commit().unwrap();
3585        }
3586
3587        // Corrupt some data pages in the middle of the file
3588        {
3589            use std::io::{Seek, SeekFrom, Write};
3590            let mut f = std::fs::OpenOptions::new()
3591                .write(true)
3592                .open(src.path())
3593                .unwrap();
3594            let file_len = f.metadata().unwrap().len();
3595            // Corrupt a region in the middle-third of the file (likely B-tree data pages)
3596            let corrupt_offset = file_len / 3;
3597            f.seek(SeekFrom::Start(corrupt_offset)).unwrap();
3598            f.write_all(&[0xFF; 4096]).unwrap();
3599            f.sync_all().unwrap();
3600        }
3601
3602        // Salvage should succeed (possibly with some lost rows)
3603        let report = Database::salvage(src.path(), dst.path()).unwrap();
3604        // We should recover at least some data
3605        assert!(
3606            report.rows_recovered > 0 || report.tables_found > 0,
3607            "expected some recovery, got: {report:?}"
3608        );
3609    }
3610
3611    #[test]
3612    fn online_compaction_reduces_file_size() {
3613        const TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("data");
3614
3615        let tmpfile = crate::create_tempfile();
3616        let db = Database::create(tmpfile.path()).unwrap();
3617
3618        // Write a bunch of data
3619        let payload = [0xCDu8; 512];
3620        let txn = db.begin_write().unwrap();
3621        {
3622            let mut t = txn.open_table(TABLE).unwrap();
3623            for i in 0..500u64 {
3624                t.insert(i, payload.as_slice()).unwrap();
3625            }
3626        }
3627        txn.commit().unwrap();
3628
3629        // Delete most of it to create free space
3630        let txn = db.begin_write().unwrap();
3631        {
3632            let mut t = txn.open_table(TABLE).unwrap();
3633            for i in 0..450u64 {
3634                t.remove(i).unwrap();
3635            }
3636        }
3637        txn.commit().unwrap();
3638
3639        let size_before = std::fs::metadata(tmpfile.path()).unwrap().len();
3640
3641        // Run online compaction (takes &self, not &mut self)
3642        let handle = db.start_compaction().unwrap();
3643        let steps = handle.run().unwrap();
3644        assert!(steps > 0, "expected at least one compaction step");
3645
3646        let size_after = std::fs::metadata(tmpfile.path()).unwrap().len();
3647        assert!(
3648            size_after < size_before,
3649            "file should shrink: before={size_before}, after={size_after}"
3650        );
3651
3652        // Verify remaining data is intact
3653        let txn = db.begin_read().unwrap();
3654        let t = txn.open_table(TABLE).unwrap();
3655        assert_eq!(t.len().unwrap(), 50);
3656        for i in 450..500u64 {
3657            let val = t.get(i).unwrap().unwrap();
3658            assert_eq!(val.value(), payload.as_slice());
3659        }
3660    }
3661
3662    #[test]
3663    fn online_compaction_allows_concurrent_reads() {
3664        const TABLE: TableDefinition<u64, u64> = TableDefinition::new("nums");
3665
3666        let tmpfile = crate::create_tempfile();
3667        let db = Database::create(tmpfile.path()).unwrap();
3668
3669        // Populate
3670        let txn = db.begin_write().unwrap();
3671        {
3672            let mut t = txn.open_table(TABLE).unwrap();
3673            for i in 0..100u64 {
3674                t.insert(i, &(i * 10)).unwrap();
3675            }
3676        }
3677        txn.commit().unwrap();
3678
3679        // Delete half to create reclaimable space
3680        let txn = db.begin_write().unwrap();
3681        {
3682            let mut t = txn.open_table(TABLE).unwrap();
3683            for i in 0..50u64 {
3684                t.remove(i).unwrap();
3685            }
3686        }
3687        txn.commit().unwrap();
3688
3689        // Open a read transaction BEFORE compaction
3690        let read_txn = db.begin_read().unwrap();
3691        let read_table = read_txn.open_table(TABLE).unwrap();
3692
3693        // Run one compaction step while read transaction is open
3694        let handle = db.start_compaction().unwrap();
3695        let progress = handle.step().unwrap();
3696        // May or may not have relocated pages, but should not error
3697        let _ = progress;
3698
3699        // Read transaction should still work correctly
3700        // (it sees a snapshot from before compaction started)
3701        assert_eq!(read_table.len().unwrap(), 50);
3702        for i in 50..100u64 {
3703            let val = read_table.get(i).unwrap().unwrap();
3704            assert_eq!(val.value(), i * 10);
3705        }
3706    }
3707
3708    #[test]
3709    fn online_compaction_rejects_persistent_savepoint() {
3710        const TABLE: TableDefinition<u64, u64> = TableDefinition::new("sp_test");
3711
3712        let tmpfile = crate::create_tempfile();
3713        let db = Database::create(tmpfile.path()).unwrap();
3714
3715        // Write some data first
3716        let txn = db.begin_write().unwrap();
3717        {
3718            let mut t = txn.open_table(TABLE).unwrap();
3719            t.insert(1u64, &1u64).unwrap();
3720        }
3721        txn.commit().unwrap();
3722
3723        // Create a persistent savepoint (must be done before opening any tables)
3724        let txn = db.begin_write().unwrap();
3725        let _sp = txn.persistent_savepoint().unwrap();
3726        txn.commit().unwrap();
3727
3728        // start_compaction should fail because of persistent savepoint
3729        let result = db.start_compaction();
3730        assert!(result.is_err());
3731    }
3732}