// redb/db.rs

1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3    AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey,
4    InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree,
5    TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{RedbKey, RedbValue};
8use crate::{
9    CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, SavepointError,
10    StorageError,
11};
12use crate::{ReadTransaction, Result, WriteTransaction};
13use std::fmt::{Debug, Display, Formatter};
14
15use std::fs::{File, OpenOptions};
16use std::io;
17use std::io::ErrorKind;
18use std::marker::PhantomData;
19use std::ops::RangeFull;
20use std::path::Path;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::sync::{Arc, Condvar, Mutex};
23
24use crate::error::TransactionError;
25use crate::multimap_table::{parse_subtree_roots, DynamicCollection};
26use crate::sealed::Sealed;
27use crate::transactions::SAVEPOINT_TABLE;
28use crate::tree_store::file_backend::FileBackend;
29#[cfg(feature = "logging")]
30use log::{info, warn};
31
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// Implementors provide the raw byte-level storage (length, read, write, resize,
/// and sync) that the database is built on top of.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `len` + `offset` exceeds the length of the storage an appropriate `Error` should be returned or a panic may occur.
    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;

    /// Sets the length of the storage.
    ///
    /// When extending the storage the new positions should be zero initialized.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    ///
    /// If `eventual` is true, data may become persistent at some point after this call returns,
    /// but the storage must guarantee that a write barrier is inserted: i.e. all writes before this
    /// call to `sync_data()` will become persistent before any writes that occur after.
    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
}
58
59struct AtomicTransactionId {
60    inner: AtomicU64,
61}
62
63impl AtomicTransactionId {
64    fn new(last_id: TransactionId) -> Self {
65        Self {
66            inner: AtomicU64::new(last_id.raw_id()),
67        }
68    }
69
70    fn next(&self) -> TransactionId {
71        let id = self.inner.fetch_add(1, Ordering::AcqRel);
72        TransactionId::new(id)
73    }
74}
75
/// Implemented by types that identify a table (e.g. table definitions and handles)
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
80
81#[derive(Clone)]
82pub struct UntypedTableHandle {
83    name: String,
84}
85
86impl UntypedTableHandle {
87    pub(crate) fn new(name: String) -> Self {
88        Self { name }
89    }
90}
91
92impl TableHandle for UntypedTableHandle {
93    fn name(&self) -> &str {
94        &self.name
95    }
96}
97
98impl Sealed for UntypedTableHandle {}
99
/// Implemented by types that identify a multimap table (e.g. table definitions and handles)
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}
104
105#[derive(Clone)]
106pub struct UntypedMultimapTableHandle {
107    name: String,
108}
109
110impl UntypedMultimapTableHandle {
111    pub(crate) fn new(name: String) -> Self {
112        Self { name }
113    }
114}
115
116impl MultimapTableHandle for UntypedMultimapTableHandle {
117    fn name(&self) -> &str {
118        &self.name
119    }
120}
121
122impl Sealed for UntypedMultimapTableHandle {}
123
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
    name: &'a str,
    // PhantomData carries the key/value types without storing any values
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
151
152impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle for TableDefinition<'a, K, V> {
153    fn name(&self) -> &str {
154        self.name
155    }
156}
157
158impl<K: RedbKey, V: RedbValue> Sealed for TableDefinition<'_, K, V> {}
159
160impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for TableDefinition<'a, K, V> {
161    fn clone(&self) -> Self {
162        *self
163    }
164}
165
166impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for TableDefinition<'a, K, V> {}
167
168impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for TableDefinition<'a, K, V> {
169    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170        write!(
171            f,
172            "{}<{}, {}>",
173            self.name,
174            K::type_name().name(),
175            V::type_name().name()
176        )
177    }
178}
179
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
    name: &'a str,
    // PhantomData carries the key/value types without storing any values
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
204
205impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
206    for MultimapTableDefinition<'a, K, V>
207{
208    fn name(&self) -> &str {
209        self.name
210    }
211}
212
213impl<K: RedbKey, V: RedbKey> Sealed for MultimapTableDefinition<'_, K, V> {}
214
215impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Clone for MultimapTableDefinition<'a, K, V> {
216    fn clone(&self) -> Self {
217        *self
218    }
219}
220
221impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
222
223impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Display for MultimapTableDefinition<'a, K, V> {
224    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
225        write!(
226            f,
227            "{}<{}, {}>",
228            self.name,
229            K::type_name().name(),
230            V::type_name().name()
231        )
232    }
233}
234
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Underlying transactional page store
    mem: TransactionalMemory,
    // Source of ids for new transactions
    next_transaction_id: AtomicTransactionId,
    // Tracks live read transactions and savepoints
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    // Id of the currently live write transaction, if any; enforces single-writer
    live_write_transaction: Mutex<Option<TransactionId>>,
    // Signaled when live_write_transaction is cleared
    live_write_transaction_available: Condvar,
}
272
impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    /// Opens an existing redb database.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    /// Claims the single-writer slot, blocking until any currently live write
    /// transaction completes, and returns the id allocated for the new writer.
    pub(crate) fn start_write_transaction(&self) -> TransactionId {
        let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
        // Only one write transaction may be live at a time: wait until the
        // current one (if any) signals completion via end_write_transaction()
        while live_write_transaction.is_some() {
            live_write_transaction = self
                .live_write_transaction_available
                .wait(live_write_transaction)
                .unwrap();
        }
        assert!(live_write_transaction.is_none());
        let transaction_id = self.next_transaction_id.next();
        #[cfg(feature = "logging")]
        info!("Beginning write transaction id={:?}", transaction_id);
        *live_write_transaction = Some(transaction_id);

        transaction_id
    }

    /// Releases the single-writer slot held by `id` and wakes one thread
    /// waiting in `start_write_transaction()`.
    pub(crate) fn end_write_transaction(&self, id: TransactionId) {
        let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
        // `id` must be the currently live write transaction
        assert_eq!(live_write_transaction.unwrap(), id);
        *live_write_transaction = None;
        self.live_write_transaction_available.notify_one();
    }

    /// Returns the underlying transactional page store.
    pub(crate) fn get_memory(&self) -> &TransactionalMemory {
        &self.mem
    }

    /// Verifies the checksums of the data tree, the system tree, and the freed
    /// tree. Returns `Ok(false)` if any checksum mismatch is found.
    pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
        // TableTree requires a freed-pages list, but this read-only walk must not
        // free anything; the assert below checks that it stayed empty
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone());
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree =
            TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone());
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        assert!(fake_freed_pages.lock().unwrap().is_empty());

        if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
            if !RawBtree::new(
                Some((freed_root, freed_checksum)),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem,
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }

        Ok(true)
    }

    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool> {
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = self.mem.clear_cache_and_reload()?;

        if !Self::verify_primary_checksums(&self.mem)? {
            was_clean = false;
        }

        // do_repair only returns storage errors here because no repair callback can abort
        Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;
        // A changed allocator state indicates the repair found (and fixed) an inconsistency
        if allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }
        self.mem.begin_writable()?;

        Ok(was_clean)
    }

    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        // Compaction cannot run while savepoints hold references to old pages
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .lock()
            .unwrap()
            .any_savepoint_exists()
        {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_durability(Durability::Paranoid);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_durability(Durability::Paranoid);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        assert!(self.mem.get_freed_root().is_none());

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_durability(Durability::Paranoid);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());

            if !progress {
                break;
            } else {
                compacted = true;
            }
        }

        Ok(compacted)
    }

    /// During repair, marks as allocated every page reachable from the persistent
    /// savepoints stored in the system table, so they are not treated as free.
    fn mark_persistent_savepoints(
        system_root: Option<(PageNumber, Checksum)>,
        mem: &TransactionalMemory,
        oldest_unprocessed_free_transaction: TransactionId,
    ) -> Result {
        let freed_list = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTree::new(system_root, mem, freed_list);
        // A throwaway tracker: the savepoints are only deserialized to walk their roots
        let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new()));
        if let Some(savepoint_table_def) = table_tree
            .get_table::<SavepointId, SerializedSavepoint>(
                SAVEPOINT_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
            })?
        {
            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
                ReadOnlyTable::new(
                    "internal savepoint table".to_string(),
                    savepoint_table_def.get_root(),
                    PageHint::None,
                    mem,
                )?;
            for result in savepoint_table.range::<SavepointId>(..)? {
                let (_, savepoint_data) = result?;
                let savepoint = savepoint_data
                    .value()
                    .to_savepoint(fake_transaction_tracker.clone());
                // Duplicates are allowed because savepoints may share pages with
                // each other and with the live trees
                if let Some((root, _)) = savepoint.get_user_root() {
                    Self::mark_tables_recursive(root, mem, true)?;
                }
                Self::mark_freed_tree(
                    savepoint.get_freed_root(),
                    mem,
                    oldest_unprocessed_free_transaction,
                )?;
            }
        }

        Ok(())
    }

    /// Marks as allocated the pages of the freed tree itself, plus the pages listed
    /// in freed-table entries with transaction id >= `oldest_unprocessed_free_transaction`.
    fn mark_freed_tree(
        freed_root: Option<(PageNumber, Checksum)>,
        mem: &TransactionalMemory,
        oldest_unprocessed_free_transaction: TransactionId,
    ) -> Result {
        // First mark the structural pages of the freed tree itself
        if let Some((root, _)) = freed_root {
            let freed_pages_iter = AllPageNumbersBtreeIter::new(
                root,
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem,
            )?;
            mem.mark_pages_allocated(freed_pages_iter, true)?;
        }

        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // Entries before this key were already processed and must not be re-marked
        let lookup_key = FreedTableKey {
            transaction_id: oldest_unprocessed_free_transaction.raw_id(),
            pagination_id: 0,
        };
        for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
            let (_, freed_page_list) = result?;
            let mut freed_page_list_as_vec = vec![];
            for i in 0..freed_page_list.value().len() {
                freed_page_list_as_vec.push(Ok(freed_page_list.value().get(i)));
            }
            mem.mark_pages_allocated(freed_page_list_as_vec.into_iter(), true)?;
        }

        Ok(())
    }

    /// Marks as allocated all pages reachable from the table tree rooted at `root`,
    /// including the pages of every table (and multimap subtree) it contains.
    fn mark_tables_recursive(
        root: PageNumber,
        mem: &TransactionalMemory,
        allow_duplicates: bool,
    ) -> Result {
        // Repair the allocator state
        // All pages in the master table
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem)?;
        mem.mark_pages_allocated(master_pages_iter, allow_duplicates)?;

        // Iterate over all other tables
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;

        // Chain all the other tables to the master table iter
        for entry in iter {
            let definition = entry?.value();
            if let Some((table_root, _)) = definition.get_root() {
                match definition.get_type() {
                    TableType::Normal => {
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            definition.get_fixed_value_size(),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
                    }
                    TableType::Multimap => {
                        // First pass: mark the pages of the top-level multimap btree
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;

                        // Second pass: walk each page again to find and mark the
                        // per-key subtrees referenced from the collection values
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        for table_page in table_pages_iter {
                            let page = mem.get_page(table_page?)?;
                            let subtree_roots = parse_subtree_roots(
                                &page,
                                definition.get_fixed_key_size(),
                                definition.get_fixed_value_size(),
                            );
                            for (sub_root, _) in subtree_roots {
                                let sub_root_iter = AllPageNumbersBtreeIter::new(
                                    sub_root,
                                    definition.get_fixed_value_size(),
                                    <()>::fixed_width(),
                                    mem,
                                )?;
                                mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Repairs the database: rolls back a partially committed transaction if the
    /// primary checksums fail, then rebuilds the page allocator state by walking
    /// all reachable trees. `repair_callback` is invoked with progress updates and
    /// may abort the repair.
    fn do_repair(
        mem: &mut TransactionalMemory,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<(), DatabaseError> {
        if !Self::verify_primary_checksums(mem)? {
            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem)? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Rebuild the allocator state from the data tree
        let data_root = mem.get_data_root();
        if let Some((root, _)) = data_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }

        let freed_root = mem.get_freed_root();
        // Allow processing of all transactions, since this is the main freed tree
        Self::mark_freed_tree(freed_root, mem, TransactionId::new(0))?;
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // The persistent savepoints might hold references to older freed trees that are partially processed.
        // Make sure we don't reprocess those frees, as that would result in a double-free
        let oldest_unprocessed_transaction =
            if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
                TransactionId::new(entry?.0.value().transaction_id)
            } else {
                mem.get_last_committed_transaction_id()?
            };
        drop(freed_table);

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        if let Some((root, _)) = system_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }
        Self::mark_persistent_savepoints(system_root, mem, oldest_unprocessed_transaction)?;

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        // Commit the repaired state under a fresh transaction id
        let transaction_id = mem.get_last_committed_transaction_id()?.next();
        mem.commit(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            false,
            true,
        )?;

        Ok(())
    }

    /// Constructs a Database over the given storage backend, repairing the file
    /// if it was not shut down cleanly, and restoring savepoint tracker state.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mut mem = TransactionalMemory::new(
            file,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        if mem.needs_repair()? {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            // Progress 0.0: repair is just starting; give the callback a chance to abort early
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            Self::do_repair(&mut mem, repair_callback)?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            next_transaction_id: AtomicTransactionId::new(next_transaction_id),
            transaction_tracker: Arc::new(Mutex::new(TransactionTracker::new())),
            live_write_transaction: Mutex::new(None),
            live_write_transaction_available: Condvar::new(),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .lock()
                .unwrap()
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .lock()
                .unwrap()
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }

    /// Registers a new read transaction pinned to the last committed transaction id.
    fn allocate_read_transaction(&self) -> Result<TransactionId> {
        let mut guard = self.transaction_tracker.lock().unwrap();
        let id = self.mem.get_last_committed_transaction_id()?;
        guard.register_read_transaction(id);

        Ok(id)
    }

    /// Allocates a savepoint id together with a read transaction capturing the
    /// current database state.
    pub(crate) fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
        let id = self
            .transaction_tracker
            .lock()
            .unwrap()
            .allocate_savepoint();
        Ok((id, self.allocate_read_transaction()?))
    }

    /// Convenience method for [`Builder::new`]
    pub fn builder() -> Builder {
        Builder::new()
    }

    /// Begins a write transaction
    ///
    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
    /// write may be in progress at a time. If a write is in progress, this function will block
    /// until it completes.
    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
        WriteTransaction::new(self, self.transaction_tracker.clone()).map_err(|e| e.into())
    }

    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let id = self.allocate_read_transaction()?;
        #[cfg(feature = "logging")]
        info!("Beginning read transaction id={:?}", id);
        Ok(ReadTransaction::new(
            self.get_memory(),
            self.transaction_tracker.clone(),
            id,
        ))
    }
}
788
/// Handle passed to the repair callback; reports progress and lets the callback
/// abort the repair.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [Builder::open] or [Builder::create] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
816
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Internal page size of the database, in bytes
    page_size: usize,
    // Optional region size override; None uses the default
    region_size: Option<u64>,
    // Cache budgets, in bytes, for the read and write page caches
    read_cache_size_bytes: usize,
    write_cache_size_bytes: usize,
    // Invoked periodically if the database file needs repair on open
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
825
826impl Builder {
827    /// Construct a new [Builder] with sensible defaults.
828    ///
829    /// ## Defaults
830    ///
831    /// - `cache_size_bytes`: 1GiB
832    #[allow(clippy::new_without_default)]
833    pub fn new() -> Self {
834        let mut result = Self {
835            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
836            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
837            // It is part of the file format, so can be enabled in the future.
838            page_size: PAGE_SIZE,
839            region_size: None,
840            // TODO: Default should probably take into account the total system memory
841            read_cache_size_bytes: 0,
842            // TODO: Default should probably take into account the total system memory
843            write_cache_size_bytes: 0,
844            repair_callback: Box::new(|_| {}),
845        };
846
847        result.set_cache_size(1024 * 1024 * 1024);
848        result
849    }
850
851    /// Set a callback which will be invoked periodically in the event that the database file needs
852    /// to be repaired.
853    ///
854    /// The [RepairSession] argument can be used to control the repair process.
855    ///
856    /// If the database file needs repair, the callback will be invoked at least once.
857    /// There is no upper limit on the number of times it may be called.
858    pub fn set_repair_callback(
859        &mut self,
860        callback: impl Fn(&mut RepairSession) + 'static,
861    ) -> &mut Self {
862        self.repair_callback = Box::new(callback);
863        self
864    }
865
866    /// Set the internal page size of the database
867    ///
868    /// Valid values are powers of two, greater than or equal to 512
869    ///
870    /// ## Defaults
871    ///
872    /// Default to 4 Kib pages.
873    #[cfg(any(fuzzing, test))]
874    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
875        assert!(size.is_power_of_two());
876        self.page_size = std::cmp::max(size, 512);
877        self
878    }
879
880    /// Set the amount of memory (in bytes) used for caching data
881    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
882        // TODO: allow dynamic expansion of the read/write cache
883        self.read_cache_size_bytes = bytes / 10 * 9;
884        self.write_cache_size_bytes = bytes / 10;
885        self
886    }
887
888    #[cfg(any(test, fuzzing))]
889    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
890        assert!(size.is_power_of_two());
891        self.region_size = Some(size);
892        self
893    }
894
895    /// Opens the specified file as a redb database.
896    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
897    /// * if the file is a valid redb database, it will be opened
898    /// * otherwise this function will return an error
899    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
900        let file = OpenOptions::new()
901            .read(true)
902            .write(true)
903            .create(true)
904            .open(path)?;
905
906        Database::new(
907            Box::new(FileBackend::new(file)?),
908            self.page_size,
909            self.region_size,
910            self.read_cache_size_bytes,
911            self.write_cache_size_bytes,
912            &self.repair_callback,
913        )
914    }
915
916    /// Opens an existing redb database.
917    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
918        let file = OpenOptions::new().read(true).write(true).open(path)?;
919
920        if file.metadata()?.len() == 0 {
921            return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
922        }
923
924        Database::new(
925            Box::new(FileBackend::new(file)?),
926            self.page_size,
927            None,
928            self.read_cache_size_bytes,
929            self.write_cache_size_bytes,
930            &self.repair_callback,
931        )
932    }
933
934    /// Open an existing or create a new database in the given `file`.
935    ///
936    /// The file must be empty or contain a valid database.
937    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
938        Database::new(
939            Box::new(FileBackend::new(file)?),
940            self.page_size,
941            self.region_size,
942            self.read_cache_size_bytes,
943            self.write_cache_size_bytes,
944            &self.repair_callback,
945        )
946    }
947
948    /// Open an existing or create a new database with the given backend.
949    pub fn create_with_backend(
950        &self,
951        backend: impl StorageBackend,
952    ) -> Result<Database, DatabaseError> {
953        Database::new(
954            Box::new(backend),
955            self.page_size,
956            self.region_size,
957            self.read_cache_size_bytes,
958            self.write_cache_size_bytes,
959            &self.repair_callback,
960        )
961    }
962}
963
964impl std::fmt::Debug for Database {
965    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
966        f.debug_struct("Database").finish()
967    }
968}
969
970#[cfg(test)]
971mod test {
972    use crate::backends::FileBackend;
973    use crate::{
974        Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
975        TableDefinition,
976    };
977    use std::io::ErrorKind;
978    use std::sync::atomic::{AtomicU64, Ordering};
979
    // Test backend that delegates to a real FileBackend, but starts returning I/O
    // errors once a configured number of writes has been performed.
    #[derive(Debug)]
    struct FailingBackend {
        // Underlying backend that performs the real I/O
        inner: FileBackend,
        // Remaining successful writes; decremented by write(), checked by read()/sync_data()
        countdown: AtomicU64,
    }
985
986    impl FailingBackend {
987        fn new(backend: FileBackend, countdown: u64) -> Self {
988            Self {
989                inner: backend,
990                countdown: AtomicU64::new(countdown),
991            }
992        }
993
994        fn check_countdown(&self) -> Result<(), std::io::Error> {
995            if self.countdown.load(Ordering::SeqCst) == 0 {
996                return Err(std::io::Error::from(ErrorKind::Other));
997            }
998
999            Ok(())
1000        }
1001
1002        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1003            if self
1004                .countdown
1005                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1006                    if x > 0 {
1007                        Some(x - 1)
1008                    } else {
1009                        None
1010                    }
1011                })
1012                .is_err()
1013            {
1014                return Err(std::io::Error::from(ErrorKind::Other));
1015            }
1016
1017            Ok(())
1018        }
1019    }
1020
    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, std::io::Error> {
            // Length queries are never failed by this backend; delegate directly
            self.inner.len()
        }

        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
            // Reads fail once the countdown has reached zero, but do not consume it
            self.check_countdown()?;
            self.inner.read(offset, len)
        }

        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
            // Resizing is never failed by this backend; delegate directly
            self.inner.set_len(len)
        }

        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
            // Syncs fail once the countdown has reached zero, but do not consume it
            self.check_countdown()?;
            self.inner.sync_data(eventual)
        }

        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
            // Each write consumes one unit of the countdown and fails once exhausted
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }
1045
    // Regression test: a commit that fails partway through its writes must leave the
    // file in a state that a later open-with-repair can recover.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();

        // Backend whose writes start failing with ErrorKind::Other after 23 successful
        // writes, simulating a crash in the middle of a commit
        let backend = FailingBackend::new(
            FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
            23,
        );
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Create both kinds of savepoint before the failure point
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // By now the write countdown is exhausted, so this commit is expected to fail
        let result = tx.commit();
        assert!(result.is_err());

        // Reopening the same file through a normal backend must succeed
        // (repairing the partially-written state if necessary)
        drop(db);
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(tmpfile.path())
            .unwrap();
    }
1083
1084    #[test]
1085    fn small_pages() {
1086        let tmpfile = crate::create_tempfile();
1087
1088        let db = Database::builder()
1089            .set_page_size(512)
1090            .create(tmpfile.path())
1091            .unwrap();
1092
1093        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1094        let txn = db.begin_write().unwrap();
1095        {
1096            txn.open_table(table_definition).unwrap();
1097        }
1098        txn.commit().unwrap();
1099    }
1100
    // Savepoint stress test at the smallest page size. The exact sequence of
    // savepoint creates/restores/drops below appears to be a distilled fuzzer
    // regression — do not reorder it.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Restore savepoint0 from a later transaction, then write without durability
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        // Restoring the same savepoint a second time must also work
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Drop an older savepoint while a newer one is live, then restore the newer one
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Take savepoint4, restore the older savepoint3, then abort the transaction
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Restoring savepoint4 must now fail — presumably it was invalidated by the
        // earlier restore of the older savepoint3 (TODO: confirm against savepoint docs)
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1194
    // Aborted transactions combined with ephemeral savepoints at a small page size;
    // likely a distilled fuzzer regression — keep the sequence as-is.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Take a savepoint, write a small value without durability, then abort
        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        // Restore a savepoint taken in the same transaction, write a multi-page
        // value (2008 bytes > 1024-byte page), then abort again
        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1227
    // Aborted transactions with mixed reads, inserts, and reserve-then-fill writes
    // under a small cache and small pages; likely a distilled fuzzer regression.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        // Small cache plus 1 KiB pages to exercise page allocation/eviction paths
        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Create the table in its own committed transaction
        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Mix lookups, a plain insert, and a reserve-then-fill write, then abort
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Second aborted transaction — presumably exercises reuse of pages freed
        // by the first abort (TODO: confirm)
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1264
1265    #[test]
1266    fn dynamic_shrink() {
1267        let tmpfile = crate::create_tempfile();
1268        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1269        let big_value = vec![0u8; 1024];
1270
1271        let db = Database::builder()
1272            .set_region_size(1024 * 1024)
1273            .create(tmpfile.path())
1274            .unwrap();
1275
1276        let txn = db.begin_write().unwrap();
1277        {
1278            let mut table = txn.open_table(table_definition).unwrap();
1279            for i in 0..2048 {
1280                table.insert(&i, big_value.as_slice()).unwrap();
1281            }
1282        }
1283        txn.commit().unwrap();
1284
1285        let file_size = tmpfile.as_file().metadata().unwrap().len();
1286
1287        let txn = db.begin_write().unwrap();
1288        {
1289            let mut table = txn.open_table(table_definition).unwrap();
1290            for i in 0..2048 {
1291                table.remove(&i).unwrap();
1292            }
1293        }
1294        txn.commit().unwrap();
1295
1296        // Perform a couple more commits to be sure the database has a chance to compact
1297        let txn = db.begin_write().unwrap();
1298        {
1299            let mut table = txn.open_table(table_definition).unwrap();
1300            table.insert(0, [].as_slice()).unwrap();
1301        }
1302        txn.commit().unwrap();
1303        let txn = db.begin_write().unwrap();
1304        {
1305            let mut table = txn.open_table(table_definition).unwrap();
1306            table.remove(0).unwrap();
1307        }
1308        txn.commit().unwrap();
1309        let txn = db.begin_write().unwrap();
1310        txn.commit().unwrap();
1311
1312        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1313        assert!(final_file_size < file_size);
1314    }
1315
1316    #[test]
1317    fn create_new_db_in_empty_file() {
1318        let tmpfile = crate::create_tempfile();
1319
1320        let _db = Database::builder()
1321            .create_file(tmpfile.into_file())
1322            .unwrap();
1323    }
1324
1325    #[test]
1326    fn open_missing_file() {
1327        let tmpfile = crate::create_tempfile();
1328
1329        let err = Database::builder()
1330            .open(tmpfile.path().with_extension("missing"))
1331            .unwrap_err();
1332
1333        match err {
1334            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1335            err => panic!("Unexpected error for empty file: {err}"),
1336        }
1337    }
1338
1339    #[test]
1340    fn open_empty_file() {
1341        let tmpfile = crate::create_tempfile();
1342
1343        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1344
1345        match err {
1346            DatabaseError::Storage(StorageError::Io(err))
1347                if err.kind() == ErrorKind::InvalidData => {}
1348            err => panic!("Unexpected error for empty file: {err}"),
1349        }
1350    }
1351}