Skip to main content

redb/
db.rs

1use crate::transaction_tracker::{TransactionId, TransactionTracker};
2use crate::tree_store::{
3    BtreeHeader, InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, ReadOnlyBackend,
4    ShrinkPolicy, TableTree, TableType, TransactionalMemory,
5};
6use crate::types::{Key, Value};
7use crate::{
8    CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
9};
10use crate::{ReadTransaction, Result, WriteTransaction};
11use std::fmt::{Debug, Display, Formatter};
12
13use std::fs::{File, OpenOptions};
14use std::marker::PhantomData;
15use std::path::Path;
16use std::sync::Arc;
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22    ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
23    DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
24    TransactionIdWithPagination,
25};
26use crate::tree_store::file_backend::FileBackend;
27#[cfg(feature = "logging")]
28use log::{debug, info, warn};
29
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// All methods take `&self`, and implementors must be `'static + Debug + Send + Sync`.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// The bytes starting at `offset` are read into `out`.
    ///
    /// If `out.len()` + `offset` exceeds the length of the storage an appropriate `Error` must be returned.
    fn read(&self, offset: u64, out: &mut [u8]) -> std::result::Result<(), io::Error>;

    /// Sets the length of the storage.
    ///
    /// New positions in the storage must be initialized to zero.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    fn sync_data(&self) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage.
    ///
    /// `data` is written starting at `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;

    /// Release any resources held by the backend
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// Note: redb will not access the backend after calling this method and will call it exactly
    /// once when the [`Database`] is dropped
    fn close(&self) -> std::result::Result<(), io::Error> {
        Ok(())
    }
}
60
/// A handle that identifies a table by name.
///
/// `Sealed` is a supertrait, so every implementor must also implement `Sealed`.
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
65
/// A table handle that stores only the table's name, with no key or value types attached.
#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}

impl UntypedTableHandle {
    /// Wraps an owned table name in a handle.
    pub(crate) fn new(name: String) -> Self {
        UntypedTableHandle { name }
    }
}
76
77impl TableHandle for UntypedTableHandle {
78    fn name(&self) -> &str {
79        &self.name
80    }
81}
82
// `TableHandle` has `Sealed` as a supertrait, so the handle needs this marker impl.
impl Sealed for UntypedTableHandle {}
84
/// A handle that identifies a multimap table by name.
///
/// `Sealed` is a supertrait, so every implementor must also implement `Sealed`.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}
89
/// A multimap table handle that stores only the table's name, with no key or value types attached.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}

impl UntypedMultimapTableHandle {
    /// Wraps an owned multimap table name in a handle.
    pub(crate) fn new(name: String) -> Self {
        UntypedMultimapTableHandle { name }
    }
}
100
101impl MultimapTableHandle for UntypedMultimapTableHandle {
102    fn name(&self) -> &str {
103        &self.name
104    }
105}
106
// `MultimapTableHandle` has `Sealed` as a supertrait, so the handle needs this marker impl.
impl Sealed for UntypedMultimapTableHandle {}
108
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    // Table name; `new` asserts that it is non-empty
    name: &'a str,
    // Zero-sized markers that tie the key and value types to the definition
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
120
impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty. Because this is a `const fn`, the failed assertion is
    /// reported at compile time when the definition is built in a const context.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
136
impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    /// Returns the name this definition was constructed with.
    fn name(&self) -> &str {
        self.name
    }
}
142
// `TableHandle` has `Sealed` as a supertrait, so the definition needs this marker impl.
impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
144
// Hand-written rather than derived; presumably so that `K` and `V` themselves do not need to
// be `Clone` (a derived impl would add those bounds). The struct is `Copy`, so cloning is a
// plain copy.
impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
150
// The struct holds only a `&str` and `PhantomData` markers, so it can be copied freely.
impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
152
153impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        write!(
156            f,
157            "{}<{}, {}>",
158            self.name,
159            K::type_name().name(),
160            V::type_name().name()
161        )
162    }
163}
164
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    // Table name; `new` asserts that it is non-empty
    name: &'a str,
    // Zero-sized markers that tie the key and value types to the definition.
    // Note that the value type is bounded by `Key` here, not `Value`.
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
178
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty. Because this is a `const fn`, the failed assertion is
    /// reported at compile time when the definition is built in a const context.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
189
impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    /// Returns the name this definition was constructed with.
    fn name(&self) -> &str {
        self.name
    }
}
195
// `MultimapTableHandle` has `Sealed` as a supertrait, so the definition needs this marker impl.
impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
197
// Hand-written rather than derived; presumably so that `K` and `V` themselves do not need to
// be `Clone` (a derived impl would add those bounds). The struct is `Copy`, so cloning is a
// plain copy.
impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
203
// The struct holds only a `&str` and `PhantomData` markers, so it can be copied freely.
impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
205
206impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
207    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208        write!(
209            f,
210            "{}<{}, {}>",
211            self.name,
212            K::type_name().name(),
213            V::type_name().name()
214        )
215    }
216}
217
/// Information regarding the usage of the in-memory cache
///
/// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
#[derive(Debug)]
pub struct CacheStats {
    // Number of times data has been evicted because the cache was full
    pub(crate) evictions: u64,
    // Number of reads of unmodified data served from the cache
    pub(crate) read_hits: u64,
    // Number of reads of unmodified data that had to go to storage
    pub(crate) read_misses: u64,
    // Number of reads of transaction-modified data served from the cache
    pub(crate) write_hits: u64,
    // Number of reads of transaction-modified data that had to go to storage
    pub(crate) write_misses: u64,
    // Number of bytes in the cache
    pub(crate) used_bytes: usize,
}
230
// Read-only accessors; each one returns the corresponding field by value.
impl CacheStats {
    /// Number of times that data has been evicted, due to the cache being full
    ///
    /// To increase the cache size use [`Builder::set_cache_size`]
    pub fn evictions(&self) -> u64 {
        self.evictions
    }

    /// Number of times that unmodified data has been read from the cache
    pub fn read_hits(&self) -> u64 {
        self.read_hits
    }

    /// Number of times that unmodified data was not in the cache and was read from storage
    pub fn read_misses(&self) -> u64 {
        self.read_misses
    }

    /// Number of times that data modified in a transaction has been read from the cache
    pub fn write_hits(&self) -> u64 {
        self.write_hits
    }

    /// Number of times that data modified in a transaction was not in the cache and was read from storage
    pub fn write_misses(&self) -> u64 {
        self.write_misses
    }

    /// Number of bytes in the cache
    pub fn used_bytes(&self) -> usize {
        self.used_bytes
    }
}
264
/// Guard that notifies the [`TransactionTracker`] when a transaction ends.
///
/// On drop, a write guard calls `end_write_transaction` and a read guard calls
/// `deallocate_read_transaction`, unless the tracker or the id is `None`.
pub(crate) struct TransactionGuard {
    // `None` only for guards built by `fake()`; dropping such a guard does nothing
    transaction_tracker: Option<Arc<TransactionTracker>>,
    // `None` for `fake()` guards, and after `leak()` has taken the id
    transaction_id: Option<TransactionId>,
    // Selects which tracker method is called on drop
    write_transaction: bool,
}
270
271impl TransactionGuard {
272    pub(crate) fn new_read(
273        transaction_id: TransactionId,
274        tracker: Arc<TransactionTracker>,
275    ) -> Self {
276        Self {
277            transaction_tracker: Some(tracker),
278            transaction_id: Some(transaction_id),
279            write_transaction: false,
280        }
281    }
282
283    pub(crate) fn new_write(
284        transaction_id: TransactionId,
285        tracker: Arc<TransactionTracker>,
286    ) -> Self {
287        Self {
288            transaction_tracker: Some(tracker),
289            transaction_id: Some(transaction_id),
290            write_transaction: true,
291        }
292    }
293
294    // TODO: remove this hack
295    pub(crate) fn fake() -> Self {
296        Self {
297            transaction_tracker: None,
298            transaction_id: None,
299            write_transaction: false,
300        }
301    }
302
303    pub(crate) fn id(&self) -> TransactionId {
304        self.transaction_id.unwrap()
305    }
306
307    pub(crate) fn leak(mut self) -> TransactionId {
308        self.transaction_id.take().unwrap()
309    }
310}
311
312impl Drop for TransactionGuard {
313    fn drop(&mut self) {
314        if self.transaction_tracker.is_none() {
315            return;
316        }
317        if let Some(transaction_id) = self.transaction_id {
318            if self.write_transaction {
319                self.transaction_tracker
320                    .as_ref()
321                    .unwrap()
322                    .end_write_transaction(transaction_id);
323            } else {
324                self.transaction_tracker
325                    .as_ref()
326                    .unwrap()
327                    .deallocate_read_transaction(transaction_id);
328            }
329        }
330    }
331}
332
/// Read-side operations of a database.
///
/// Implemented in this file by both [`Database`] and [`ReadOnlyDatabase`].
pub trait ReadableDatabase {
    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    /// Information regarding the usage of the in-memory cache
    ///
    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
    fn cache_stats(&self) -> CacheStats;
}
348
/// A redb database opened in read-only mode
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
///
/// Multiple processes may open a [`ReadOnlyDatabase`], but it may not be opened concurrently
/// with a [`Database`].
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let txn = db.begin_write()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// txn.commit()?;
/// drop(db);
///
/// let db = ReadOnlyDatabase::open(filename)?;
/// let txn = db.begin_read()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     println!("{}", table.get(&0)?.unwrap().value());
/// }
/// # Ok(())
/// # }
/// ```
pub struct ReadOnlyDatabase {
    // Storage state; a clone of this `Arc` is handed to every read transaction
    mem: Arc<TransactionalMemory>,
    // Registers read transactions; a clone is handed to each transaction's guard
    transaction_tracker: Arc<TransactionTracker>,
}
393
394impl ReadableDatabase for ReadOnlyDatabase {
395    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
396        let id = self
397            .transaction_tracker
398            .register_read_transaction(&self.mem)?;
399        #[cfg(feature = "logging")]
400        debug!("Beginning read transaction id={id:?}");
401
402        let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());
403
404        ReadTransaction::new(self.mem.clone(), guard)
405    }
406
407    fn cache_stats(&self) -> CacheStats {
408        self.mem.cache_stats()
409    }
410}
411
412impl ReadOnlyDatabase {
413    /// Opens an existing redb database.
414    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
415        Builder::new().open_read_only(path)
416    }
417
418    fn new(
419        file: Box<dyn StorageBackend>,
420        page_size: usize,
421        region_size: Option<u64>,
422        read_cache_size_bytes: usize,
423    ) -> Result<Self, DatabaseError> {
424        #[cfg(feature = "logging")]
425        let file_path = format!("{:?}", &file);
426        #[cfg(feature = "logging")]
427        info!("Opening database in read-only {:?}", &file_path);
428        let mem = TransactionalMemory::new(
429            Box::new(ReadOnlyBackend::new(file)),
430            false,
431            page_size,
432            region_size,
433            read_cache_size_bytes,
434            0,
435            true,
436        )?;
437        let mem = Arc::new(mem);
438        // If the last transaction used 2-phase commit and updated the allocator state table, then
439        // we can just load the allocator state from there. Otherwise, we need a full repair
440        if let Some(tree) = Database::get_allocator_state_table(&mem)? {
441            mem.load_allocator_state(&tree)?;
442        } else {
443            #[cfg(feature = "logging")]
444            warn!(
445                "Database {:?} not shutdown cleanly. Repair required",
446                &file_path
447            );
448            return Err(DatabaseError::RepairAborted);
449        }
450
451        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
452        let db = Self {
453            mem,
454            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
455        };
456
457        Ok(db)
458    }
459}
460
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database.
///
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database.
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Storage state; clones of this `Arc` are handed out by `get_memory()`
    mem: Arc<TransactionalMemory>,
    // Tracks live transactions; consulted by `compact()` before it starts
    transaction_tracker: Arc<TransactionTracker>,
}
498
499impl ReadableDatabase for Database {
500    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
501        let guard = self.allocate_read_transaction()?;
502        #[cfg(feature = "logging")]
503        debug!("Beginning read transaction id={:?}", guard.id());
504        ReadTransaction::new(self.get_memory(), guard)
505    }
506
507    fn cache_stats(&self) -> CacheStats {
508        self.mem.cache_stats()
509    }
510}
511
512impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    ///
    /// Equivalent to calling `create` on the builder returned by `Self::builder()`.
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }
520
    /// Opens an existing redb database.
    ///
    /// Equivalent to calling `open` on the builder returned by `Self::builder()`.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }
525
526    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
527        self.mem.clone()
528    }
529
530    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
531        let table_tree = TableTree::new(
532            mem.get_data_root(),
533            PageHint::None,
534            Arc::new(TransactionGuard::fake()),
535            mem.clone(),
536        )?;
537        if !table_tree.verify_checksums()? {
538            return Ok(false);
539        }
540        let system_table_tree = TableTree::new(
541            mem.get_system_root(),
542            PageHint::None,
543            Arc::new(TransactionGuard::fake()),
544            mem.clone(),
545        )?;
546        if !system_table_tree.verify_checksums()? {
547            return Ok(false);
548        }
549
550        Ok(true)
551    }
552
    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    ///
    /// # Panics
    ///
    /// Panics if any other `Arc` reference to the database's memory is alive, because exclusive
    /// access is obtained with `Arc::get_mut(..).unwrap()`.
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Remember the allocator hash so that a change made by the repair below can be detected
        let allocator_hash = self.mem.allocator_hash();
        // NOTE(review): `begin_read` hands a clone of this `Arc` to every `ReadTransaction`;
        // confirm that no such clone can still be alive while `&mut self` is held.
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];

        // The callback is a no-op and never aborts the session, so only storage errors are
        // expected from `do_repair` here
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        // Any difference in the roots or the allocator state means the repair changed something
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        // Persist the repaired state in a new commit
        if !was_clean {
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
595
    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    ///
    /// # Errors
    ///
    /// * `CompactionError::TransactionInProgress` if a read transaction is still live
    /// * `CompactionError::PersistentSavepointExists` if a persistent savepoint exists
    /// * `CompactionError::EphemeralSavepointExists` if the tracker reports any other savepoint
    /// * any error raised while beginning, committing or aborting the internal transactions
    ///
    /// # Panics
    ///
    /// Panics if pages are still pending to be freed after the cleanup commits.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Triple commit to free up the relocated pages for reuse
            // TODO: this really shouldn't be necessary, but the data freed tree is a system table
            // and so free'ing up its pages causes more deletes from the system tree
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Sanity check: nothing should be left waiting to be freed after the commits above
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            // The cleanup commits above run even on the final pass that made no progress
            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
670
671    #[cfg_attr(not(debug_assertions), expect(dead_code))]
672    fn check_repaired_allocated_pages_table(
673        system_root: Option<BtreeHeader>,
674        mem: Arc<TransactionalMemory>,
675    ) -> Result {
676        let table_tree = TableTree::new(
677            system_root,
678            PageHint::None,
679            Arc::new(TransactionGuard::fake()),
680            mem.clone(),
681        )?;
682        if let Some(table_def) = table_tree
683            .get_table::<TransactionIdWithPagination, PageList>(
684                DATA_ALLOCATED_TABLE.name(),
685                TableType::Normal,
686            )
687            .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
688        {
689            let InternalTableDefinition::Normal { table_root, .. } = table_def else {
690                unreachable!()
691            };
692            let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
693                DATA_ALLOCATED_TABLE.name().to_string(),
694                table_root,
695                PageHint::None,
696                Arc::new(TransactionGuard::fake()),
697                mem.clone(),
698            )?;
699            for result in table.range::<TransactionIdWithPagination>(..)? {
700                let (_, pages) = result?;
701                for i in 0..pages.value().len() {
702                    assert!(mem.is_allocated(pages.value().get(i)));
703                }
704            }
705        }
706
707        Ok(())
708    }
709
710    fn visit_freed_tree<K: Key, V: Value, F>(
711        system_root: Option<BtreeHeader>,
712        table_def: SystemTableDefinition<K, V>,
713        mem: Arc<TransactionalMemory>,
714        mut visitor: F,
715    ) -> Result
716    where
717        F: FnMut(PageNumber) -> Result,
718    {
719        let fake_guard = Arc::new(TransactionGuard::fake());
720        let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
721        let table_name = table_def.name();
722        let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
723            Ok(result) => result,
724            Err(TableError::Storage(err)) => {
725                return Err(err);
726            }
727            Err(TableError::TableDoesNotExist(_)) => {
728                return Ok(());
729            }
730            Err(_) => {
731                return Err(StorageError::Corrupted(format!(
732                    "Unable to open {table_name}"
733                )));
734            }
735        };
736
737        if let Some(definition) = result {
738            let table_root = match definition {
739                InternalTableDefinition::Normal { table_root, .. } => table_root,
740                InternalTableDefinition::Multimap { .. } => unreachable!(),
741            };
742            let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
743                ReadOnlyTable::new(
744                    table_name.to_string(),
745                    table_root,
746                    PageHint::None,
747                    Arc::new(TransactionGuard::fake()),
748                    mem.clone(),
749                )?;
750            for result in table.range::<TransactionIdWithPagination>(..)? {
751                let (_, page_list) = result?;
752                for i in 0..page_list.value().len() {
753                    visitor(page_list.value().get(i))?;
754                }
755            }
756        }
757
758        Ok(())
759    }
760
761    #[cfg(debug_assertions)]
762    fn mark_allocated_page_for_debug(
763        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
764    ) -> Result {
765        let data_root = mem.get_data_root();
766        {
767            let fake = Arc::new(TransactionGuard::fake());
768            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
769            tables.visit_all_pages(|path| {
770                mem.mark_debug_allocated_page(path.page_number());
771                Ok(())
772            })?;
773        }
774
775        let system_root = mem.get_system_root();
776        {
777            let fake = Arc::new(TransactionGuard::fake());
778            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
779            system_tables.visit_all_pages(|path| {
780                mem.mark_debug_allocated_page(path.page_number());
781                Ok(())
782            })?;
783        }
784
785        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
786            mem.mark_debug_allocated_page(page);
787            Ok(())
788        })?;
789        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
790            mem.mark_debug_allocated_page(page);
791            Ok(())
792        })?;
793
794        Ok(())
795    }
796
    /// Verifies the database and rebuilds the page allocation state.
    ///
    /// Steps, in order:
    /// 1. Verify the primary's checksums. On failure, call `repair_primary_corrupted()` and
    ///    verify again. If two-phase commit was in use, the failure is treated as corruption
    ///    straight away.
    /// 2. Between `begin_repair()` and `end_repair()`, mark as allocated every page reachable
    ///    from the data root and the system root, and every page listed in `DATA_FREED_TABLE`
    ///    and `SYSTEM_FREED_TABLE`.
    ///
    /// `repair_callback` is invoked with a fresh `RepairSession` at progress 0.3 (only when
    /// the primary failed verification), 0.6 and 0.9. If the session is aborted, the repair
    /// stops with `DatabaseError::RepairAborted`.
    ///
    /// Returns `[data_root, system_root]` as read from `mem` during the repair.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Mark every page reachable from the data root as allocated
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // 0.9 because the repair takes 3 full scans and the third is done now. Only some system tables are left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Mark every page reachable from the system root as allocated
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Pages listed in the freed tables are marked as allocated as well
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root])
    }
883
884    #[allow(clippy::too_many_arguments)]
885    fn new(
886        file: Box<dyn StorageBackend>,
887        allow_initialize: bool,
888        page_size: usize,
889        region_size: Option<u64>,
890        read_cache_size_bytes: usize,
891        write_cache_size_bytes: usize,
892        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
893    ) -> Result<Self, DatabaseError> {
894        #[cfg(feature = "logging")]
895        let file_path = format!("{:?}", &file);
896        #[cfg(feature = "logging")]
897        info!("Opening database {:?}", &file_path);
898        let mem = TransactionalMemory::new(
899            file,
900            allow_initialize,
901            page_size,
902            region_size,
903            read_cache_size_bytes,
904            write_cache_size_bytes,
905            false,
906        )?;
907        let mut mem = Arc::new(mem);
908        // If the last transaction used 2-phase commit and updated the allocator state table, then
909        // we can just load the allocator state from there. Otherwise, we need a full repair
910        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
911            #[cfg(feature = "logging")]
912            info!("Found valid allocator state, full repair not needed");
913            mem.load_allocator_state(&tree)?;
914            #[cfg(debug_assertions)]
915            Self::mark_allocated_page_for_debug(&mut mem)?;
916        } else {
917            #[cfg(feature = "logging")]
918            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
919            let mut handle = RepairSession::new(0.0);
920            repair_callback(&mut handle);
921            if handle.aborted() {
922                return Err(DatabaseError::RepairAborted);
923            }
924            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
925            let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
926            mem.commit(
927                data_root,
928                system_root,
929                next_transaction_id,
930                true,
931                ShrinkPolicy::Never,
932            )?;
933        }
934
935        mem.begin_writable()?;
936        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
937
938        let db = Database {
939            mem,
940            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
941        };
942
943        // Restore the tracker state for any persistent savepoints
944        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
945        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
946            db.transaction_tracker
947                .restore_savepoint_counter_state(next_id);
948        }
949        for id in txn.list_persistent_savepoints()? {
950            let savepoint = match txn.get_persistent_savepoint(id) {
951                Ok(savepoint) => savepoint,
952                Err(err) => match err {
953                    SavepointError::InvalidSavepoint => unreachable!(),
954                    SavepointError::Storage(storage) => {
955                        return Err(storage.into());
956                    }
957                },
958            };
959            db.transaction_tracker
960                .register_persistent_savepoint(&savepoint);
961        }
962        txn.abort()?;
963
964        Ok(db)
965    }
966
967    fn get_allocator_state_table(
968        mem: &Arc<TransactionalMemory>,
969    ) -> Result<Option<AllocatorStateTree>> {
970        // The allocator state table is only valid if the primary was written using 2-phase commit
971        if !mem.used_two_phase_commit() {
972            return Ok(None);
973        }
974
975        // See if it's present in the system table tree
976        let system_table_tree = TableTree::new(
977            mem.get_system_root(),
978            PageHint::None,
979            Arc::new(TransactionGuard::fake()),
980            mem.clone(),
981        )?;
982        let Some(allocator_state_table) = system_table_tree
983            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
984            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
985        else {
986            return Ok(None);
987        };
988
989        // Load the allocator state table
990        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
991            unreachable!();
992        };
993        let tree = AllocatorStateTree::new(
994            table_root,
995            PageHint::None,
996            Arc::new(TransactionGuard::fake()),
997            mem.clone(),
998        )?;
999
1000        // Make sure this isn't stale allocator state left over from a previous transaction
1001        if !mem.is_valid_allocator_state(&tree)? {
1002            return Ok(None);
1003        }
1004
1005        Ok(Some(tree))
1006    }
1007
1008    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1009        let id = self
1010            .transaction_tracker
1011            .register_read_transaction(&self.mem)?;
1012
1013        Ok(TransactionGuard::new_read(
1014            id,
1015            self.transaction_tracker.clone(),
1016        ))
1017    }
1018
    /// Convenience method for [`Builder::new`]
    ///
    /// Returns a [`Builder`] carrying the defaults set up by [`Builder::new`].
    pub fn builder() -> Builder {
        Builder::new()
    }
1023
1024    /// Begins a write transaction
1025    ///
1026    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
1027    /// write may be in progress at a time. If a write is in progress, this function will block
1028    /// until it completes.
1029    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1030        // Fail early if there has been an I/O error -- nothing can be committed in that case
1031        self.mem.check_io_errors()?;
1032        let guard = TransactionGuard::new_write(
1033            self.transaction_tracker.start_write_transaction(),
1034            self.transaction_tracker.clone(),
1035        );
1036        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1037            .map_err(|e| e.into())
1038    }
1039
1040    fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
1041        // Make a new quick-repair commit to update the allocator state table
1042        #[cfg(feature = "logging")]
1043        debug!("Writing allocator state table");
1044        let mut tx = self.begin_write()?;
1045        tx.set_quick_repair(true);
1046        tx.set_shrink_policy(ShrinkPolicy::Maximum);
1047        tx.commit()?;
1048
1049        Ok(())
1050    }
1051}
1052
1053impl Drop for Database {
1054    fn drop(&mut self) {
1055        if !thread::panicking() && self.ensure_allocator_state_table_and_trim().is_err() {
1056            #[cfg(feature = "logging")]
1057            warn!("Failed to write allocator state table. Repair may be required at restart.");
1058        }
1059
1060        if self.mem.close().is_err() {
1061            #[cfg(feature = "logging")]
1062            warn!("Failed to flush database file. Repair may be required at restart.");
1063        }
1064    }
1065}
1066
/// Handle given to the repair callback (see [`Builder::set_repair_callback`]).
///
/// It reports how far the repair has progressed and lets the callback abort it.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    /// Creates a session reporting the given `progress`, not yet aborted.
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    /// Whether [`RepairSession::abort`] has been called on this session.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [`Builder::open`] or [`Builder::create`] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
1094
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size handed to `Database::new`; `Builder::new` initialises it to `PAGE_SIZE`
    page_size: usize,
    // Optional region size; stays `None` unless `set_region_size` is called
    region_size: Option<u64>,
    // Read cache budget in bytes, derived in `set_cache_size` as `bytes / 10 * 9`
    read_cache_size_bytes: usize,
    // Write cache budget in bytes, derived in `set_cache_size` as `bytes / 10`
    write_cache_size_bytes: usize,
    // Callback handed a `RepairSession` when the database needs repair; a no-op by default
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
1103
1104impl Builder {
1105    /// Construct a new [Builder] with sensible defaults.
1106    ///
1107    /// ## Defaults
1108    ///
1109    /// - `cache_size_bytes`: 1GiB
1110    #[allow(clippy::new_without_default)]
1111    pub fn new() -> Self {
1112        let mut result = Self {
1113            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
1114            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
1115            // It is part of the file format, so can be enabled in the future.
1116            page_size: PAGE_SIZE,
1117            region_size: None,
1118            // TODO: Default should probably take into account the total system memory
1119            read_cache_size_bytes: 0,
1120            // TODO: Default should probably take into account the total system memory
1121            write_cache_size_bytes: 0,
1122            repair_callback: Box::new(|_| {}),
1123        };
1124
1125        result.set_cache_size(1024 * 1024 * 1024);
1126        result
1127    }
1128
1129    /// Set a callback which will be invoked periodically in the event that the database file needs
1130    /// to be repaired.
1131    ///
1132    /// The [`RepairSession`] argument can be used to control the repair process.
1133    ///
1134    /// If the database file needs repair, the callback will be invoked at least once.
1135    /// There is no upper limit on the number of times it may be called.
1136    pub fn set_repair_callback(
1137        &mut self,
1138        callback: impl Fn(&mut RepairSession) + 'static,
1139    ) -> &mut Self {
1140        self.repair_callback = Box::new(callback);
1141        self
1142    }
1143
1144    /// Set the internal page size of the database
1145    ///
1146    /// Valid values are powers of two, greater than or equal to 512
1147    ///
1148    /// ## Defaults
1149    ///
1150    /// Default to 4 Kib pages.
1151    #[cfg(any(fuzzing, test))]
1152    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1153        assert!(size.is_power_of_two());
1154        self.page_size = std::cmp::max(size, 512);
1155        self
1156    }
1157
1158    /// Set the amount of memory (in bytes) used for caching data
1159    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1160        // TODO: allow dynamic expansion of the read/write cache
1161        self.read_cache_size_bytes = bytes / 10 * 9;
1162        self.write_cache_size_bytes = bytes / 10;
1163        self
1164    }
1165
    /// Set the region size passed to `Database::new` by the `create*` methods.
    ///
    /// Only compiled for tests and fuzzing. Panics if `size` is not a power of two.
    #[cfg(any(test, fuzzing))]
    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
        assert!(size.is_power_of_two());
        self.region_size = Some(size);
        self
    }
1172
1173    /// Opens the specified file as a redb database.
1174    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
1175    /// * if the file is a valid redb database, it will be opened
1176    /// * otherwise this function will return an error
1177    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1178        let file = OpenOptions::new()
1179            .read(true)
1180            .write(true)
1181            .create(true)
1182            .truncate(false)
1183            .open(path)?;
1184
1185        Database::new(
1186            Box::new(FileBackend::new(file)?),
1187            true,
1188            self.page_size,
1189            self.region_size,
1190            self.read_cache_size_bytes,
1191            self.write_cache_size_bytes,
1192            &self.repair_callback,
1193        )
1194    }
1195
1196    /// Opens an existing redb database.
1197    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1198        let file = OpenOptions::new().read(true).write(true).open(path)?;
1199
1200        Database::new(
1201            Box::new(FileBackend::new(file)?),
1202            false,
1203            self.page_size,
1204            None,
1205            self.read_cache_size_bytes,
1206            self.write_cache_size_bytes,
1207            &self.repair_callback,
1208        )
1209    }
1210
1211    /// Opens an existing redb database.
1212    ///
1213    /// If the file has been opened for writing (i.e. as a [`Database`]) [`DatabaseError::DatabaseAlreadyOpen`]
1214    /// will be returned on platforms which support file locks (macOS, Windows, Linux). On other platforms,
1215    /// the caller MUST avoid calling this method when the database is open for writing.
1216    pub fn open_read_only(
1217        &self,
1218        path: impl AsRef<Path>,
1219    ) -> Result<ReadOnlyDatabase, DatabaseError> {
1220        let file = OpenOptions::new().read(true).open(path)?;
1221
1222        ReadOnlyDatabase::new(
1223            Box::new(FileBackend::new_internal(file, true)?),
1224            self.page_size,
1225            None,
1226            self.read_cache_size_bytes,
1227        )
1228    }
1229
1230    /// Open an existing or create a new database in the given `file`.
1231    ///
1232    /// The file must be empty or contain a valid database.
1233    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1234        Database::new(
1235            Box::new(FileBackend::new(file)?),
1236            true,
1237            self.page_size,
1238            self.region_size,
1239            self.read_cache_size_bytes,
1240            self.write_cache_size_bytes,
1241            &self.repair_callback,
1242        )
1243    }
1244
1245    /// Open an existing or create a new database with the given backend.
1246    pub fn create_with_backend(
1247        &self,
1248        backend: impl StorageBackend,
1249    ) -> Result<Database, DatabaseError> {
1250        Database::new(
1251            Box::new(backend),
1252            true,
1253            self.page_size,
1254            self.region_size,
1255            self.read_cache_size_bytes,
1256            self.write_cache_size_bytes,
1257            &self.repair_callback,
1258        )
1259    }
1260}
1261
1262impl std::fmt::Debug for Database {
1263    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1264        f.debug_struct("Database").finish()
1265    }
1266}
1267
1268#[cfg(test)]
1269mod test {
1270    use crate::backends::FileBackend;
1271    use crate::{
1272        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1273        StorageError, TableDefinition, TransactionError,
1274    };
1275    use std::fs::File;
1276    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1277    use std::sync::Arc;
1278    use std::sync::atomic::{AtomicU64, Ordering};
1279
    // Test backend that forwards to a real `FileBackend` but returns I/O errors once its
    // countdown has reached zero.
    #[derive(Debug)]
    struct FailingBackend {
        // Backend that performs the actual I/O
        inner: FileBackend,
        // Number of writes still allowed to succeed; shared so a test can change it after the
        // backend has been handed to the database
        countdown: Arc<AtomicU64>,
    }
1285
1286    impl FailingBackend {
1287        fn new(backend: FileBackend, countdown: u64) -> Self {
1288            Self {
1289                inner: backend,
1290                countdown: Arc::new(AtomicU64::new(countdown)),
1291            }
1292        }
1293
1294        fn check_countdown(&self) -> Result<(), std::io::Error> {
1295            if self.countdown.load(Ordering::SeqCst) == 0 {
1296                return Err(std::io::Error::from(ErrorKind::Other));
1297            }
1298
1299            Ok(())
1300        }
1301
1302        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1303            if self
1304                .countdown
1305                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1306                    if x > 0 { Some(x - 1) } else { None }
1307                })
1308                .is_err()
1309            {
1310                return Err(std::io::Error::from(ErrorKind::Other));
1311            }
1312
1313            Ok(())
1314        }
1315    }
1316
1317    impl StorageBackend for FailingBackend {
1318        fn len(&self) -> Result<u64, std::io::Error> {
1319            self.inner.len()
1320        }
1321
1322        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), std::io::Error> {
1323            self.check_countdown()?;
1324            self.inner.read(offset, out)
1325        }
1326
1327        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1328            self.inner.set_len(len)
1329        }
1330
1331        fn sync_data(&self) -> Result<(), std::io::Error> {
1332            self.check_countdown()?;
1333            self.inner.sync_data()
1334        }
1335
1336        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1337            self.decrement_countdown()?;
1338            self.inner.write(offset, data)
1339        }
1340    }
1341
    // Regression test: after a commit fails on an injected I/O error, the file must still be
    // openable with a regular file backend.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // The backend allows 20 writes before it starts returning errors
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // First transaction only creates savepoints and is expected to commit
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        // Second transaction writes data; its commit is expected to fail
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        let result = tx.commit();
        assert!(result.is_err());

        // Reopen the same path without the failing backend; this must succeed
        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1377
    // Checks that an I/O error during commit blocks further write transactions and leaves the
    // recovery flag set in the file, even if the backend works again before the database is dropped.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Start with an effectively unlimited countdown; keep a handle to change it later
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Create some garbage
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Cause an error in the commit
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // After the failed commit, starting another write transaction must be refused
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Simulate a transient error
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Check that recovery flag is set, even though the error has "cleared"
        // (the flag is read as bit 2 of the byte at file offset 9)
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1427
1428    #[test]
1429    fn small_pages() {
1430        let tmpfile = crate::create_tempfile();
1431
1432        let db = Database::builder()
1433            .set_page_size(512)
1434            .create(tmpfile.path())
1435            .unwrap();
1436
1437        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1438        let txn = db.begin_write().unwrap();
1439        {
1440            txn.open_table(table_definition).unwrap();
1441        }
1442        txn.commit().unwrap();
1443    }
1444
    // Regression test on 512-byte pages: a fixed sequence of two-phase-commit transactions that
    // create, restore and drop ephemeral savepoints. The exact order of operations matters.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Transaction 1: take savepoint0 and create the table
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Transaction 2: take savepoint1, roll back to savepoint0, then write with Durability::None
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        // Transaction 3: roll back to savepoint0 again
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Transaction 4: take savepoint2, drop savepoint0, restore savepoint2; table is empty
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        // Transaction 5: take savepoint3, drop savepoint1, restore savepoint3
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Transaction 6 (aborted): take savepoint4, drop savepoint2, restore savepoint3
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Transaction 7: restoring savepoint4 must fail (it was taken in the transaction that
        // was aborted above -- presumably the reason it is no longer valid)
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Transaction 8: restore savepoint5 and commit with Durability::None
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1538
    // Regression test on 1024-byte pages: two aborted transactions that each take an ephemeral
    // savepoint and insert a value (306 bytes, then 2008 bytes) with Durability::None.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // First transaction: the savepoint is kept alive until the end of the test
        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        // Second transaction: restores the savepoint it has just taken, then inserts a value
        // larger than one page
        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1571
    // Regression test on 1024-byte pages with a 1 MiB cache: values larger than a page are
    // written in transactions that are then aborted.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Create the table and commit so later transactions start from an existing table
        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Mix lookups of absent keys with two multi-page values, then abort
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        // One more reserved insert in a fresh transaction, aborted as well
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1608
1609    #[test]
1610    fn dynamic_shrink() {
1611        let tmpfile = crate::create_tempfile();
1612        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1613        let big_value = vec![0u8; 1024];
1614
1615        let db = Database::builder()
1616            .set_region_size(1024 * 1024)
1617            .create(tmpfile.path())
1618            .unwrap();
1619
1620        let txn = db.begin_write().unwrap();
1621        {
1622            let mut table = txn.open_table(table_definition).unwrap();
1623            for i in 0..2048 {
1624                table.insert(&i, big_value.as_slice()).unwrap();
1625            }
1626        }
1627        txn.commit().unwrap();
1628
1629        let file_size = tmpfile.as_file().metadata().unwrap().len();
1630
1631        let txn = db.begin_write().unwrap();
1632        {
1633            let mut table = txn.open_table(table_definition).unwrap();
1634            for i in 0..2048 {
1635                table.remove(&i).unwrap();
1636            }
1637        }
1638        txn.commit().unwrap();
1639
1640        // Perform a couple more commits to be sure the database has a chance to compact
1641        let txn = db.begin_write().unwrap();
1642        {
1643            let mut table = txn.open_table(table_definition).unwrap();
1644            table.insert(0, [].as_slice()).unwrap();
1645        }
1646        txn.commit().unwrap();
1647        let txn = db.begin_write().unwrap();
1648        {
1649            let mut table = txn.open_table(table_definition).unwrap();
1650            table.remove(0).unwrap();
1651        }
1652        txn.commit().unwrap();
1653        let txn = db.begin_write().unwrap();
1654        txn.commit().unwrap();
1655
1656        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1657        assert!(final_file_size < file_size);
1658    }
1659
1660    #[test]
1661    fn create_new_db_in_empty_file() {
1662        let tmpfile = crate::create_tempfile();
1663
1664        let _db = Database::builder()
1665            .create_file(tmpfile.into_file())
1666            .unwrap();
1667    }
1668
1669    #[test]
1670    fn open_missing_file() {
1671        let tmpfile = crate::create_tempfile();
1672
1673        let err = Database::builder()
1674            .open(tmpfile.path().with_extension("missing"))
1675            .unwrap_err();
1676
1677        match err {
1678            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1679            err => panic!("Unexpected error for empty file: {err}"),
1680        }
1681    }
1682
1683    #[test]
1684    fn open_empty_file() {
1685        let tmpfile = crate::create_tempfile();
1686
1687        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1688
1689        match err {
1690            DatabaseError::Storage(StorageError::Io(err))
1691                if err.kind() == ErrorKind::InvalidData => {}
1692            err => panic!("Unexpected error for empty file: {err}"),
1693        }
1694    }
1695}