manifold/
db.rs

1use crate::transaction_tracker::{TransactionId, TransactionTracker};
2use crate::tree_store::{
3    BtreeHeader, InternalTableDefinition, PageHint, PageNumber, ReadOnlyBackend, ShrinkPolicy,
4    TableTree, TableType, TransactionalMemory, PAGE_SIZE,
5};
6use crate::types::{Key, Value};
7use crate::{
8    CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
9};
10use crate::{ReadTransaction, Result, WriteTransaction};
11use std::fmt::{Debug, Display, Formatter};
12
13use std::fs::{File, OpenOptions};
14use std::marker::PhantomData;
15use std::path::Path;
16use std::sync::Arc;
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22    AllocatorStateKey, AllocatorStateTree, PageList, SystemTableDefinition,
23    TransactionIdWithPagination, ALLOCATOR_STATE_TABLE_NAME, DATA_ALLOCATED_TABLE,
24    DATA_FREED_TABLE, SYSTEM_FREED_TABLE,
25};
26use crate::tree_store::file_backend::FileBackend;
27#[cfg(feature = "logging")]
28use log::{debug, info, warn};
29
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// The `Send + Sync` bounds mean an implementation may be invoked from multiple
/// threads concurrently; all methods therefore take `&self`.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `out.len()` + `offset` exceeds the length of the storage an appropriate `Error` must be returned.
    fn read(&self, offset: u64, out: &mut [u8]) -> std::result::Result<(), io::Error>;

    /// Sets the length of the storage.
    ///
    /// New positions in the storage must be initialized to zero.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    fn sync_data(&self) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;

    /// Release any resources held by the backend
    ///
    /// Note: redb will not access the backend after calling this method and will call it exactly
    /// once when the [`Database`] is dropped
    ///
    /// The default implementation is a no-op that always succeeds.
    fn close(&self) -> std::result::Result<(), io::Error> {
        Ok(())
    }
}
60
/// Identifies a (non-multimap) table by name.
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}

/// A [`TableHandle`] that carries only the table's name, with no key/value type parameters.
#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}

impl UntypedTableHandle {
    // Crate-internal constructor: wraps an owned table name.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedTableHandle {}
84
/// Identifies a multimap table by name.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}

/// A [`MultimapTableHandle`] that carries only the table's name, with no key/value type parameters.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}

impl UntypedMultimapTableHandle {
    // Crate-internal constructor: wraps an owned table name.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedMultimapTableHandle {}
108
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers binding the key/value types to the definition without storing them
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
120
impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty (at compile time when invoked in a `const` context).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
136
impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}

// Clone/Copy are implemented manually because deriving them would add
// unnecessary `K: Clone`/`K: Copy` bounds; only the `name` reference is copied.
impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
152
153impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        write!(
156            f,
157            "{}<{}, {}>",
158            self.name,
159            K::type_name().name(),
160            V::type_name().name()
161        )
162    }
163}
164
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    // Zero-sized markers binding the key/value types to the definition without storing them
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
178
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty (at compile time when invoked in a `const` context).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
189
impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}

// Clone/Copy are implemented manually because deriving them would add
// unnecessary `K: Clone`/`K: Copy` bounds; only the `name` reference is copied.
impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
205
206impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
207    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208        write!(
209            f,
210            "{}<{}, {}>",
211            self.name,
212            K::type_name().name(),
213            V::type_name().name()
214        )
215    }
216}
217
/// Information regarding the usage of the in-memory cache
///
/// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
#[derive(Debug)]
pub struct CacheStats {
    // Count of evictions caused by the cache being full
    pub(crate) evictions: u64,
    // Cache hits/misses for reads of unmodified data
    pub(crate) read_hits: u64,
    pub(crate) read_misses: u64,
    // Cache hits/misses for reads of data modified in a transaction
    pub(crate) write_hits: u64,
    pub(crate) write_misses: u64,
    // Bytes currently held by the cache
    pub(crate) used_bytes: usize,
}
230
// Read-only accessors over the counters collected by the page cache.
impl CacheStats {
    /// Number of times that data has been evicted, due to the cache being full
    ///
    /// To increase the cache size use [`Builder::set_cache_size`]
    pub fn evictions(&self) -> u64 {
        self.evictions
    }

    /// Number of times that unmodified data has been read from the cache
    pub fn read_hits(&self) -> u64 {
        self.read_hits
    }

    /// Number of times that unmodified data was not in the cache and was read from storage
    pub fn read_misses(&self) -> u64 {
        self.read_misses
    }

    /// Number of times that data modified in a transaction has been read from the cache
    pub fn write_hits(&self) -> u64 {
        self.write_hits
    }

    /// Number of times that data modified in a transaction was not in the cache and was read from storage
    pub fn write_misses(&self) -> u64 {
        self.write_misses
    }

    /// Number of bytes in the cache
    pub fn used_bytes(&self) -> usize {
        self.used_bytes
    }
}
264
// RAII guard for a transaction's registration with the `TransactionTracker`:
// its `Drop` impl notifies the tracker that the transaction ended.
pub(crate) struct TransactionGuard {
    // `None` only for guards created via `fake()`; such guards do nothing on drop
    transaction_tracker: Option<Arc<TransactionTracker>>,
    // `None` after `leak()` has taken the id, or for `fake()` guards
    transaction_id: Option<TransactionId>,
    // Selects which tracker method `drop` calls (end write vs deallocate read)
    write_transaction: bool,
}
270
impl TransactionGuard {
    /// Guard for a read transaction: on drop, deallocates the read transaction
    /// with the tracker.
    pub(crate) fn new_read(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: false,
        }
    }

    /// Guard for a write transaction: on drop, ends the write transaction
    /// with the tracker.
    pub(crate) fn new_write(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self {
            transaction_tracker: Some(tracker),
            transaction_id: Some(transaction_id),
            write_transaction: true,
        }
    }

    // TODO: remove this hack
    /// A guard with no tracker and no id; dropping it is a no-op.
    /// Calling `id()` or `leak()` on a fake guard panics.
    pub(crate) fn fake() -> Self {
        Self {
            transaction_tracker: None,
            transaction_id: None,
            write_transaction: false,
        }
    }

    /// The transaction id this guard protects.
    ///
    /// Panics for `fake()` guards (no id was ever set).
    pub(crate) fn id(&self) -> TransactionId {
        self.transaction_id.unwrap()
    }

    /// Consumes the guard and returns the id WITHOUT notifying the tracker:
    /// taking the id out of the Option makes the subsequent `drop` a no-op.
    pub(crate) fn leak(mut self) -> TransactionId {
        self.transaction_id.take().unwrap()
    }
}
311
312impl Drop for TransactionGuard {
313    fn drop(&mut self) {
314        if self.transaction_tracker.is_none() {
315            return;
316        }
317        if let Some(transaction_id) = self.transaction_id {
318            if self.write_transaction {
319                self.transaction_tracker
320                    .as_ref()
321                    .unwrap()
322                    .end_write_transaction(transaction_id);
323            } else {
324                self.transaction_tracker
325                    .as_ref()
326                    .unwrap()
327                    .deallocate_read_transaction(transaction_id);
328            }
329        }
330    }
331}
332
/// Read-side operations shared by [`Database`] and [`ReadOnlyDatabase`].
pub trait ReadableDatabase {
    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    /// Information regarding the usage of the in-memory cache
    ///
    /// Note: these metrics are only collected when the "`cache_metrics`" feature is enabled
    fn cache_stats(&self) -> CacheStats;
}
348
/// A redb database opened in read-only mode
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
///
/// Multiple processes may open a [`ReadOnlyDatabase`], but it may not be opened concurrently
/// with a [`Database`].
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use manifold::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let txn = db.begin_write()?;
/// {
///     let mut table = txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// txn.commit()?;
/// drop(db);
///
/// let db = ReadOnlyDatabase::open(filename)?;
/// let txn = db.begin_read()?;
/// {
///     let table = txn.open_table(TABLE)?;
///     println!("{}", table.get(&0)?.unwrap().value());
/// }
/// # Ok(())
/// # }
/// ```
pub struct ReadOnlyDatabase {
    // Shared page store / allocator, opened through a read-only backend
    mem: Arc<TransactionalMemory>,
    // Tracks live read transactions for this handle
    transaction_tracker: Arc<TransactionTracker>,
}
393
impl ReadableDatabase for ReadOnlyDatabase {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        // Register the read transaction so its snapshot stays pinned
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={id:?}");

        // The guard deallocates the read transaction with the tracker on drop
        let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());

        ReadTransaction::new(self.mem.clone(), guard)
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}
411
impl ReadOnlyDatabase {
    /// Opens an existing redb database.
    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
        Builder::new().open_read_only(path)
    }

    // Constructs the database over an arbitrary backend, wrapped in `ReadOnlyBackend`
    // so no writes can reach the underlying storage.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database in read-only {:?}", &file_path);
        // NOTE(review): the trailing `0, true` appear to be a zero write-cache size and a
        // read-only flag — confirm against TransactionalMemory::new's signature
        let mem = TransactionalMemory::new(
            Box::new(ReadOnlyBackend::new(file)),
            false,
            page_size,
            region_size,
            read_cache_size_bytes,
            0,
            true,
        )?;
        let mem = Arc::new(mem);
        // If the last transaction used 2-phase commit and updated the allocator state table, then
        // we can just load the allocator state from there. Otherwise, we need a full repair
        if let Some(tree) = Database::get_allocator_state_table(&mem)? {
            mem.load_allocator_state(&tree)?;
        } else {
            // A read-only handle cannot run a repair, so an unclean shutdown is fatal here
            #[cfg(feature = "logging")]
            warn!(
                "Database {:?} not shutdown cleanly. Repair required",
                &file_path
            );
            return Err(DatabaseError::RepairAborted);
        }

        // Seed the tracker just past the last committed transaction
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
        let db = Self {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        Ok(db)
    }
}
460
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use manifold::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # #[cfg(not(target_os = "wasi"))]
/// # let tmpfile = NamedTempFile::new().unwrap();
/// # #[cfg(target_os = "wasi")]
/// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Shared page store / allocator for this database file
    mem: Arc<TransactionalMemory>,
    // Tracks live read/write transactions and savepoints
    transaction_tracker: Arc<TransactionTracker>,
}
498
impl ReadableDatabase for Database {
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        // The guard deallocates the read transaction with the tracker on drop
        let guard = self.allocate_read_transaction()?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={:?}", guard.id());
        ReadTransaction::new(self.get_memory(), guard)
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}
511
512impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    ///
    /// Equivalent to creating with a default [`Self::builder`].
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }
520
    /// Opens an existing redb database.
    ///
    /// Equivalent to opening with a default [`Self::builder`].
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }
525
    /// Returns a new shared handle to the database's `TransactionalMemory`.
    pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
        self.mem.clone()
    }
529
530    pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
531        let table_tree = TableTree::new(
532            mem.get_data_root(),
533            PageHint::None,
534            Arc::new(TransactionGuard::fake()),
535            mem.clone(),
536        )?;
537        if !table_tree.verify_checksums()? {
538            return Ok(false);
539        }
540        let system_table_tree = TableTree::new(
541            mem.get_system_root(),
542            PageHint::None,
543            Arc::new(TransactionGuard::fake()),
544            mem.clone(),
545        )?;
546        if !system_table_tree.verify_checksums()? {
547            return Ok(false);
548        }
549
550        Ok(true)
551    }
552
    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator state hash so we can detect below whether repair changed it
        let allocator_hash = self.mem.allocator_hash();
        // `unwrap` assumes no other Arc clones of `mem` are live (i.e. no open transactions);
        // it panics otherwise
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .unwrap()
            .clear_cache_and_reload()?;

        let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];

        // With a no-op callback, do_repair can never return RepairAborted,
        // so only Storage errors are reachable here
        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;

        // Any change to the roots or allocator state means the file was not clean
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        // Persist the repaired state with a full (non-quick-repair) commit
        if !was_clean {
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
            let [data_root, system_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
595
    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    ///
    /// # Errors
    ///
    /// Fails with `TransactionInProgress` if a read transaction is live, and with
    /// `PersistentSavepointExists`/`EphemeralSavepointExists` if any savepoint exists.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Live readers pin old snapshots, which would prevent freeing relocated pages
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter.
        // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user
        // can cancel the compaction without requiring a full repair afterwards
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            // Relocate pages toward the start of the file; commit only if something moved
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Triple commit to free up the relocated pages for reuse
            // TODO: this really shouldn't be necessary, but the data freed tree is a system table
            // and so free'ing up its pages causes more deletes from the system tree
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            // Also shrink the database file by the maximum amount
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // Sanity check: after the commits above no free pages should remain pending
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
670
    /// Debug-build sanity check: asserts that every page recorded in the
    /// data-allocated system table is actually allocated in `mem`.
    ///
    /// Only called under `cfg(debug_assertions)` (see `do_repair`), hence the
    /// `expect(dead_code)` for release builds.
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    fn check_repaired_allocated_pages_table(
        system_root: Option<BtreeHeader>,
        mem: Arc<TransactionalMemory>,
    ) -> Result {
        let table_tree = TableTree::new(
            system_root,
            PageHint::None,
            Arc::new(TransactionGuard::fake()),
            mem.clone(),
        )?;
        if let Some(table_def) = table_tree
            .get_table::<TransactionIdWithPagination, PageList>(
                DATA_ALLOCATED_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
        {
            // The allocated-pages table is always a normal (non-multimap) table
            let InternalTableDefinition::Normal { table_root, .. } = table_def else {
                unreachable!()
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
                DATA_ALLOCATED_TABLE.name().to_string(),
                table_root,
                PageHint::None,
                Arc::new(TransactionGuard::fake()),
                mem.clone(),
            )?;
            // Walk every page list and verify each referenced page is allocated
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, pages) = result?;
                for i in 0..pages.value().len() {
                    assert!(mem.is_allocated(pages.value().get(i)));
                }
            }
        }

        Ok(())
    }
709
    /// Invokes `visitor` on every page number stored in the given freed-pages
    /// system table.
    ///
    /// A missing table is treated as empty (returns `Ok(())`). The generic
    /// `K`/`V` are only used for the table lookup; rows are always decoded as
    /// `TransactionIdWithPagination` -> `PageList`.
    fn visit_freed_tree<K: Key, V: Value, F>(
        system_root: Option<BtreeHeader>,
        table_def: SystemTableDefinition<K, V>,
        mem: Arc<TransactionalMemory>,
        mut visitor: F,
    ) -> Result
    where
        F: FnMut(PageNumber) -> Result,
    {
        let fake_guard = Arc::new(TransactionGuard::fake());
        let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
        let table_name = table_def.name();
        // Map table-open errors: storage errors propagate, a missing table is fine,
        // anything else means the system tree is corrupted
        let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
            Ok(result) => result,
            Err(TableError::Storage(err)) => {
                return Err(err);
            }
            Err(TableError::TableDoesNotExist(_)) => {
                return Ok(());
            }
            Err(_) => {
                return Err(StorageError::Corrupted(format!(
                    "Unable to open {table_name}"
                )));
            }
        };

        if let Some(definition) = result {
            // Freed tables are always normal (non-multimap) tables
            let table_root = match definition {
                InternalTableDefinition::Normal { table_root, .. } => table_root,
                InternalTableDefinition::Multimap { .. } => unreachable!(),
            };
            let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
                ReadOnlyTable::new(
                    table_name.to_string(),
                    table_root,
                    PageHint::None,
                    Arc::new(TransactionGuard::fake()),
                    mem.clone(),
                )?;
            // Feed every page in every page list to the visitor
            for result in table.range::<TransactionIdWithPagination>(..)? {
                let (_, page_list) = result?;
                for i in 0..page_list.value().len() {
                    visitor(page_list.value().get(i))?;
                }
            }
        }

        Ok(())
    }
760
    /// Debug-only traversal that marks every reachable page (data tree, system
    /// tree, and both freed-pages tables) via `mark_debug_allocated_page`,
    /// logging each step to stderr.
    #[cfg(debug_assertions)]
    fn mark_allocated_page_for_debug(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
    ) -> Result {
        let data_root = mem.get_data_root();
        {
            eprintln!("[MARK_DEBUG] Visiting data tree, root: {data_root:?}");
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                eprintln!(
                    "[MARK_DEBUG] Data tree: marking page {:?}",
                    path.page_number()
                );
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        let system_root = mem.get_system_root();
        {
            eprintln!("[MARK_DEBUG] Visiting system tree, root: {system_root:?}");
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                eprintln!(
                    "[MARK_DEBUG] System tree: marking page {:?}",
                    path.page_number()
                );
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        // Pages referenced by the freed tables are still allocated until reclaimed,
        // so they must be marked as well
        eprintln!("[MARK_DEBUG] Visiting DATA_FREED_TABLE");
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            eprintln!("[MARK_DEBUG] DATA_FREED_TABLE: marking page {page:?}");
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;
        eprintln!("[MARK_DEBUG] Visiting SYSTEM_FREED_TABLE");
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            eprintln!("[MARK_DEBUG] SYSTEM_FREED_TABLE: marking page {page:?}");
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;

        Ok(())
    }
810
    /// Verifies the primary commit slot and rebuilds the page allocator state
    /// by walking every reachable tree.
    ///
    /// Progress is reported to `repair_callback` at 0.3/0.6/0.9 (three full
    /// scans); the callback may abort the repair. Returns the
    /// `[data_root, system_root]` headers that were walked.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, // Only &mut to ensure exclusivity
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            // 2-phase commit guarantees a valid primary; a mismatch here is unrecoverable
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Re-mark every page reachable from the data tree as allocated
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Re-mark every page reachable from the system tree as allocated
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Pages in the freed tables are still allocated until reclaimed, so mark them too
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        Ok([data_root, system_root])
    }
897
    /// Constructs a `Database` on top of the given storage backend.
    ///
    /// If the previous shutdown was clean (a valid allocator state table was
    /// persisted by a 2-phase commit), the allocator state is loaded directly.
    /// Otherwise a full repair is run, reporting progress via `repair_callback`.
    ///
    /// Returns `DatabaseError::RepairAborted` if the callback aborts the repair.
    #[allow(clippy::too_many_arguments)]
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            false,
        )?;
        let mut mem = Arc::new(mem);
        // If the last transaction used 2-phase commit and updated the allocator state table, then
        // we can just load the allocator state from there. Otherwise, we need a full repair
        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
            #[cfg(feature = "logging")]
            info!("Found valid allocator state, full repair not needed");
            mem.load_allocator_state(&tree)?;
            #[cfg(debug_assertions)]
            Self::mark_allocated_page_for_debug(&mut mem)?;
        } else {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            // Give the caller a chance to abort before the (potentially slow) repair begins
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
            let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
            // Persist the repaired roots. NOTE(review): the `true` flag presumably
            // requests 2-phase commit -- confirm against TransactionalMemory::commit.
            mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                // InvalidSavepoint can't happen: the id came from list_persistent_savepoints()
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        // The transaction was only used for reading savepoint metadata; discard it
        txn.abort()?;

        Ok(db)
    }
980
981    pub(crate) fn get_allocator_state_table(
982        mem: &Arc<TransactionalMemory>,
983    ) -> Result<Option<AllocatorStateTree>> {
984        // The allocator state table is only valid if the primary was written using 2-phase commit
985        if !mem.used_two_phase_commit() {
986            return Ok(None);
987        }
988
989        // See if it's present in the system table tree
990        let system_table_tree = TableTree::new(
991            mem.get_system_root(),
992            PageHint::None,
993            Arc::new(TransactionGuard::fake()),
994            mem.clone(),
995        )?;
996        let Some(allocator_state_table) = system_table_tree
997            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
998            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
999        else {
1000            return Ok(None);
1001        };
1002
1003        // Load the allocator state table
1004        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
1005            unreachable!();
1006        };
1007        let tree = AllocatorStateTree::new(
1008            table_root,
1009            PageHint::None,
1010            Arc::new(TransactionGuard::fake()),
1011            mem.clone(),
1012        )?;
1013
1014        // Make sure this isn't stale allocator state left over from a previous transaction
1015        if !mem.is_valid_allocator_state(&tree)? {
1016            return Ok(None);
1017        }
1018
1019        Ok(Some(tree))
1020    }
1021
1022    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1023        let id = self
1024            .transaction_tracker
1025            .register_read_transaction(&self.mem)?;
1026
1027        Ok(TransactionGuard::new_read(
1028            id,
1029            self.transaction_tracker.clone(),
1030        ))
1031    }
1032
    /// Convenience method for [`Builder::new`]
    ///
    /// Returns a [`Builder`] initialized with the default configuration.
    pub fn builder() -> Builder {
        Builder::new()
    }
1037
1038    /// Begins a write transaction
1039    ///
1040    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
1041    /// write may be in progress at a time. If a write is in progress, this function will block
1042    /// until it completes.
1043    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1044        // Fail early if there has been an I/O error -- nothing can be committed in that case
1045        self.mem.check_io_errors()?;
1046        let guard = TransactionGuard::new_write(
1047            self.transaction_tracker.start_write_transaction(),
1048            self.transaction_tracker.clone(),
1049        );
1050        WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1051            .map_err(|e| e.into())
1052    }
1053
1054    fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
1055        // Make a new quick-repair commit to update the allocator state table
1056        #[cfg(feature = "logging")]
1057        debug!("Writing allocator state table");
1058        eprintln!("[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: begin_write");
1059        let mut tx = self.begin_write()?;
1060        eprintln!(
1061            "[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: setting quick_repair"
1062        );
1063        tx.set_quick_repair(true);
1064        tx.set_shrink_policy(ShrinkPolicy::Maximum);
1065        eprintln!("[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: about to commit");
1066        tx.commit()?;
1067        eprintln!("[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: commit done");
1068
1069        Ok(())
1070    }
1071}
1072
1073impl Drop for Database {
1074    fn drop(&mut self) {
1075        eprintln!("[DATABASE_DROP_DEBUG] Database::drop called for instance {self:p}");
1076
1077        if !thread::panicking() && self.ensure_allocator_state_table_and_trim().is_err() {
1078            #[cfg(feature = "logging")]
1079            warn!("Failed to write allocator state table. Repair may be required at restart.");
1080        }
1081
1082        eprintln!("[DATABASE_DROP_DEBUG] About to call mem.close()");
1083        if self.mem.close().is_err() {
1084            #[cfg(feature = "logging")]
1085            warn!("Failed to flush database file. Repair may be required at restart.");
1086        }
1087        eprintln!("[DATABASE_DROP_DEBUG] Database::drop completed");
1088    }
1089}
1090
/// Handle passed to the repair callback; reports progress and lets the caller
/// abort an in-flight repair.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }

    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [`Builder::open`] or [`Builder::create`] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }
}
1118
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size used when initializing a database (part of the file format)
    page_size: usize,
    // Optional region size override; `None` means the default is used
    region_size: Option<u64>,
    // Bytes of memory dedicated to the read cache
    read_cache_size_bytes: usize,
    // Bytes of memory dedicated to the write cache
    write_cache_size_bytes: usize,
    // Invoked periodically while a database repair is in progress
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
1127
1128impl Builder {
1129    /// Construct a new [Builder] with sensible defaults.
1130    ///
1131    /// ## Defaults
1132    ///
1133    /// - `cache_size_bytes`: 1GiB
1134    #[allow(clippy::new_without_default)]
1135    pub fn new() -> Self {
1136        let mut result = Self {
1137            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
1138            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
1139            // It is part of the file format, so can be enabled in the future.
1140            page_size: PAGE_SIZE,
1141            region_size: None,
1142            // TODO: Default should probably take into account the total system memory
1143            read_cache_size_bytes: 0,
1144            // TODO: Default should probably take into account the total system memory
1145            write_cache_size_bytes: 0,
1146            repair_callback: Box::new(|_| {}),
1147        };
1148
1149        result.set_cache_size(1024 * 1024 * 1024);
1150        result
1151    }
1152
1153    /// Set a callback which will be invoked periodically in the event that the database file needs
1154    /// to be repaired.
1155    ///
1156    /// The [`RepairSession`] argument can be used to control the repair process.
1157    ///
1158    /// If the database file needs repair, the callback will be invoked at least once.
1159    /// There is no upper limit on the number of times it may be called.
1160    pub fn set_repair_callback(
1161        &mut self,
1162        callback: impl Fn(&mut RepairSession) + 'static,
1163    ) -> &mut Self {
1164        self.repair_callback = Box::new(callback);
1165        self
1166    }
1167
1168    /// Set the internal page size of the database
1169    ///
1170    /// Valid values are powers of two, greater than or equal to 512
1171    ///
1172    /// ## Defaults
1173    ///
1174    /// Default to 4 Kib pages.
1175    #[cfg(any(fuzzing, test))]
1176    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1177        assert!(size.is_power_of_two());
1178        self.page_size = std::cmp::max(size, 512);
1179        self
1180    }
1181
1182    /// Set the amount of memory (in bytes) used for caching data
1183    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1184        // TODO: allow dynamic expansion of the read/write cache
1185        self.read_cache_size_bytes = bytes / 10 * 9;
1186        self.write_cache_size_bytes = bytes / 10;
1187        self
1188    }
1189
1190    #[cfg(any(test, fuzzing))]
1191    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1192        assert!(size.is_power_of_two());
1193        self.region_size = Some(size);
1194        self
1195    }
1196
1197    /// Opens the specified file as a redb database.
1198    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
1199    /// * if the file is a valid redb database, it will be opened
1200    /// * otherwise this function will return an error
1201    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1202        let file = OpenOptions::new()
1203            .read(true)
1204            .write(true)
1205            .create(true)
1206            .truncate(false)
1207            .open(path)?;
1208
1209        Database::new(
1210            Box::new(FileBackend::new(file)?),
1211            true,
1212            self.page_size,
1213            self.region_size,
1214            self.read_cache_size_bytes,
1215            self.write_cache_size_bytes,
1216            &self.repair_callback,
1217        )
1218    }
1219
1220    /// Opens an existing redb database.
1221    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1222        let file = OpenOptions::new().read(true).write(true).open(path)?;
1223
1224        Database::new(
1225            Box::new(FileBackend::new(file)?),
1226            false,
1227            self.page_size,
1228            None,
1229            self.read_cache_size_bytes,
1230            self.write_cache_size_bytes,
1231            &self.repair_callback,
1232        )
1233    }
1234
1235    /// Opens an existing redb database.
1236    ///
1237    /// If the file has been opened for writing (i.e. as a [`Database`]) [`DatabaseError::DatabaseAlreadyOpen`]
1238    /// will be returned on platforms which support file locks (macOS, Windows, Linux). On other platforms,
1239    /// the caller MUST avoid calling this method when the database is open for writing.
1240    pub fn open_read_only(
1241        &self,
1242        path: impl AsRef<Path>,
1243    ) -> Result<ReadOnlyDatabase, DatabaseError> {
1244        let file = OpenOptions::new().read(true).open(path)?;
1245
1246        ReadOnlyDatabase::new(
1247            Box::new(FileBackend::new_internal(file, true)?),
1248            self.page_size,
1249            None,
1250            self.read_cache_size_bytes,
1251        )
1252    }
1253
1254    /// Open an existing or create a new database in the given `file`.
1255    ///
1256    /// The file must be empty or contain a valid database.
1257    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1258        Database::new(
1259            Box::new(FileBackend::new(file)?),
1260            true,
1261            self.page_size,
1262            self.region_size,
1263            self.read_cache_size_bytes,
1264            self.write_cache_size_bytes,
1265            &self.repair_callback,
1266        )
1267    }
1268
1269    /// Open an existing or create a new database with the given backend.
1270    pub fn create_with_backend(
1271        &self,
1272        backend: impl StorageBackend,
1273    ) -> Result<Database, DatabaseError> {
1274        Database::new(
1275            Box::new(backend),
1276            true,
1277            self.page_size,
1278            self.region_size,
1279            self.read_cache_size_bytes,
1280            self.write_cache_size_bytes,
1281            &self.repair_callback,
1282        )
1283    }
1284}
1285
impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Intentionally prints just "Database": internal fields are not
        // meaningful to callers
        f.debug_struct("Database").finish()
    }
}
1291
1292#[cfg(test)]
1293mod test {
1294    use crate::backends::FileBackend;
1295    use crate::{
1296        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1297        StorageError, TableDefinition, TransactionError,
1298    };
1299    use std::fs::File;
1300    use std::io::{ErrorKind, Read, Seek, SeekFrom};
1301    use std::sync::atomic::{AtomicU64, Ordering};
1302    use std::sync::Arc;
1303
    // Test backend that delegates to a real FileBackend, but starts returning
    // I/O errors once `countdown` write operations have been consumed.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining writes before the backend begins failing; shared so tests
        // can flip it mid-run
        countdown: Arc<AtomicU64>,
    }
1309
1310    impl FailingBackend {
1311        fn new(backend: FileBackend, countdown: u64) -> Self {
1312            Self {
1313                inner: backend,
1314                countdown: Arc::new(AtomicU64::new(countdown)),
1315            }
1316        }
1317
1318        fn check_countdown(&self) -> Result<(), std::io::Error> {
1319            if self.countdown.load(Ordering::SeqCst) == 0 {
1320                return Err(std::io::Error::from(ErrorKind::Other));
1321            }
1322
1323            Ok(())
1324        }
1325
1326        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1327            if self
1328                .countdown
1329                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1330                    if x > 0 {
1331                        Some(x - 1)
1332                    } else {
1333                        None
1334                    }
1335                })
1336                .is_err()
1337            {
1338                return Err(std::io::Error::from(ErrorKind::Other));
1339            }
1340
1341            Ok(())
1342        }
1343    }
1344
    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, std::io::Error> {
            self.inner.len()
        }

        // Reads fail after the countdown is exhausted, but don't consume a count
        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), std::io::Error> {
            self.check_countdown()?;
            self.inner.read(offset, out)
        }

        // set_len never fails artificially
        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
            self.inner.set_len(len)
        }

        // Syncs fail after the countdown is exhausted, but don't consume a count
        fn sync_data(&self) -> Result<(), std::io::Error> {
            self.check_countdown()?;
            self.inner.sync_data()
        }

        // Each write consumes one count and fails once the countdown hits zero
        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }
1369
    // Fuzzer-derived regression test: a commit whose backing writes fail
    // partway through must leave the file in a state that can be re-opened.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Allow only 20 backend writes before every subsequent write fails
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // The write budget runs out during this commit, so it must fail...
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // ...and re-opening the database afterwards must still succeed
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
1405
    // After an I/O error during commit, further write transactions must be
    // refused, and the on-disk recovery flag must remain set even if the error
    // later goes away.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // With a u64::MAX countdown the backend effectively never fails until
        // the countdown is overwritten below
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Create some garbage
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Cause an error in the commit
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // Any subsequent write attempt must report the earlier failure
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        // Simulate a transient error
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Check that recovery flag is set, even though the error has "cleared".
        // NOTE(review): offset 9 / bit 0x2 appear to be the god byte's
        // recovery-required bit -- confirm against the file-format definition.
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
1455
1456    #[test]
1457    fn small_pages() {
1458        let tmpfile = crate::create_tempfile();
1459
1460        let db = Database::builder()
1461            .set_page_size(512)
1462            .create(tmpfile.path())
1463            .unwrap();
1464
1465        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1466        let txn = db.begin_write().unwrap();
1467        {
1468            txn.open_table(table_definition).unwrap();
1469        }
1470        txn.commit().unwrap();
1471    }
1472
    // Fuzzer-derived regression test: a long interleaving of ephemeral
    // savepoint creation, restoration, and drops, all with 2-phase commits on
    // 512-byte pages. The exact sequence of operations and magic values
    // matters; do not simplify.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        // Restore a savepoint taken within the same transaction
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        // Abort instead of committing this one
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // Restoring savepoint4 must fail at this point (presumably invalidated
        // by the earlier restore of savepoint3 -- see savepoint semantics)
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1566
    // Fuzzer-derived regression test: aborted transactions that hold ephemeral
    // savepoints (including restoring a savepoint taken in the same
    // transaction) on 1KiB pages must not error.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restore the savepoint taken just above, within the same transaction
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1599
    // Fuzzer-derived regression test: inserts and insert_reserve calls in
    // aborted transactions on 1KiB pages with a small cache must not error.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1636
1637    #[test]
1638    fn dynamic_shrink() {
1639        let tmpfile = crate::create_tempfile();
1640        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1641        let big_value = vec![0u8; 1024];
1642
1643        let db = Database::builder()
1644            .set_region_size(1024 * 1024)
1645            .create(tmpfile.path())
1646            .unwrap();
1647
1648        let txn = db.begin_write().unwrap();
1649        {
1650            let mut table = txn.open_table(table_definition).unwrap();
1651            for i in 0..2048 {
1652                table.insert(&i, big_value.as_slice()).unwrap();
1653            }
1654        }
1655        txn.commit().unwrap();
1656
1657        let file_size = tmpfile.as_file().metadata().unwrap().len();
1658
1659        let txn = db.begin_write().unwrap();
1660        {
1661            let mut table = txn.open_table(table_definition).unwrap();
1662            for i in 0..2048 {
1663                table.remove(&i).unwrap();
1664            }
1665        }
1666        txn.commit().unwrap();
1667
1668        // Perform a couple more commits to be sure the database has a chance to compact
1669        let txn = db.begin_write().unwrap();
1670        {
1671            let mut table = txn.open_table(table_definition).unwrap();
1672            table.insert(0, [].as_slice()).unwrap();
1673        }
1674        txn.commit().unwrap();
1675        let txn = db.begin_write().unwrap();
1676        {
1677            let mut table = txn.open_table(table_definition).unwrap();
1678            table.remove(0).unwrap();
1679        }
1680        txn.commit().unwrap();
1681        let txn = db.begin_write().unwrap();
1682        txn.commit().unwrap();
1683
1684        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1685        assert!(final_file_size < file_size);
1686    }
1687
1688    #[test]
1689    fn create_new_db_in_empty_file() {
1690        let tmpfile = crate::create_tempfile();
1691
1692        let _db = Database::builder()
1693            .create_file(tmpfile.into_file())
1694            .unwrap();
1695    }
1696
1697    #[test]
1698    fn open_missing_file() {
1699        let tmpfile = crate::create_tempfile();
1700
1701        let err = Database::builder()
1702            .open(tmpfile.path().with_extension("missing"))
1703            .unwrap_err();
1704
1705        match err {
1706            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1707            err => panic!("Unexpected error for empty file: {err}"),
1708        }
1709    }
1710
1711    #[test]
1712    fn open_empty_file() {
1713        let tmpfile = crate::create_tempfile();
1714
1715        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1716
1717        match err {
1718            DatabaseError::Storage(StorageError::Io(err))
1719                if err.kind() == ErrorKind::InvalidData => {}
1720            err => panic!("Unexpected error for empty file: {err}"),
1721        }
1722    }
1723}