redb_32bit/
db.rs

1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3    AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey,
4    InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree,
5    TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{RedbKey, RedbValue};
8use crate::{
9    CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, SavepointError,
10    StorageError,
11};
12use crate::{ReadTransaction, Result, WriteTransaction};
13use std::fmt::{Debug, Display, Formatter};
14
15use std::fs::{File, OpenOptions};
16use std::io;
17use std::io::ErrorKind;
18use std::marker::PhantomData;
19use std::ops::RangeFull;
20use std::path::Path;
21#[cfg(not(target_has_atomic = "64"))]
22use portable_atomic::{AtomicU64, Ordering};
23#[cfg(target_has_atomic = "64")]
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::{Arc, Condvar, Mutex};
26
27use crate::error::TransactionError;
28use crate::multimap_table::{parse_subtree_roots, DynamicCollection};
29use crate::sealed::Sealed;
30use crate::transactions::SAVEPOINT_TABLE;
31use crate::tree_store::file_backend::FileBackend;
32#[cfg(feature = "logging")]
33use log::{info, warn};
34
#[allow(clippy::len_without_is_empty)]
/// Implements persistent storage for a database.
///
/// Implementations must be usable from multiple threads concurrently.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Gets the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads the specified array of bytes from the storage.
    ///
    /// If `len` + `offset` exceeds the length of the storage an appropriate `Error` should be returned or a panic may occur.
    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;

    /// Sets the length of the storage.
    ///
    /// When extending the storage the new positions should be zero initialized.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs all buffered data with the persistent storage.
    ///
    /// If `eventual` is true, data may become persistent at some point after this call returns,
    /// but the storage must guarantee that a write barrier is inserted: i.e. all writes before this
    /// call to `sync_data()` will become persistent before any writes that occur after.
    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;

    /// Writes the specified array to the storage.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
}
61
// Thread-safe, monotonically increasing source of `TransactionId`s.
struct AtomicTransactionId {
    inner: AtomicU64,
}
65
66impl AtomicTransactionId {
67    fn new(last_id: TransactionId) -> Self {
68        Self {
69            inner: AtomicU64::new(last_id.raw_id()),
70        }
71    }
72
73    fn next(&self) -> TransactionId {
74        let id = self.inner.fetch_add(1, Ordering::AcqRel);
75        TransactionId::new(id)
76    }
77}
78
/// A handle that identifies a (non-multimap) table.
pub trait TableHandle: Sealed {
    /// Returns the name of the table
    fn name(&self) -> &str;
}
83
/// A table handle carrying only the table's name, with no key/value type parameters.
#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}
88
89impl UntypedTableHandle {
90    pub(crate) fn new(name: String) -> Self {
91        Self { name }
92    }
93}
94
95impl TableHandle for UntypedTableHandle {
96    fn name(&self) -> &str {
97        &self.name
98    }
99}
100
101impl Sealed for UntypedTableHandle {}
102
/// A handle that identifies a multimap table.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table
    fn name(&self) -> &str;
}
107
/// A multimap table handle carrying only the table's name, with no key/value type parameters.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}
112
113impl UntypedMultimapTableHandle {
114    pub(crate) fn new(name: String) -> Self {
115        Self { name }
116    }
117}
118
119impl MultimapTableHandle for UntypedMultimapTableHandle {
120    fn name(&self) -> &str {
121        &self.name
122    }
123}
124
125impl Sealed for UntypedMultimapTableHandle {}
126
/// Defines the name and types of a table
///
/// A [`TableDefinition`] should be opened for use by calling [`ReadTransaction::open_table`] or [`WriteTransaction::open_table`]
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct TableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
    name: &'a str,
    // Zero-sized markers that bind the key/value types without storing any data
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
138
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableDefinition<'a, K, V> {
    /// Construct a new table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
154
155impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle for TableDefinition<'a, K, V> {
156    fn name(&self) -> &str {
157        self.name
158    }
159}
160
161impl<K: RedbKey, V: RedbValue> Sealed for TableDefinition<'_, K, V> {}
162
// Implemented manually rather than derived: a derive would add `K: Clone` and
// `V: Clone` bounds, even though only `PhantomData` of those types is stored.
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for TableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
168
169impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for TableDefinition<'a, K, V> {}
170
171impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for TableDefinition<'a, K, V> {
172    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173        write!(
174            f,
175            "{}<{}, {}>",
176            self.name,
177            K::type_name().name(),
178            V::type_name().name()
179        )
180    }
181}
182
/// Defines the name and types of a multimap table
///
/// A [`MultimapTableDefinition`] should be opened for use by calling [`ReadTransaction::open_multimap_table`] or [`WriteTransaction::open_multimap_table`]
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
///
/// Note that the lifetime of the `K` and `V` type parameters does not impact the lifetimes of the data
/// that is stored or retrieved from the table
pub struct MultimapTableDefinition<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
    name: &'a str,
    // Zero-sized markers that bind the key/value types without storing any data
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
196
impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a new multimap table with given `name`
    ///
    /// ## Invariant
    ///
    /// `name` must not be empty.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
207
208impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
209    for MultimapTableDefinition<'a, K, V>
210{
211    fn name(&self) -> &str {
212        self.name
213    }
214}
215
216impl<K: RedbKey, V: RedbKey> Sealed for MultimapTableDefinition<'_, K, V> {}
217
// Implemented manually rather than derived: a derive would add `K: Clone` and
// `V: Clone` bounds, even though only `PhantomData` of those types is stored.
impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Clone for MultimapTableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}
223
224impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
225
226impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Display for MultimapTableDefinition<'a, K, V> {
227    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
228        write!(
229            f,
230            "{}<{}, {}>",
231            self.name,
232            K::type_name().name(),
233            V::type_name().name()
234        )
235    }
236}
237
/// Opened redb database file
///
/// Use [`Self::begin_read`] to get a [`ReadTransaction`] object that can be used to read from the database
/// Use [`Self::begin_write`] to get a [`WriteTransaction`] object that can be used to read or write to the database
///
/// Multiple reads may be performed concurrently, with each other, and with writes. Only a single write
/// may be in progress at a time.
///
/// # Examples
///
/// Basic usage:
///
/// ```rust
/// use redb::*;
/// # use tempfile::NamedTempFile;
/// const TABLE: TableDefinition<u64, u64> = TableDefinition::new("my_data");
///
/// # fn main() -> Result<(), Error> {
/// # let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
/// # let filename = tmpfile.path();
/// let db = Database::create(filename)?;
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_table(TABLE)?;
///     table.insert(&0, &0)?;
/// }
/// write_txn.commit()?;
/// # Ok(())
/// # }
/// ```
pub struct Database {
    // Page-level storage layer backing all transactions
    mem: TransactionalMemory,
    // Source of ids for new write transactions
    next_transaction_id: AtomicTransactionId,
    // Tracks live read transactions and savepoints; shared with transaction objects
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    // Id of the in-progress write transaction, if any. Enforces the single-writer invariant.
    live_write_transaction: Mutex<Option<TransactionId>>,
    // Signaled when `live_write_transaction` is cleared, waking writers blocked in start_write_transaction()
    live_write_transaction_available: Condvar,
}
275
impl Database {
    /// Opens the specified file as a redb database.
    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
    /// * if the file is a valid redb database, it will be opened
    /// * otherwise this function will return an error
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    /// Opens an existing redb database.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    // Blocks until the single-writer slot is free, then claims it and returns the
    // id allocated for the new write transaction.
    pub(crate) fn start_write_transaction(&self) -> TransactionId {
        let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
        // Loop to guard against spurious condvar wakeups
        while live_write_transaction.is_some() {
            live_write_transaction = self
                .live_write_transaction_available
                .wait(live_write_transaction)
                .unwrap();
        }
        assert!(live_write_transaction.is_none());
        let transaction_id = self.next_transaction_id.next();
        #[cfg(feature = "logging")]
        info!("Beginning write transaction id={:?}", transaction_id);
        *live_write_transaction = Some(transaction_id);

        transaction_id
    }

    // Releases the single-writer slot. `id` must match the id returned by the
    // corresponding start_write_transaction() call.
    pub(crate) fn end_write_transaction(&self, id: TransactionId) {
        let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
        assert_eq!(live_write_transaction.unwrap(), id);
        *live_write_transaction = None;
        // Wake one writer blocked in start_write_transaction()
        self.live_write_transaction_available.notify_one();
    }

    // Accessor for the underlying storage layer.
    pub(crate) fn get_memory(&self) -> &TransactionalMemory {
        &self.mem
    }

    // Verifies the checksums of the data tree, the system tree, and (if present)
    // the freed tree. Returns Ok(false) if any checksum mismatch is found.
    pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
        // This walk is read-only, so no pages may be freed; asserted empty below.
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone());
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree =
            TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone());
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        assert!(fake_freed_pages.lock().unwrap().is_empty());

        if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
            if !RawBtree::new(
                Some((freed_root, freed_checksum)),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem,
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }

        Ok(true)
    }

    /// Force a check of the integrity of the database file, and repair it if possible.
    ///
    /// Note: Calling this function is unnecessary during normal operation. redb will automatically
    /// detect and recover from crashes, power loss, and other unclean shutdowns. This function is
    /// quite slow and should only be used when you suspect the database file may have been modified
    /// externally to redb, or that a redb bug may have left the database in a corrupted state.
    ///
    /// Returns `Ok(true)` if the database passed integrity checks; `Ok(false)` if it failed but was repaired,
    /// and `Err(Corrupted)` if the check failed and the file could not be repaired
    pub fn check_integrity(&mut self) -> Result<bool> {
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = self.mem.clear_cache_and_reload()?;

        if !Self::verify_primary_checksums(&self.mem)? {
            was_clean = false;
        }

        // check_integrity() only returns StorageError, so unwrap the DatabaseError here
        Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;
        // If repair changed the allocator state, the file was not clean
        if allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }
        self.mem.begin_writable()?;

        Ok(was_clean)
    }

    /// Compacts the database file
    ///
    /// Returns `true` if compaction was performed, and `false` if no further compaction was possible
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        // Commit to free up any pending free pages
        // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        // Compaction cannot proceed while any savepoint (persistent or ephemeral) exists,
        // because savepoints hold references to old page versions
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .lock()
            .unwrap()
            .any_savepoint_exists()
        {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_durability(Durability::Paranoid);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Repeat, just in case executing list_persistent_savepoints() created a new table
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_durability(Durability::Paranoid);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages
        // should have been cleared out by the above commit()
        assert!(self.mem.get_freed_root().is_none());

        let mut compacted = false;
        // Iteratively compact until no progress is made
        loop {
            let mut progress = false;

            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Double commit to free up the relocated pages for reuse
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_durability(Durability::Paranoid);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());

            if !progress {
                break;
            } else {
                compacted = true;
            }
        }

        Ok(compacted)
    }

    // During repair: marks as allocated all pages reachable from any persistent
    // savepoint stored in the system table tree. Frees newer than
    // `oldest_unprocessed_free_transaction` are skipped to avoid double-frees.
    fn mark_persistent_savepoints(
        system_root: Option<(PageNumber, Checksum)>,
        mem: &TransactionalMemory,
        oldest_unprocessed_free_transaction: TransactionId,
    ) -> Result {
        let freed_list = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTree::new(system_root, mem, freed_list);
        // The savepoints only need a tracker object to deserialize; its state is discarded
        let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new()));
        if let Some(savepoint_table_def) = table_tree
            .get_table::<SavepointId, SerializedSavepoint>(
                SAVEPOINT_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
            })?
        {
            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
                ReadOnlyTable::new(
                    "internal savepoint table".to_string(),
                    savepoint_table_def.get_root(),
                    PageHint::None,
                    mem,
                )?;
            for result in savepoint_table.range::<SavepointId>(..)? {
                let (_, savepoint_data) = result?;
                let savepoint = savepoint_data
                    .value()
                    .to_savepoint(fake_transaction_tracker.clone());
                // Mark every page of the savepoint's user data tree.
                // Duplicates are allowed: savepoints may share pages with each other
                // and with the current tree.
                if let Some((root, _)) = savepoint.get_user_root() {
                    Self::mark_tables_recursive(root, mem, true)?;
                }
                Self::mark_freed_tree(
                    savepoint.get_freed_root(),
                    mem,
                    oldest_unprocessed_free_transaction,
                )?;
            }
        }

        Ok(())
    }

    // During repair: marks as allocated the pages of a freed tree itself, plus the
    // not-yet-processed pages it references (those freed by transactions at or after
    // `oldest_unprocessed_free_transaction`).
    fn mark_freed_tree(
        freed_root: Option<(PageNumber, Checksum)>,
        mem: &TransactionalMemory,
        oldest_unprocessed_free_transaction: TransactionId,
    ) -> Result {
        // Pages making up the freed tree structure itself
        if let Some((root, _)) = freed_root {
            let freed_pages_iter = AllPageNumbersBtreeIter::new(
                root,
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem,
            )?;
            mem.mark_pages_allocated(freed_pages_iter, true)?;
        }

        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // Keys sort by transaction id, so this range skips already-processed transactions
        let lookup_key = FreedTableKey {
            transaction_id: oldest_unprocessed_free_transaction.raw_id(),
            pagination_id: 0,
        };
        for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
            let (_, freed_page_list) = result?;
            let mut freed_page_list_as_vec = vec![];
            for i in 0..freed_page_list.value().len() {
                freed_page_list_as_vec.push(Ok(freed_page_list.value().get(i)));
            }
            mem.mark_pages_allocated(freed_page_list_as_vec.into_iter(), true)?;
        }

        Ok(())
    }

    // During repair: marks as allocated every page reachable from `root`, including
    // the pages of every table found in it and, for multimap tables, their subtrees.
    fn mark_tables_recursive(
        root: PageNumber,
        mem: &TransactionalMemory,
        allow_duplicates: bool,
    ) -> Result {
        // Repair the allocator state
        // All pages in the master table
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem)?;
        mem.mark_pages_allocated(master_pages_iter, allow_duplicates)?;

        // Iterate over all other tables
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;

        // Chain all the other tables to the master table iter
        for entry in iter {
            let definition = entry?.value();
            if let Some((table_root, _)) = definition.get_root() {
                match definition.get_type() {
                    TableType::Normal => {
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            definition.get_fixed_value_size(),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
                    }
                    TableType::Multimap => {
                        // First pass: the pages of the multimap's top-level tree
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;

                        // Second pass: walk the same pages again to find and mark the
                        // per-key value subtrees
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        for table_page in table_pages_iter {
                            let page = mem.get_page(table_page?)?;
                            let subtree_roots = parse_subtree_roots(
                                &page,
                                definition.get_fixed_key_size(),
                                definition.get_fixed_value_size(),
                            );
                            for (sub_root, _) in subtree_roots {
                                let sub_root_iter = AllPageNumbersBtreeIter::new(
                                    sub_root,
                                    definition.get_fixed_value_size(),
                                    <()>::fixed_width(),
                                    mem,
                                )?;
                                mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    // Repairs the database: rolls back a partially-committed transaction if the
    // primary checksums are bad, then rebuilds the page allocator state by walking
    // every reachable tree. `repair_callback` is invoked with progress updates and
    // may abort the repair.
    fn do_repair(
        mem: &mut TransactionalMemory,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<(), DatabaseError> {
        if !Self::verify_primary_checksums(mem)? {
            // 0.3 because the repair takes 3 full scans and the first is done now
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
            // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
            // that rolls back a partially committed transaction.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem)? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // 0.6 because the repair takes 3 full scans and the second is done now
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        if let Some((root, _)) = data_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }

        let freed_root = mem.get_freed_root();
        // Allow processing of all transactions, since this is the main freed tree
        Self::mark_freed_tree(freed_root, mem, TransactionId::new(0))?;
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // The persistent savepoints might hold references to older freed trees that are partially processed.
        // Make sure we don't reprocess those frees, as that would result in a double-free
        let oldest_unprocessed_transaction =
            if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
                TransactionId::new(entry?.0.value().transaction_id)
            } else {
                mem.get_last_committed_transaction_id()?
            };
        drop(freed_table);

        // 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        let system_root = mem.get_system_root();
        if let Some((root, _)) = system_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }
        Self::mark_persistent_savepoints(system_root, mem, oldest_unprocessed_transaction)?;

        mem.end_repair()?;

        // We need to invalidate the userspace cache, because we're about to implicitly free the freed table
        // by storing an empty root during the below commit()
        mem.clear_read_cache();

        let transaction_id = mem.get_last_committed_transaction_id()?.next();
        mem.commit(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            false,
            true,
        )?;

        Ok(())
    }

    // Shared constructor used by Builder::create/open: opens the storage, repairs it
    // if needed, and restores savepoint tracker state from any persistent savepoints.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mut mem = TransactionalMemory::new(
            file,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        if mem.needs_repair()? {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            // Callback is invoked with progress 0.0 before the repair begins, giving
            // the caller a chance to abort immediately
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            Self::do_repair(&mut mem, repair_callback)?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            next_transaction_id: AtomicTransactionId::new(next_transaction_id),
            transaction_tracker: Arc::new(Mutex::new(TransactionTracker::new())),
            live_write_transaction: Mutex::new(None),
            live_write_transaction_available: Condvar::new(),
        };

        // Restore the tracker state for any persistent savepoints
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .lock()
                .unwrap()
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .lock()
                .unwrap()
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }

    // Registers a new read transaction pinned to the last committed transaction id.
    fn allocate_read_transaction(&self) -> Result<TransactionId> {
        // Hold the tracker lock while reading the id so the registration is atomic
        // with respect to other tracker updates
        let mut guard = self.transaction_tracker.lock().unwrap();
        let id = self.mem.get_last_committed_transaction_id()?;
        guard.register_read_transaction(id);

        Ok(id)
    }

    // Allocates a savepoint id along with a read transaction pinning its snapshot.
    pub(crate) fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
        let id = self
            .transaction_tracker
            .lock()
            .unwrap()
            .allocate_savepoint();
        Ok((id, self.allocate_read_transaction()?))
    }

    /// Convenience method for [`Builder::new`]
    pub fn builder() -> Builder {
        Builder::new()
    }

    /// Begins a write transaction
    ///
    /// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
    /// write may be in progress at a time. If a write is in progress, this function will block
    /// until it completes.
    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
        WriteTransaction::new(self, self.transaction_tracker.clone()).map_err(|e| e.into())
    }

    /// Begins a read transaction
    ///
    /// Captures a snapshot of the database, so that only data committed before calling this method
    /// is visible in the transaction
    ///
    /// Returns a [`ReadTransaction`] which may be used to read from the database. Read transactions
    /// may exist concurrently with writes
    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let id = self.allocate_read_transaction()?;
        #[cfg(feature = "logging")]
        info!("Beginning read transaction id={:?}", id);
        Ok(ReadTransaction::new(
            self.get_memory(),
            self.transaction_tracker.clone(),
            id,
        ))
    }
}
791
/// State handed to the repair callback, letting it observe progress and request an abort.
pub struct RepairSession {
    // Estimated progress in [0.0, 1.0)
    progress: f64,
    // Set to true by abort(); checked by the repair loop after each callback invocation
    aborted: bool,
}
796
impl RepairSession {
    // Creates a session reporting the given progress estimate, not yet aborted.
    pub(crate) fn new(progress: f64) -> Self {
        Self {
            progress,
            aborted: false,
        }
    }

    // Whether the callback requested an abort via abort().
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair process. The corresponding call to [Builder::open] or [Builder::create] will return an error
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
819
/// Configuration builder of a redb [Database].
pub struct Builder {
    // Page size in bytes; fixed to PAGE_SIZE except under test/fuzzing builds.
    page_size: usize,
    // Optional region size override (test/fuzzing only). Ignored by `open`, since
    // the region size of an existing database is part of its on-disk format.
    region_size: Option<u64>,
    // Cache devoted to reads; set together with the write cache via set_cache_size.
    read_cache_size_bytes: usize,
    // Cache devoted to pending writes; set via set_cache_size.
    write_cache_size_bytes: usize,
    // Invoked periodically if the database file needs repair during open/create.
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
828
829impl Builder {
830    /// Construct a new [Builder] with sensible defaults.
831    ///
832    /// ## Defaults
833    ///
834    /// - `cache_size_bytes`: 1GiB
835    #[allow(clippy::new_without_default)]
836    pub fn new() -> Self {
837        let mut result = Self {
838            // Default to 4k pages. Benchmarking showed that this was a good default on all platforms,
839            // including MacOS with 16k pages. Therefore, users are not allowed to configure it at the moment.
840            // It is part of the file format, so can be enabled in the future.
841            page_size: PAGE_SIZE,
842            region_size: None,
843            // TODO: Default should probably take into account the total system memory
844            read_cache_size_bytes: 0,
845            // TODO: Default should probably take into account the total system memory
846            write_cache_size_bytes: 0,
847            repair_callback: Box::new(|_| {}),
848        };
849
850        result.set_cache_size(1024 * 1024 * 1024);
851        result
852    }
853
854    /// Set a callback which will be invoked periodically in the event that the database file needs
855    /// to be repaired.
856    ///
857    /// The [RepairSession] argument can be used to control the repair process.
858    ///
859    /// If the database file needs repair, the callback will be invoked at least once.
860    /// There is no upper limit on the number of times it may be called.
861    pub fn set_repair_callback(
862        &mut self,
863        callback: impl Fn(&mut RepairSession) + 'static,
864    ) -> &mut Self {
865        self.repair_callback = Box::new(callback);
866        self
867    }
868
869    /// Set the internal page size of the database
870    ///
871    /// Valid values are powers of two, greater than or equal to 512
872    ///
873    /// ## Defaults
874    ///
875    /// Default to 4 Kib pages.
876    #[cfg(any(fuzzing, test))]
877    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
878        assert!(size.is_power_of_two());
879        self.page_size = std::cmp::max(size, 512);
880        self
881    }
882
883    /// Set the amount of memory (in bytes) used for caching data
884    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
885        // TODO: allow dynamic expansion of the read/write cache
886        self.read_cache_size_bytes = bytes / 10 * 9;
887        self.write_cache_size_bytes = bytes / 10;
888        self
889    }
890
891    #[cfg(any(test, fuzzing))]
892    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
893        assert!(size.is_power_of_two());
894        self.region_size = Some(size);
895        self
896    }
897
898    /// Opens the specified file as a redb database.
899    /// * if the file does not exist, or is an empty file, a new database will be initialized in it
900    /// * if the file is a valid redb database, it will be opened
901    /// * otherwise this function will return an error
902    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
903        let file = OpenOptions::new()
904            .read(true)
905            .write(true)
906            .create(true)
907            .open(path)?;
908
909        Database::new(
910            Box::new(FileBackend::new(file)?),
911            self.page_size,
912            self.region_size,
913            self.read_cache_size_bytes,
914            self.write_cache_size_bytes,
915            &self.repair_callback,
916        )
917    }
918
919    /// Opens an existing redb database.
920    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
921        let file = OpenOptions::new().read(true).write(true).open(path)?;
922
923        if file.metadata()?.len() == 0 {
924            return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
925        }
926
927        Database::new(
928            Box::new(FileBackend::new(file)?),
929            self.page_size,
930            None,
931            self.read_cache_size_bytes,
932            self.write_cache_size_bytes,
933            &self.repair_callback,
934        )
935    }
936
937    /// Open an existing or create a new database in the given `file`.
938    ///
939    /// The file must be empty or contain a valid database.
940    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
941        Database::new(
942            Box::new(FileBackend::new(file)?),
943            self.page_size,
944            self.region_size,
945            self.read_cache_size_bytes,
946            self.write_cache_size_bytes,
947            &self.repair_callback,
948        )
949    }
950
951    /// Open an existing or create a new database with the given backend.
952    pub fn create_with_backend(
953        &self,
954        backend: impl StorageBackend,
955    ) -> Result<Database, DatabaseError> {
956        Database::new(
957            Box::new(backend),
958            self.page_size,
959            self.region_size,
960            self.read_cache_size_bytes,
961            self.write_cache_size_bytes,
962            &self.repair_callback,
963        )
964    }
965}
966
967impl std::fmt::Debug for Database {
968    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
969        f.debug_struct("Database").finish()
970    }
971}
972
973#[cfg(test)]
974mod test {
975    use crate::backends::FileBackend;
976    use crate::{
977        Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
978        TableDefinition,
979    };
980    use std::io::ErrorKind;
981    #[cfg(not(target_has_atomic = "64"))]
982    use portable_atomic::{AtomicU64, Ordering};
983    #[cfg(target_has_atomic = "64")]
984    use std::sync::atomic::{AtomicU64, Ordering};
985
    /// [StorageBackend] wrapper that injects I/O failures: after `countdown`
    /// successful writes, subsequent writes (and countdown-gated reads/syncs)
    /// return errors, simulating a crash or power loss mid-commit.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining write operations before the backend starts erroring.
        countdown: AtomicU64,
    }
991
992    impl FailingBackend {
993        fn new(backend: FileBackend, countdown: u64) -> Self {
994            Self {
995                inner: backend,
996                countdown: AtomicU64::new(countdown),
997            }
998        }
999
1000        fn check_countdown(&self) -> Result<(), std::io::Error> {
1001            if self.countdown.load(Ordering::SeqCst) == 0 {
1002                return Err(std::io::Error::from(ErrorKind::Other));
1003            }
1004
1005            Ok(())
1006        }
1007
1008        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1009            if self
1010                .countdown
1011                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1012                    if x > 0 {
1013                        Some(x - 1)
1014                    } else {
1015                        None
1016                    }
1017                })
1018                .is_err()
1019            {
1020                return Err(std::io::Error::from(ErrorKind::Other));
1021            }
1022
1023            Ok(())
1024        }
1025    }
1026
1027    impl StorageBackend for FailingBackend {
1028        fn len(&self) -> Result<u64, std::io::Error> {
1029            self.inner.len()
1030        }
1031
1032        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1033            self.check_countdown()?;
1034            self.inner.read(offset, len)
1035        }
1036
1037        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1038            self.inner.set_len(len)
1039        }
1040
1041        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1042            self.check_countdown()?;
1043            self.inner.sync_data(eventual)
1044        }
1045
1046        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1047            self.decrement_countdown()?;
1048            self.inner.write(offset, data)
1049        }
1050    }
1051
    /// Regression test for a crash found by fuzzing: the backend starts failing
    /// after a fixed number of writes, simulating power loss mid-commit. The
    /// partially-written file must still reopen as a valid database afterwards.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();

        // Allow exactly 23 writes to succeed before every subsequent write fails;
        // the budget is sized so the failure lands during the second commit below.
        let backend = FailingBackend::new(
            FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
            23,
        );
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // First transaction (savepoints + commit) fits within the write budget.
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // This commit exhausts the write budget and must surface the I/O error.
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Reopening through the real file must succeed despite the interrupted commit.
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(tmpfile.path())
            .unwrap();
    }
1089
1090    #[test]
1091    fn small_pages() {
1092        let tmpfile = crate::create_tempfile();
1093
1094        let db = Database::builder()
1095            .set_page_size(512)
1096            .create(tmpfile.path())
1097            .unwrap();
1098
1099        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1100        let txn = db.begin_write().unwrap();
1101        {
1102            txn.open_table(table_definition).unwrap();
1103        }
1104        txn.commit().unwrap();
1105    }
1106
    /// Regression test (fuzzer-derived) interleaving ephemeral savepoint
    /// creation, restoration, and drops across many transactions on a
    /// 512-byte-page database, mixing Paranoid/None durability. The exact
    /// sequence of operations matters; do not reorder.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Savepoints are dropped while later ones are still live; the drops are
        // interleaved with restores below to exercise savepoint bookkeeping.
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // Restoring savepoint4 must fail here — NOTE(review): presumably it was
        // invalidated by the earlier restore of the older savepoint3; confirm.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1200
1201    #[test]
1202    fn small_pages3() {
1203        let tmpfile = crate::create_tempfile();
1204
1205        let db = Database::builder()
1206            .set_page_size(1024)
1207            .create(tmpfile.path())
1208            .unwrap();
1209
1210        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1211
1212        let mut tx = db.begin_write().unwrap();
1213        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1214        tx.set_durability(Durability::None);
1215        {
1216            let mut t = tx.open_table(table_def).unwrap();
1217            let value = vec![0; 306];
1218            t.insert(&539717, value.as_slice()).unwrap();
1219        }
1220        tx.abort().unwrap();
1221
1222        let mut tx = db.begin_write().unwrap();
1223        let savepoint1 = tx.ephemeral_savepoint().unwrap();
1224        tx.restore_savepoint(&savepoint1).unwrap();
1225        tx.set_durability(Durability::None);
1226        {
1227            let mut t = tx.open_table(table_def).unwrap();
1228            let value = vec![0; 2008];
1229            t.insert(&784384, value.as_slice()).unwrap();
1230        }
1231        tx.abort().unwrap();
1232    }
1233
1234    #[test]
1235    fn small_pages4() {
1236        let tmpfile = crate::create_tempfile();
1237
1238        let db = Database::builder()
1239            .set_cache_size(1024 * 1024)
1240            .set_page_size(1024)
1241            .create(tmpfile.path())
1242            .unwrap();
1243
1244        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1245
1246        let tx = db.begin_write().unwrap();
1247        {
1248            tx.open_table(table_def).unwrap();
1249        }
1250        tx.commit().unwrap();
1251
1252        let tx = db.begin_write().unwrap();
1253        {
1254            let mut t = tx.open_table(table_def).unwrap();
1255            assert!(t.get(&131072).unwrap().is_none());
1256            let value = vec![0xFF; 1130];
1257            t.insert(&42394, value.as_slice()).unwrap();
1258            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1259            assert!(t.get(&0).unwrap().is_none());
1260        }
1261        tx.abort().unwrap();
1262
1263        let tx = db.begin_write().unwrap();
1264        {
1265            let mut t = tx.open_table(table_def).unwrap();
1266            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1267        }
1268        tx.abort().unwrap();
1269    }
1270
1271    #[test]
1272    fn dynamic_shrink() {
1273        let tmpfile = crate::create_tempfile();
1274        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1275        let big_value = vec![0u8; 1024];
1276
1277        let db = Database::builder()
1278            .set_region_size(1024 * 1024)
1279            .create(tmpfile.path())
1280            .unwrap();
1281
1282        let txn = db.begin_write().unwrap();
1283        {
1284            let mut table = txn.open_table(table_definition).unwrap();
1285            for i in 0..2048 {
1286                table.insert(&i, big_value.as_slice()).unwrap();
1287            }
1288        }
1289        txn.commit().unwrap();
1290
1291        let file_size = tmpfile.as_file().metadata().unwrap().len();
1292
1293        let txn = db.begin_write().unwrap();
1294        {
1295            let mut table = txn.open_table(table_definition).unwrap();
1296            for i in 0..2048 {
1297                table.remove(&i).unwrap();
1298            }
1299        }
1300        txn.commit().unwrap();
1301
1302        // Perform a couple more commits to be sure the database has a chance to compact
1303        let txn = db.begin_write().unwrap();
1304        {
1305            let mut table = txn.open_table(table_definition).unwrap();
1306            table.insert(0, [].as_slice()).unwrap();
1307        }
1308        txn.commit().unwrap();
1309        let txn = db.begin_write().unwrap();
1310        {
1311            let mut table = txn.open_table(table_definition).unwrap();
1312            table.remove(0).unwrap();
1313        }
1314        txn.commit().unwrap();
1315        let txn = db.begin_write().unwrap();
1316        txn.commit().unwrap();
1317
1318        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1319        assert!(final_file_size < file_size);
1320    }
1321
1322    #[test]
1323    fn create_new_db_in_empty_file() {
1324        let tmpfile = crate::create_tempfile();
1325
1326        let _db = Database::builder()
1327            .create_file(tmpfile.into_file())
1328            .unwrap();
1329    }
1330
1331    #[test]
1332    fn open_missing_file() {
1333        let tmpfile = crate::create_tempfile();
1334
1335        let err = Database::builder()
1336            .open(tmpfile.path().with_extension("missing"))
1337            .unwrap_err();
1338
1339        match err {
1340            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1341            err => panic!("Unexpected error for empty file: {err}"),
1342        }
1343    }
1344
1345    #[test]
1346    fn open_empty_file() {
1347        let tmpfile = crate::create_tempfile();
1348
1349        let err = Database::builder().open(tmpfile.path()).unwrap_err();
1350
1351        match err {
1352            DatabaseError::Storage(StorageError::Io(err))
1353                if err.kind() == ErrorKind::InvalidData => {}
1354            err => panic!("Unexpected error for empty file: {err}"),
1355        }
1356    }
1357}