// redb_32bit — transactions.rs

1use crate::error::CommitError;
2use crate::multimap_table::ReadOnlyUntypedMultimapTable;
3use crate::sealed::Sealed;
4use crate::table::ReadOnlyUntypedTable;
5use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
6use crate::tree_store::{
7    Btree, BtreeMut, Checksum, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint,
8    PageNumber, SerializedSavepoint, TableTree, TableType, TransactionalMemory, MAX_VALUE_LENGTH,
9};
10use crate::types::{RedbKey, RedbValue};
11use crate::{
12    AccessGuard, Database, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
13    ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
14    TableDefinition, TableError, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
15};
16#[cfg(feature = "logging")]
17use log::{info, warn};
18use std::borrow::Borrow;
19use std::cmp::min;
20use std::collections::{HashMap, HashSet};
21use std::fmt::{Display, Formatter};
22use std::marker::PhantomData;
23use std::ops::{RangeBounds, RangeFull};
24#[cfg(not(target_has_atomic = "64"))]
25use portable_atomic::{AtomicBool, Ordering};
26#[cfg(target_has_atomic = "64")]
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::{Arc, Mutex};
29use std::{panic, thread};
30
// System table holding the id to assign to the next persistent savepoint.
// Keyed by the unit type, so it stores exactly one value.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// System table mapping each persistent savepoint id to its serialized state
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
35
/// Definition of a redb-internal ("system") table: a table name plus the
/// key/value types it stores. Analogous to `TableDefinition`, but for tables
/// managed by the database itself (e.g. persistent savepoints).
pub struct SystemTableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
    name: &'a str,
    // Zero-sized markers tying the definition to its key/value types
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
41
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> SystemTableDefinition<'a, K, V> {
    /// Const-constructs a definition with the given table name.
    ///
    /// Panics if `name` is empty (at compile time when used in a `const` item).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
52
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle
    for SystemTableDefinition<'a, K, V>
{
    /// Returns the table name given at construction
    fn name(&self) -> &str {
        self.name
    }
}
60
// Seals the public traits for this type so downstream crates cannot implement them
impl<K: RedbKey, V: RedbValue> Sealed for SystemTableDefinition<'_, K, V> {}
62
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for SystemTableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        // Delegates to the Copy impl below; the type is a &str plus zero-sized markers
        *self
    }
}
68
// Trivially copyable: only a &str and PhantomData fields
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for SystemTableDefinition<'a, K, V> {}
70
71impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for SystemTableDefinition<'a, K, V> {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        write!(
74            f,
75            "{}<{}, {}>",
76            self.name,
77            K::type_name().name(),
78            V::type_name().name()
79        )
80    }
81}
82
/// Informational storage stats about the database
#[derive(Debug)]
pub struct DatabaseStats {
    // Maximum btree depth across all tables
    pub(crate) tree_height: u32,
    // Total number of pages allocated
    pub(crate) allocated_pages: u64,
    // Leaf pages holding user key/value data
    pub(crate) leaf_pages: u64,
    // Branch (internal) pages of user-data btrees
    pub(crate) branch_pages: u64,
    // Bytes of inserted keys/values, excluding indexing overhead
    pub(crate) stored_leaf_bytes: u64,
    // Bytes used by branch-page keys and other internal metadata
    pub(crate) metadata_bytes: u64,
    // Bytes lost to fragmentation in data pages and metadata tables
    pub(crate) fragmented_bytes: u64,
    // Page size in bytes
    pub(crate) page_size: usize,
}
95
impl DatabaseStats {
    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of pages allocated
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages that store user data
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in btrees that store user data
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Number of bytes consumed by keys and values that have been inserted.
    /// Does not include indexing overhead
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Number of bytes per page
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
138
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commits with this durability level will not be persisted to disk unless followed by a
    /// commit with a higher durability level.
    ///
    /// Note: Pages are only freed during commits with higher durability levels. Exclusively using
    /// this function may result in Error::OutOfSpace.
    None,
    /// Commits with this durability level have been queued for persistence to disk, and should be
    /// persistent some time after [WriteTransaction::commit] returns.
    Eventual,
    /// Commits with this durability level are guaranteed to be persistent as soon as
    /// [WriteTransaction::commit] returns.
    ///
    /// Data is written with checksums, with the following commit algorithm:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
    /// 3. Call `fsync` to ensure all writes have been persisted to disk
    ///
    /// When opening the database after a crash, the most recent of the two commit slots with a
    /// valid checksum is used.
    ///
    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
    /// function with close to perfect collision resistance when used with non-malicious input. An
    /// attacker with an extremely high degree of control over the database's workload, including
    /// the ability to cause the database process to crash, can cause invalid data to be written
    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
    Immediate,
    /// Commits with this durability level have the same guarantees as [Durability::Immediate]
    ///
    /// Additionally, data is written with the following 2-phase commit algorithm:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
    ///
    /// This mitigates a theoretical attack where an attacker who
    /// 1. can control the order in which pages are flushed to disk
    /// 2. can introduce crashes during fsync(),
    /// 3. has knowledge of the database file contents, and
    /// 4. can include arbitrary data in a write transaction
    /// could cause a transaction to partially commit (some but not all of the data is written).
    /// This is described in the design doc in further detail.
    ///
    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
    /// has been persisted to disk after calling `fsync`. Even with this commit level, an attacker
    /// with a high degree of control over the database's workload, including the ability to cause
    /// the database process to crash, can cause the database to crash with the god byte primary bit
    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-controlled state.
    Paranoid,
}
193
194// Like a Table but only one may be open at a time to avoid possible races
195pub struct SystemTable<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> {
196    name: String,
197    namespace: &'s mut SystemNamespace<'db>,
198    tree: BtreeMut<'s, K, V>,
199}
200
201impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> SystemTable<'db, 's, K, V> {
202    fn new(
203        name: &str,
204        table_root: Option<(PageNumber, Checksum)>,
205        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
206        mem: &'db TransactionalMemory,
207        namespace: &'s mut SystemNamespace<'db>,
208    ) -> SystemTable<'db, 's, K, V> {
209        SystemTable {
210            name: name.to_string(),
211            namespace,
212            tree: BtreeMut::new(table_root, mem, freed_pages),
213        }
214    }
215
216    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
217    where
218        K: 'a,
219    {
220        self.tree.get(key.borrow())
221    }
222
223    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
224    where
225        K: 'a,
226        KR: Borrow<K::SelfType<'a>> + 'a,
227    {
228        self.tree.range(&range).map(Range::new)
229    }
230
231    pub fn insert<'k, 'v>(
232        &mut self,
233        key: impl Borrow<K::SelfType<'k>>,
234        value: impl Borrow<V::SelfType<'v>>,
235    ) -> Result<Option<AccessGuard<V>>> {
236        let value_len = V::as_bytes(value.borrow()).as_ref().len();
237        if value_len > MAX_VALUE_LENGTH {
238            return Err(StorageError::ValueTooLarge(value_len));
239        }
240        let key_len = K::as_bytes(key.borrow()).as_ref().len();
241        if key_len > MAX_VALUE_LENGTH {
242            return Err(StorageError::ValueTooLarge(key_len));
243        }
244        self.tree.insert(key.borrow(), value.borrow())
245    }
246
247    pub fn remove<'a>(
248        &mut self,
249        key: impl Borrow<K::SelfType<'a>>,
250    ) -> Result<Option<AccessGuard<V>>>
251    where
252        K: 'a,
253    {
254        self.tree.remove(key.borrow())
255    }
256}
257
impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> Drop for SystemTable<'db, 's, K, V> {
    fn drop(&mut self) {
        // Report the (possibly updated) btree root back to the namespace so the
        // change is staged for commit
        self.namespace.close_table(&self.name, &self.tree);
    }
}
263
// Namespace wrapping the table tree that stores redb-internal (system) tables
struct SystemNamespace<'db> {
    table_tree: TableTree<'db>,
}
267
268impl<'db> SystemNamespace<'db> {
269    fn open_system_table<'txn, 's, K: RedbKey + 'static, V: RedbValue + 'static>(
270        &'s mut self,
271        transaction: &'txn WriteTransaction<'db>,
272        definition: SystemTableDefinition<K, V>,
273    ) -> Result<SystemTable<'db, 's, K, V>> {
274        #[cfg(feature = "logging")]
275        info!("Opening system table: {}", definition);
276        let root = self
277            .table_tree
278            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
279            .map_err(|e| {
280                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
281            })?;
282        transaction.dirty.store(true, Ordering::Release);
283
284        Ok(SystemTable::new(
285            definition.name(),
286            root.get_root(),
287            transaction.freed_pages.clone(),
288            transaction.mem,
289            self,
290        ))
291    }
292
293    fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
294        &mut self,
295        name: &str,
296        table: &BtreeMut<K, V>,
297    ) {
298        self.table_tree
299            .stage_update_table_root(name, table.get_root());
300    }
301}
302
// Namespace for user tables: tracks which tables are currently open (and the
// code location of each open, for error messages) plus the user table tree
struct TableNamespace<'db> {
    // table name -> location of the first open; reported in TableAlreadyOpen errors
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    table_tree: TableTree<'db>,
}
307
308impl<'db> TableNamespace<'db> {
309    #[track_caller]
310    fn inner_open<K: RedbKey + 'static, V: RedbValue + 'static>(
311        &mut self,
312        name: &str,
313        table_type: TableType,
314    ) -> Result<Option<(PageNumber, Checksum)>, TableError> {
315        if let Some(location) = self.open_tables.get(name) {
316            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
317        }
318
319        let internal_table = self
320            .table_tree
321            .get_or_create_table::<K, V>(name, table_type)?;
322        self.open_tables
323            .insert(name.to_string(), panic::Location::caller());
324
325        Ok(internal_table.get_root())
326    }
327
328    #[track_caller]
329    pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>(
330        &mut self,
331        transaction: &'txn WriteTransaction<'db>,
332        definition: MultimapTableDefinition<K, V>,
333    ) -> Result<MultimapTable<'db, 'txn, K, V>, TableError> {
334        #[cfg(feature = "logging")]
335        info!("Opening multimap table: {}", definition);
336        let root = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
337        transaction.dirty.store(true, Ordering::Release);
338
339        Ok(MultimapTable::new(
340            definition.name(),
341            root,
342            transaction.freed_pages.clone(),
343            transaction.mem,
344            transaction,
345        ))
346    }
347
348    #[track_caller]
349    pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>(
350        &mut self,
351        transaction: &'txn WriteTransaction<'db>,
352        definition: TableDefinition<K, V>,
353    ) -> Result<Table<'db, 'txn, K, V>, TableError> {
354        #[cfg(feature = "logging")]
355        info!("Opening table: {}", definition);
356        let root = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
357        transaction.dirty.store(true, Ordering::Release);
358
359        Ok(Table::new(
360            definition.name(),
361            root,
362            transaction.freed_pages.clone(),
363            transaction.mem,
364            transaction,
365        ))
366    }
367
368    #[track_caller]
369    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
370        if let Some(location) = self.open_tables.get(name) {
371            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
372        }
373
374        self.table_tree.delete_table(name, table_type)
375    }
376
377    #[track_caller]
378    fn delete_table<'txn>(
379        &mut self,
380        transaction: &'txn WriteTransaction<'db>,
381        name: &str,
382    ) -> Result<bool, TableError> {
383        #[cfg(feature = "logging")]
384        info!("Deleting table: {}", name);
385        transaction.dirty.store(true, Ordering::Release);
386        self.inner_delete(name, TableType::Normal)
387    }
388
389    #[track_caller]
390    fn delete_multimap_table<'txn>(
391        &mut self,
392        transaction: &'txn WriteTransaction<'db>,
393        name: &str,
394    ) -> Result<bool, TableError> {
395        #[cfg(feature = "logging")]
396        info!("Deleting multimap table: {}", name);
397        transaction.dirty.store(true, Ordering::Release);
398        self.inner_delete(name, TableType::Multimap)
399    }
400
401    pub(crate) fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
402        &mut self,
403        name: &str,
404        table: &BtreeMut<K, V>,
405    ) {
406        self.open_tables.remove(name).unwrap();
407        self.table_tree
408            .stage_update_table_root(name, table.get_root());
409    }
410}
411
/// A read/write transaction
///
/// Only a single [`WriteTransaction`] may exist at a time
pub struct WriteTransaction<'db> {
    db: &'db Database,
    // Tracks live transactions and savepoints across the whole database
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    mem: &'db TransactionalMemory,
    transaction_id: TransactionId,
    // The table of freed pages by transaction. FreedTableKey -> binary.
    // The binary blob is a length-prefixed array of PageNumber
    freed_tree: Mutex<BtreeMut<'db, FreedTableKey, FreedPageList<'static>>>,
    // Pages freed during this transaction
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages that were freed from the freed-tree. These can be freed immediately after commit(),
    // since read transactions do not access the freed-tree
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    // User tables (with open-table tracking) and system tables, each behind a lock
    tables: Mutex<TableNamespace<'db>>,
    system_tables: Mutex<SystemNamespace<'db>>,
    // Set by commit()/abort() so the Drop path knows cleanup already happened
    completed: bool,
    // True once any table has been opened or modified; gates ephemeral_savepoint()
    dirty: AtomicBool,
    durability: Durability,
    // Persistent savepoints created during this transaction
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // Persistent savepoints deleted during this transaction; deallocated after commit
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
436
437impl<'db> WriteTransaction<'db> {
438    pub(crate) fn new(
439        db: &'db Database,
440        transaction_tracker: Arc<Mutex<TransactionTracker>>,
441    ) -> Result<Self> {
442        let transaction_id = db.start_write_transaction();
443
444        let root_page = db.get_memory().get_data_root();
445        let system_page = db.get_memory().get_system_root();
446        let freed_root = db.get_memory().get_freed_root();
447        let freed_pages = Arc::new(Mutex::new(vec![]));
448        let post_commit_frees = Arc::new(Mutex::new(vec![]));
449
450        let tables = TableNamespace {
451            open_tables: Default::default(),
452            table_tree: TableTree::new(root_page, db.get_memory(), freed_pages.clone()),
453        };
454        let system_tables = SystemNamespace {
455            table_tree: TableTree::new(system_page, db.get_memory(), freed_pages.clone()),
456        };
457
458        Ok(Self {
459            db,
460            transaction_tracker,
461            mem: db.get_memory(),
462            transaction_id,
463            tables: Mutex::new(tables),
464            system_tables: Mutex::new(system_tables),
465            freed_tree: Mutex::new(BtreeMut::new(
466                freed_root,
467                db.get_memory(),
468                post_commit_frees.clone(),
469            )),
470            freed_pages,
471            post_commit_frees,
472            completed: false,
473            dirty: AtomicBool::new(false),
474            durability: Durability::Immediate,
475            created_persistent_savepoints: Mutex::new(Default::default()),
476            deleted_persistent_savepoints: Mutex::new(vec![]),
477        })
478    }
479
480    /// Creates a snapshot of the current database state, which can be used to rollback the database.
481    /// This savepoint will exist until it is deleted with `[delete_savepoint()]`.
482    ///
483    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
484    /// Therefore, the lifetime of a savepoint should be minimized.
485    ///
486    /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
487    /// or if the transaction's durability is less than `[Durability::Immediate]`
488    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
489        if !matches!(
490            self.durability,
491            Durability::Immediate | Durability::Paranoid
492        ) {
493            return Err(SavepointError::InvalidSavepoint);
494        }
495
496        let mut savepoint = self.ephemeral_savepoint()?;
497
498        let mut system_tables = self.system_tables.lock().unwrap();
499
500        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
501        next_table.insert((), savepoint.get_id().next())?;
502        drop(next_table);
503
504        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
505        savepoint_table.insert(
506            savepoint.get_id(),
507            SerializedSavepoint::from_savepoint(&savepoint),
508        )?;
509
510        savepoint.set_persistent();
511
512        self.created_persistent_savepoints
513            .lock()
514            .unwrap()
515            .insert(savepoint.get_id());
516
517        Ok(savepoint.get_id().0)
518    }
519
520    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
521        let mut system_tables = self.system_tables.lock().unwrap();
522        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
523        let value = next_table.get(())?;
524        if let Some(next_id) = value {
525            Ok(Some(next_id.value()))
526        } else {
527            Ok(None)
528        }
529    }
530
531    /// Get a persistent savepoint given its id
532    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
533        let mut system_tables = self.system_tables.lock().unwrap();
534        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
535        let value = table.get(SavepointId(id))?;
536
537        value
538            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
539            .ok_or(SavepointError::InvalidSavepoint)
540    }
541
542    /// Delete the given persistent savepoint.
543    ///
544    /// Note that if the transaction is abort()'ed this deletion will be rolled back.
545    ///
546    /// Returns `true` if the savepoint existed
547    /// Returns `[SavepointError::InvalidSavepoint`] if the transaction's durability is less than `[Durability::Immediate]`
548    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
549        if !matches!(
550            self.durability,
551            Durability::Immediate | Durability::Paranoid
552        ) {
553            return Err(SavepointError::InvalidSavepoint);
554        }
555        let mut system_tables = self.system_tables.lock().unwrap();
556        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
557        let savepoint = table.remove(SavepointId(id))?;
558        if let Some(serialized) = savepoint {
559            let savepoint = serialized
560                .value()
561                .to_savepoint(self.transaction_tracker.clone());
562            self.deleted_persistent_savepoints
563                .lock()
564                .unwrap()
565                .push((savepoint.get_id(), savepoint.get_transaction_id()));
566            Ok(true)
567        } else {
568            Ok(false)
569        }
570    }
571
572    /// List all persistent savepoints
573    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
574        let mut system_tables = self.system_tables.lock().unwrap();
575        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
576        let mut savepoints = vec![];
577        for savepoint in table.range::<SavepointId>(..)? {
578            savepoints.push(savepoint?.0.value().0);
579        }
580        Ok(savepoints.into_iter())
581    }
582
583    /// Creates a snapshot of the current database state, which can be used to rollback the database
584    ///
585    /// This savepoint will be freed as soon as the returned `[Savepoint]` is dropped.
586    ///
587    /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
588    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
589        if self.dirty.load(Ordering::Acquire) {
590            return Err(SavepointError::InvalidSavepoint);
591        }
592
593        let (id, transaction_id) = self.db.allocate_savepoint()?;
594        #[cfg(feature = "logging")]
595        info!(
596            "Creating savepoint id={:?}, txn_id={:?}",
597            id, transaction_id
598        );
599
600        let regional_allocators = self.mem.get_raw_allocator_states();
601        let root = self.mem.get_data_root();
602        let system_root = self.mem.get_system_root();
603        let freed_root = self.mem.get_freed_root();
604        let savepoint = Savepoint::new_ephemeral(
605            self.db.get_memory(),
606            self.transaction_tracker.clone(),
607            id,
608            transaction_id,
609            root,
610            system_root,
611            freed_root,
612            regional_allocators,
613        );
614
615        Ok(savepoint)
616    }
617
    /// Restore the state of the database to the given [`Savepoint`]
    ///
    /// Calling this method invalidates all [`Savepoint`]s created after savepoint
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // Ensure that user does not try to restore a Savepoint that is from a different Database
        assert_eq!(
            self.transaction_tracker.as_ref() as *const _,
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .lock()
            .unwrap()
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        info!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
        // the database
        assert_eq!(self.db.get_memory().get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        let allocated_since_savepoint = self
            .mem
            .pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());

        // We don't want to rollback the system tree, so keep any pages it references
        let referenced_by_system_tree = self
            .system_tables
            .lock()
            .unwrap()
            .table_tree
            .all_referenced_pages()?;

        // Pages allocated after the savepoint become unreachable once we roll back:
        // uncommitted ones can be freed immediately, committed ones are deferred
        // since concurrent readers may still see them
        let mut freed_pages = vec![];
        for page in allocated_since_savepoint {
            if referenced_by_system_tree.contains(&page) {
                continue;
            }
            if self.mem.uncommitted(page) {
                self.mem.free(page);
            } else {
                freed_pages.push(page);
            }
        }
        *self.freed_pages.lock().unwrap() = freed_pages;
        // Point the user table tree at the savepoint's root
        self.tables.lock().unwrap().table_tree = TableTree::new(
            savepoint.get_user_root(),
            self.mem,
            self.freed_pages.clone(),
        );

        // Remove any freed pages that have already been processed. Otherwise this would result in a double free
        // We assume below that PageNumber is length 8
        let oldest_unprocessed_transaction = if let Some(entry) = self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))?
            .next()
        {
            entry?.key().transaction_id
        } else {
            self.transaction_id.raw_id()
        };

        // Rebuild the freed-tree from the savepoint's root...
        let mut freed_tree = BtreeMut::new(
            savepoint.get_freed_root(),
            self.mem,
            self.post_commit_frees.clone(),
        );
        let lookup_key = FreedTableKey {
            transaction_id: oldest_unprocessed_transaction,
            pagination_id: 0,
        };
        // ...and drop entries older than the oldest unprocessed transaction,
        // whose frees were already applied and must not be applied again
        let mut to_remove = vec![];
        for entry in freed_tree.range(&(..lookup_key))? {
            to_remove.push(entry?.key());
        }
        for key in to_remove {
            freed_tree.remove(&key)?;
        }

        *self.freed_tree.lock().unwrap() = freed_tree;

        // Invalidate all savepoints that are newer than the one being applied to prevent the user
        // from later trying to restore a savepoint "on another timeline"
        self.transaction_tracker
            .lock()
            .unwrap()
            .invalidate_savepoints_after(savepoint.get_id());

        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
725
726    /// Set the desired durability level for writes made in this transaction
727    /// Defaults to [`Durability::Immediate`]
728    ///
729    /// Will panic if the durability is reduced below `[Durability::Immediate]` after a persistent savepoint has been created or deleted.
730    pub fn set_durability(&mut self, durability: Durability) {
731        let no_created = self
732            .created_persistent_savepoints
733            .lock()
734            .unwrap()
735            .is_empty();
736        let no_deleted = self
737            .deleted_persistent_savepoints
738            .lock()
739            .unwrap()
740            .is_empty();
741        assert!(no_created && no_deleted);
742        self.durability = durability;
743    }
744
    /// Open the given table
    ///
    /// The table will be created if it does not exist
    #[track_caller]
    pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>(
        &'txn self,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'db, 'txn, K, V>, TableError> {
        // Delegates to TableNamespace, which also rejects double-opens
        self.tables.lock().unwrap().open_table(self, definition)
    }
755
    /// Open the given table
    ///
    /// The table will be created if it does not exist
    #[track_caller]
    pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>(
        &'txn self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'db, 'txn, K, V>, TableError> {
        // Delegates to TableNamespace, which also rejects double-opens
        self.tables
            .lock()
            .unwrap()
            .open_multimap_table(self, definition)
    }
769
    /// Forwards to `TableNamespace::close_table`, which un-registers `name`
    /// and stages the table's latest root for commit
    pub(crate) fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
        &self,
        name: &str,
        table: &BtreeMut<K, V>,
    ) {
        self.tables.lock().unwrap().close_table(name, table);
    }
777
    /// Delete the given table
    ///
    /// Returns a bool indicating whether the table existed
    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
        let name = definition.name().to_string();
        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
        drop(definition);
        self.tables.lock().unwrap().delete_table(self, &name)
    }
787
    /// Delete the given table
    ///
    /// Returns a bool indicating whether the table existed
    pub fn delete_multimap_table(
        &self,
        definition: impl MultimapTableHandle,
    ) -> Result<bool, TableError> {
        let name = definition.name().to_string();
        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
        drop(definition);
        self.tables
            .lock()
            .unwrap()
            .delete_multimap_table(self, &name)
    }
803
804    /// List all the tables
805    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
806        self.tables
807            .lock()
808            .unwrap()
809            .table_tree
810            .list_tables(TableType::Normal)
811            .map(|x| x.into_iter().map(UntypedTableHandle::new))
812    }
813
814    /// List all the multimap tables
815    pub fn list_multimap_tables(
816        &self,
817    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
818        self.tables
819            .lock()
820            .unwrap()
821            .table_tree
822            .list_tables(TableType::Multimap)
823            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
824    }
825
    /// Commit the transaction
    ///
    /// All writes performed in this transaction will be visible to future transactions, and are
    /// durable as consistent with the [`Durability`] level set by [`Self::set_durability`]
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        // (Drop still runs and releases the write lock via end_write_transaction;
        // `completed` only suppresses the automatic rollback.)
        self.completed = true;
        self.commit_inner()
    }
835
    // Dispatches to the durable or non-durable commit path according to the
    // configured durability level, then releases savepoints deleted during
    // this transaction.
    fn commit_inner(&mut self) -> Result<(), CommitError> {
        #[cfg(feature = "logging")]
        info!(
            "Committing transaction id={:?} with durability={:?}",
            self.transaction_id, self.durability
        );
        // durable_commit arguments are (eventual, two_phase)
        match self.durability {
            Durability::None => self.non_durable_commit()?,
            Durability::Eventual => self.durable_commit(true, false)?,
            Durability::Immediate => self.durable_commit(false, false)?,
            Durability::Paranoid => self.durable_commit(false, true)?,
        }

        // Only after the commit succeeds may the savepoints deleted during this
        // transaction be deallocated from the tracker.
        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
            self.transaction_tracker
                .lock()
                .unwrap()
                .deallocate_savepoint(*savepoint, *transaction);
        }

        #[cfg(feature = "logging")]
        info!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        Ok(())
    }
864
    /// Abort the transaction
    ///
    /// All writes performed in this transaction will be rolled back
    pub fn abort(mut self) -> Result {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        // (Drop still runs and releases the write lock via end_write_transaction;
        // `completed` only prevents abort_inner from running a second time.)
        self.completed = true;
        self.abort_inner()
    }
873
    // Rolls back all uncommitted work: deletes persistent savepoints created by
    // this transaction, discards pending table-root updates, and rolls back the
    // underlying storage.
    fn abort_inner(&mut self) -> Result {
        #[cfg(feature = "logging")]
        info!("Aborting transaction id={:?}", self.transaction_id);
        // Persistent savepoints created within this transaction must not survive
        // the abort; remove each one.
        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
            match self.delete_persistent_savepoint(savepoint.0) {
                Ok(_) => {}
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => {
                        // We created this savepoint ourselves, so it must be valid
                        unreachable!();
                    }
                    SavepointError::Storage(storage_err) => {
                        return Err(storage_err);
                    }
                },
            }
        }
        // Discard table-root updates staged by tables opened in this transaction,
        // then roll back all uncommitted pages in storage.
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .clear_table_root_updates();
        self.mem.rollback_uncommitted_writes()?;
        #[cfg(feature = "logging")]
        info!("Finished abort of transaction id={:?}", self.transaction_id);
        Ok(())
    }
900
901    pub(crate) fn durable_commit(&mut self, eventual: bool, two_phase: bool) -> Result {
902        let oldest_live_read = self
903            .transaction_tracker
904            .lock()
905            .unwrap()
906            .oldest_live_read_transaction()
907            .unwrap_or(self.transaction_id);
908
909        let user_root = self
910            .tables
911            .lock()
912            .unwrap()
913            .table_tree
914            .flush_table_root_updates()?;
915
916        let system_root = self
917            .system_tables
918            .lock()
919            .unwrap()
920            .table_tree
921            .flush_table_root_updates()?;
922
923        self.process_freed_pages(oldest_live_read)?;
924        // If a savepoint exists it might reference the freed-tree, since it holds a reference to the
925        // root of the freed-tree. Therefore, we must use the transactional free mechanism to free
926        // those pages. If there are no save points then these can be immediately freed, which is
927        // done at the end of this function.
928        let savepoint_exists = self
929            .transaction_tracker
930            .lock()
931            .unwrap()
932            .any_savepoint_exists();
933        self.store_freed_pages(savepoint_exists)?;
934
935        // Finalize freed table checksums, before doing the final commit
936        // user & system table trees were already finalized when we flushed the pending roots above
937        self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
938
939        let freed_root = self.freed_tree.lock().unwrap().get_root();
940
941        self.mem.commit(
942            user_root,
943            system_root,
944            freed_root,
945            self.transaction_id,
946            eventual,
947            two_phase,
948        )?;
949
950        // Mark any pending non-durable commits as fully committed.
951        self.transaction_tracker
952            .lock()
953            .unwrap()
954            .clear_pending_non_durable_commits();
955
956        // Immediately free the pages that were freed from the freed-tree itself. These are only
957        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
958        for page in self.post_commit_frees.lock().unwrap().drain(..) {
959            self.mem.free(page);
960        }
961
962        Ok(())
963    }
964
965    // Commit without a durability guarantee
966    pub(crate) fn non_durable_commit(&mut self) -> Result {
967        let user_root = self
968            .tables
969            .lock()
970            .unwrap()
971            .table_tree
972            .flush_table_root_updates()?;
973
974        let system_root = self
975            .system_tables
976            .lock()
977            .unwrap()
978            .table_tree
979            .flush_table_root_updates()?;
980
981        // Store all freed pages for a future commit(), since we can't free pages during a
982        // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
983        self.store_freed_pages(true)?;
984
985        // Finalize all checksums, before doing the final commit
986        self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
987
988        let freed_root = self.freed_tree.lock().unwrap().get_root();
989
990        self.mem
991            .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
992        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
993        // are only processed after this has been persisted
994        self.transaction_tracker
995            .lock()
996            .unwrap()
997            .register_non_durable_commit(self.transaction_id);
998        Ok(())
999    }
1000
1001    // Relocate pages to lower number regions/pages
1002    // Returns true if a page(s) was moved
1003    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
1004        let mut progress = false;
1005        // Relocate the region tracker page
1006        if self.mem.relocate_region_tracker()? {
1007            progress = true;
1008        }
1009
1010        // Relocate the btree pages
1011        let mut tables = self.tables.lock().unwrap();
1012        let table_tree = &mut tables.table_tree;
1013        if table_tree.compact_tables()? {
1014            progress = true;
1015        }
1016
1017        Ok(progress)
1018    }
1019
    // NOTE: must be called before store_freed_pages() during commit, since this can create
    // more pages freed by the current transaction
    //
    // Reclaims pages recorded by transactions older than `oldest_live_read`:
    // no live reader can still reference those pages, so they are returned to
    // the allocator and their freed-tree entries removed.
    fn process_freed_pages(&mut self, oldest_live_read: TransactionId) -> Result {
        // We assume below that PageNumber is length 8
        assert_eq!(PageNumber::serialized_size(), 8);
        // Exclusive upper bound: pagination_id 0 of the oldest live transaction,
        // so only strictly-older entries are visited below.
        let lookup_key = FreedTableKey {
            transaction_id: oldest_live_read.raw_id(),
            pagination_id: 0,
        };

        let mut to_remove = vec![];
        let mut freed_tree = self.freed_tree.lock().unwrap();
        for entry in freed_tree.range(&(..lookup_key))? {
            let entry = entry?;
            to_remove.push(entry.key());
            // Each entry's value is a list of page numbers; free every one.
            let value = entry.value();
            for i in 0..value.len() {
                self.mem.free(value.get(i));
            }
        }

        // Remove all the old transactions
        // (done after the scan, since the range above borrows the tree)
        for key in to_remove {
            freed_tree.remove(&key)?;
        }

        Ok(())
    }
1048
    // Records this transaction's freed pages in the freed-tree, chunked into
    // FreedPageList entries of at most 100 pages, keyed by
    // (transaction_id, pagination_counter).
    //
    // When `include_post_commit_free` is true, pages in `post_commit_frees` are
    // stored too: callers pass true when a savepoint exists or the commit is
    // non-durable, since those pages cannot be freed immediately.
    fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result {
        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8

        let mut pagination_counter = 0u64;
        let mut freed_tree = self.freed_tree.lock().unwrap();
        if include_post_commit_free {
            // Move all the post-commit pages that came from the freed-tree. These need to be stored
            // since we can't free pages until a durable commit
            self.freed_pages
                .lock()
                .unwrap()
                .extend(self.post_commit_frees.lock().unwrap().drain(..));
        }
        while !self.freed_pages.lock().unwrap().is_empty() {
            let chunk_size = 100;
            let buffer_size = FreedPageList::required_bytes(chunk_size);
            let key = FreedTableKey {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: pagination_counter,
            };
            // Reserve space in the tree and write the page numbers directly into
            // the reserved buffer.
            let mut access_guard =
                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            // Drain up to chunk_size pages from the tail of freed_pages.
            let mut freed_pages = self.freed_pages.lock().unwrap();
            let len = freed_pages.len();
            access_guard.as_mut().clear();
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                access_guard.as_mut().push_back(page);
            }
            drop(access_guard);

            pagination_counter += 1;

            if include_post_commit_free {
                // Move all the post-commit pages that came from the freed-tree. These need to be stored
                // since we can't free pages until a durable commit
                // NOTE(review): this drain repeats every iteration, presumably because
                // insert_reserve above can itself free tree pages into post_commit_frees — confirm
                freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
            }
        }

        Ok(())
    }
1091
1092    /// Retrieves information about storage usage in the database
1093    pub fn stats(&self) -> Result<DatabaseStats> {
1094        let tables = self.tables.lock().unwrap();
1095        let table_tree = &tables.table_tree;
1096        let data_tree_stats = table_tree.stats()?;
1097        let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
1098        let total_metadata_bytes = data_tree_stats.metadata_bytes()
1099            + freed_tree_stats.metadata_bytes
1100            + freed_tree_stats.stored_leaf_bytes;
1101        let total_fragmented =
1102            data_tree_stats.fragmented_bytes() + freed_tree_stats.fragmented_bytes;
1103
1104        Ok(DatabaseStats {
1105            tree_height: data_tree_stats.tree_height(),
1106            allocated_pages: self.mem.count_allocated_pages()?,
1107            leaf_pages: data_tree_stats.leaf_pages(),
1108            branch_pages: data_tree_stats.branch_pages(),
1109            stored_leaf_bytes: data_tree_stats.stored_bytes(),
1110            metadata_bytes: total_metadata_bytes,
1111            fragmented_bytes: total_fragmented,
1112            page_size: self.mem.get_page_size(),
1113        })
1114    }
1115
1116    #[allow(dead_code)]
1117    pub(crate) fn print_debug(&self) -> Result {
1118        // Flush any pending updates to make sure we get the latest root
1119        if let Some(page) = self
1120            .tables
1121            .lock()
1122            .unwrap()
1123            .table_tree
1124            .flush_table_root_updates()
1125            .unwrap()
1126        {
1127            eprintln!("Master tree:");
1128            let master_tree: Btree<&str, InternalTableDefinition> =
1129                Btree::new(Some(page), PageHint::None, self.mem)?;
1130            master_tree.print_debug(true)?;
1131        }
1132
1133        Ok(())
1134    }
1135}
1136
impl<'a> Drop for WriteTransaction<'a> {
    fn drop(&mut self) {
        // Always release the write lock, whether or not the transaction completed.
        self.db.end_write_transaction(self.transaction_id);
        // Auto-abort only if the caller neither committed nor aborted explicitly.
        // Skipped while unwinding from a panic (to avoid a double panic) and when
        // the storage layer has already reported a failure.
        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
            #[allow(unused_variables)]
            if let Err(error) = self.abort_inner() {
                #[cfg(feature = "logging")]
                warn!("Failure automatically aborting transaction: {}", error);
            }
        }
    }
}
1149
/// A read-only transaction
///
/// Read-only transactions may exist concurrently with writes
pub struct ReadTransaction<'a> {
    // Shared registry of live transactions; used on drop to release this read id
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    // Underlying storage of the database this transaction reads from
    mem: &'a TransactionalMemory,
    // Table tree rooted at the data root captured when the transaction began
    tree: TableTree<'a>,
    // Id allocated for this transaction; deallocated by the Drop impl
    transaction_id: TransactionId,
}
1159
1160impl<'db> ReadTransaction<'db> {
1161    pub(crate) fn new(
1162        mem: &'db TransactionalMemory,
1163        transaction_tracker: Arc<Mutex<TransactionTracker>>,
1164        transaction_id: TransactionId,
1165    ) -> Self {
1166        let root_page = mem.get_data_root();
1167        Self {
1168            transaction_tracker,
1169            mem,
1170            tree: TableTree::new(root_page, mem, Default::default()),
1171            transaction_id,
1172        }
1173    }
1174
1175    /// Open the given table
1176    pub fn open_table<K: RedbKey + 'static, V: RedbValue + 'static>(
1177        &self,
1178        definition: TableDefinition<K, V>,
1179    ) -> Result<ReadOnlyTable<K, V>, TableError> {
1180        let header = self
1181            .tree
1182            .get_table::<K, V>(definition.name(), TableType::Normal)?
1183            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1184
1185        Ok(ReadOnlyTable::new(
1186            definition.name().to_string(),
1187            header.get_root(),
1188            PageHint::Clean,
1189            self.mem,
1190        )?)
1191    }
1192
1193    /// Open the given table without a type
1194    pub fn open_untyped_table(
1195        &self,
1196        handle: impl TableHandle,
1197    ) -> Result<ReadOnlyUntypedTable, TableError> {
1198        let header = self
1199            .tree
1200            .get_table_untyped(handle.name(), TableType::Normal)?
1201            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1202
1203        Ok(ReadOnlyUntypedTable::new(
1204            header.get_root(),
1205            header.get_fixed_key_size(),
1206            header.get_fixed_value_size(),
1207            self.mem,
1208        ))
1209    }
1210
1211    /// Open the given table
1212    pub fn open_multimap_table<K: RedbKey + 'static, V: RedbKey + 'static>(
1213        &self,
1214        definition: MultimapTableDefinition<K, V>,
1215    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1216        let header = self
1217            .tree
1218            .get_table::<K, V>(definition.name(), TableType::Multimap)?
1219            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1220
1221        Ok(ReadOnlyMultimapTable::new(
1222            header.get_root(),
1223            PageHint::Clean,
1224            self.mem,
1225        )?)
1226    }
1227
1228    /// Open the given table without a type
1229    pub fn open_untyped_multimap_table(
1230        &self,
1231        handle: impl MultimapTableHandle,
1232    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1233        let header = self
1234            .tree
1235            .get_table_untyped(handle.name(), TableType::Multimap)?
1236            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1237
1238        Ok(ReadOnlyUntypedMultimapTable::new(
1239            header.get_root(),
1240            header.get_fixed_key_size(),
1241            header.get_fixed_value_size(),
1242            self.mem,
1243        ))
1244    }
1245
1246    /// List all the tables
1247    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1248        self.tree
1249            .list_tables(TableType::Normal)
1250            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1251    }
1252
1253    /// List all the multimap tables
1254    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1255        self.tree
1256            .list_tables(TableType::Multimap)
1257            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1258    }
1259}
1260
1261impl<'a> Drop for ReadTransaction<'a> {
1262    fn drop(&mut self) {
1263        self.transaction_tracker
1264            .lock()
1265            .unwrap()
1266            .deallocate_read_transaction(self.transaction_id);
1267    }
1268}
1269
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    // Transaction ids must keep increasing across a close/reopen of the same file.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // Commit one write transaction and remember its id.
        let db = Database::create(tmpfile.path()).unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        let first_txn_id = txn.transaction_id;
        txn.commit().unwrap();
        drop(db);

        // Reopen the database; a fresh transaction must get a larger id.
        let reopened = Database::create(tmpfile.path()).unwrap();
        let txn = reopened.begin_write().unwrap();
        assert!(txn.transaction_id > first_txn_id);
    }
}