manifold/
transactions.rs

1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8    Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
9    PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
10    TableTree, TableTreeMut, TableType, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{
14    AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
15    MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
16    Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
17    TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
18    UntypedTableHandle,
19};
20#[cfg(feature = "logging")]
21use log::{debug, warn};
22use std::borrow::Borrow;
23use std::cmp::min;
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::fmt::{Debug, Display, Formatter};
26use std::marker::PhantomData;
27use std::mem::size_of;
28use std::ops::RangeBounds;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
// Upper bound on pages relocated in a single compaction pass.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
// Single-entry table (unit key) holding the next savepoint id to hand out.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// Persistent savepoints, serialized and keyed by their id.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// Pages that were allocated in the data tree by a given transaction. Only updated when a savepoint
// exists
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList,
> = SystemTableDefinition::new("data_pages_allocated");
// Pages in the data tree that are in the pending free state: i.e., they are unreachable from the
// root as of the given transaction.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("data_pages_unreachable");
// Pages in the system tree that are in the pending free state: i.e., they are unreachable from the
// root as of the given transaction.
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("system_pages_unreachable");
// The allocator state table is stored in the system table tree, but it's accessed using
// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
// Read-only and mutable views over the raw allocator-state btree.
pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
58
// Serialized list of page numbers. Format:
// 2 bytes: length (little-endian u16, see `len()`)
// length * size_of(PageNumber): array of page numbers
#[derive(Debug)]
pub(crate) struct PageList<'a> {
    // Raw serialized bytes; decoded lazily by `len()`/`get()`
    data: &'a [u8],
}
66
67impl PageList<'_> {
68    fn required_bytes(len: usize) -> usize {
69        2 + PageNumber::serialized_size() * len
70    }
71
72    pub(crate) fn len(&self) -> usize {
73        u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
74    }
75
76    pub(crate) fn get(&self, index: usize) -> PageNumber {
77        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
78        PageNumber::from_le_bytes(
79            self.data[start..(start + PageNumber::serialized_size())]
80                .try_into()
81                .unwrap(),
82        )
83    }
84}
85
impl Value for PageList<'_> {
    type SelfType<'a>
        = PageList<'a>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    // Variable width: the element count is encoded in the payload itself.
    fn fixed_width() -> Option<usize> {
        None
    }

    // Zero-copy deserialization: just wrap the raw bytes.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        PageList { data }
    }

    // Zero-copy serialization: the stored bytes are already the wire format.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::PageList")
    }
}
118
impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    fn initialize(data: &mut [u8]) {
        // The zeroed header region is 8 bytes, so the reservation must be at least that large
        assert!(data.len() >= 8);
        // Set the length to zero
        data[..8].fill(0);
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY: assumes PageListMut is a transparent wrapper over [u8] so the
        // pointer cast preserves layout — NOTE(review): confirm PageListMut is
        // #[repr(transparent)] at its definition site
        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
132
// Composite key: a transaction id plus a pagination counter, used to split one
// transaction's page lists across multiple table entries.
#[derive(Debug)]
pub(crate) struct TransactionIdWithPagination {
    pub(crate) transaction_id: u64,
    pub(crate) pagination_id: u64,
}
138
139impl Value for TransactionIdWithPagination {
140    type SelfType<'a>
141        = TransactionIdWithPagination
142    where
143        Self: 'a;
144    type AsBytes<'a>
145        = [u8; 2 * size_of::<u64>()]
146    where
147        Self: 'a;
148
149    fn fixed_width() -> Option<usize> {
150        Some(2 * size_of::<u64>())
151    }
152
153    fn from_bytes<'a>(data: &'a [u8]) -> Self
154    where
155        Self: 'a,
156    {
157        let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
158        let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
159        Self {
160            transaction_id,
161            pagination_id,
162        }
163    }
164
165    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
166    where
167        Self: 'b,
168    {
169        let mut result = [0u8; 2 * size_of::<u64>()];
170        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
171        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
172        result
173    }
174
175    fn type_name() -> TypeName {
176        TypeName::internal("redb::TransactionIdWithPagination")
177    }
178}
179
180impl Key for TransactionIdWithPagination {
181    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
182        let value1 = Self::from_bytes(data1);
183        let value2 = Self::from_bytes(data2);
184
185        match value1.transaction_id.cmp(&value2.transaction_id) {
186            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
187            std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
188            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
189        }
190    }
191}
192
// Keys of the raw allocator-state btree (see ALLOCATOR_STATE_TABLE_NAME).
// Tag bytes are assigned in `Value::from_bytes`/`as_bytes` below.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    // Tags 0-2: formats used by redb 2.x
    Deprecated,
    // Tag 3: allocator state for a single region
    Region(u32),
    // Tag 4: region tracker state
    RegionTracker,
    // Tag 5: transaction id record
    TransactionId,
}
200
201impl Value for AllocatorStateKey {
202    type SelfType<'a> = Self;
203    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
204
205    fn fixed_width() -> Option<usize> {
206        Some(1 + size_of::<u32>())
207    }
208
209    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
210    where
211        Self: 'a,
212    {
213        match data[0] {
214            // 0, 1, 2 were used in redb 2.x and have a different format
215            0..=2 => Self::Deprecated,
216            3 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
217            4 => Self::RegionTracker,
218            5 => Self::TransactionId,
219            _ => unreachable!(),
220        }
221    }
222
223    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
224    where
225        Self: 'a,
226        Self: 'b,
227    {
228        let mut result = Self::AsBytes::default();
229        match value {
230            Self::Region(region) => {
231                result[0] = 3;
232                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
233            }
234            Self::RegionTracker => {
235                result[0] = 4;
236            }
237            Self::TransactionId => {
238                result[0] = 5;
239            }
240            AllocatorStateKey::Deprecated => {
241                result[0] = 0;
242            }
243        }
244
245        result
246    }
247
248    fn type_name() -> TypeName {
249        TypeName::internal("redb::AllocatorStateKey")
250    }
251}
252
253impl Key for AllocatorStateKey {
254    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
255        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
256    }
257}
258
/// Definition (name plus key/value types) of a table stored in the internal system tree.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers pinning the key/value types without storing them
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
264
impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Creates a definition with the given table name.
    ///
    /// # Panics
    /// Panics if `name` is empty (at compile time when used in a const).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
275
impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Sealed: prevents TableHandle-style traits being implemented outside this crate.
impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// Definitions are plain value types (a &str plus phantom markers), so Clone is
// a bitwise copy and Copy is sound.
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
291
292impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
293    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
294        write!(
295            f,
296            "{}<{}, {}>",
297            self.name,
298            K::type_name().name(),
299            V::type_name().name()
300        )
301    }
302}
303
/// Informational storage stats about the database
#[derive(Debug)]
pub struct DatabaseStats {
    // Maximum btree height across all tables
    pub(crate) tree_height: u32,
    // Total pages currently allocated
    pub(crate) allocated_pages: u64,
    // Leaf pages holding user data
    pub(crate) leaf_pages: u64,
    // Branch pages in user-data btrees
    pub(crate) branch_pages: u64,
    // Bytes of user keys/values stored in leaves (no indexing overhead)
    pub(crate) stored_leaf_bytes: u64,
    // Branch keys plus other internal metadata, in bytes
    pub(crate) metadata_bytes: u64,
    // Bytes lost to fragmentation
    pub(crate) fragmented_bytes: u64,
    // Page size in bytes
    pub(crate) page_size: usize,
}
316
impl DatabaseStats {
    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of pages allocated
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages that store user data
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in btrees that store user data
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Number of bytes consumed by keys and values that have been inserted.
    /// Does not include indexing overhead
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Number of bytes per page
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
359
/// Durability level requested for a [`WriteTransaction`] commit.
#[derive(Copy, Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum Durability {
    /// Commits with this durability level will not be persisted to disk unless followed by a
    /// commit with [`Durability::Immediate`].
    None,
    /// Commits with this durability level are guaranteed to be persistent as soon as
    /// [`WriteTransaction::commit`] returns.
    Immediate,
}
370
// These are the actual durability levels used internally. NOTE(review): the historical
// `Durability::Paranoid` variant (which translated to `InternalDurability::Immediate` and
// also enabled 2-phase commit) no longer appears in the public enum above — confirm whether
// this mapping note is still relevant or can be dropped.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Immediate,
}
378
// Like a Table but only one may be open at a time to avoid possible races
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    // Namespace notified (via Drop) with the final root when this table closes
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}
386
impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    /// Wraps `table_root` in a mutable btree bound to this transaction's memory
    /// and freed-page list. The table re-registers its root with `namespace` on drop.
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        // No need to track allocations in the system tree. Savepoint restoration only relies on
        // freeing in the data tree
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        SystemTable {
            name: name.to_string(),
            namespace,
            tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages, ignore),
            transaction_guard: guard,
        }
    }

    /// Looks up `key`, returning an access guard over its value if present.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    /// Iterates the entries within `range`; the iterator keeps the transaction guard alive.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .range(&range)
            .map(|x| Range::new(x, self.transaction_guard.clone()))
    }

    /// Lazily removes and yields the entries in `range` for which `predicate` returns true.
    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        range: impl RangeBounds<KR> + 'a,
        predicate: F,
    ) -> Result<ExtractIf<'_, K, V, F>>
    where
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .extract_from_if(&range, predicate)
            .map(ExtractIf::new)
    }

    /// Inserts `key` -> `value`, returning the previous value if any.
    ///
    /// # Errors
    /// Returns `StorageError::ValueTooLarge` if the serialized value, key, or
    /// combined pair exceeds the respective limit (value checked first).
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<'_, V>>> {
        let value_len = V::as_bytes(value.borrow()).as_ref().len();
        if value_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_len + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len + key_len));
        }
        self.tree.insert(key.borrow(), value.borrow())
    }

    /// Removes `key`, returning the previous value if any.
    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        self.tree.remove(key.borrow())
    }
}
466
impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
    /// Reserves `value_length` bytes for `key` and returns a guard through which
    /// the value can be written in place.
    ///
    /// # Errors
    /// Returns `StorageError::ValueTooLarge` if the reservation, key, or combined
    /// pair exceeds the respective limit (same checks/order as `insert`).
    pub fn insert_reserve<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
        value_length: usize,
    ) -> Result<AccessGuardMutInPlace<'_, V>> {
        if value_length > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_length + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length + key_len));
        }
        self.tree.insert_reserve(key.borrow(), value_length)
    }
}
486
487impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
488    fn drop(&mut self) {
489        self.namespace.close_table(
490            &self.name,
491            &self.tree,
492            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
493        );
494    }
495}
496
// Mutable view of the system table tree for a single write transaction.
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    // System-tree pages freed during this transaction
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}
502
503impl<'db> SystemNamespace<'db> {
504    fn new(
505        root_page: Option<BtreeHeader>,
506        guard: Arc<TransactionGuard>,
507        mem: Arc<TransactionalMemory>,
508    ) -> Self {
509        // No need to track allocations in the system tree. Savepoint restoration only relies on
510        // freeing in the data tree
511        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
512        let freed_pages = Arc::new(Mutex::new(vec![]));
513        Self {
514            table_tree: TableTreeMut::new(
515                root_page,
516                guard.clone(),
517                mem,
518                freed_pages.clone(),
519                ignore,
520            ),
521            freed_pages,
522            transaction_guard: guard.clone(),
523        }
524    }
525
526    fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
527        self.freed_pages.clone()
528    }
529
530    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
531        &'s mut self,
532        transaction: &'txn WriteTransaction,
533        definition: SystemTableDefinition<K, V>,
534    ) -> Result<SystemTable<'db, 's, K, V>> {
535        let (root, _) = self
536            .table_tree
537            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
538            .map_err(|e| {
539                e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
540            })?;
541        transaction.dirty.store(true, Ordering::Release);
542
543        Ok(SystemTable::new(
544            definition.name(),
545            root,
546            self.freed_pages.clone(),
547            self.transaction_guard.clone(),
548            transaction.mem.clone(),
549            self,
550        ))
551    }
552
553    fn close_table<K: Key + 'static, V: Value + 'static>(
554        &mut self,
555        name: &str,
556        table: &BtreeMut<K, V>,
557        length: u64,
558    ) {
559        self.table_tree
560            .stage_update_table_root(name, table.get_root(), length);
561    }
562}
563
// Mutable view of the user (data) table tree for a single write transaction.
struct TableNamespace<'db> {
    // Currently-open tables, mapped to the call site that opened them (for the
    // TableAlreadyOpen error message)
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    // Policy/record of pages allocated in the data tree (tracked while savepoints exist)
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    // Data-tree pages freed during this transaction
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    table_tree: TableTreeMut<'db>,
}
570
impl TableNamespace<'_> {
    /// Builds the namespace over the data tree rooted at `root_page`, with
    /// allocation tracking enabled (it may be disabled later by `set_dirty`).
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
        let freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            root_page,
            guard,
            mem,
            // Committed pages which are no longer reachable and will be queued for free'ing
            // These are separated from the system freed pages
            freed_pages.clone(),
            allocated.clone(),
        );
        Self {
            open_tables: Default::default(),
            table_tree,
            freed_pages,
            allocated_pages: allocated,
        }
    }

    /// Marks the transaction dirty and, when no savepoint exists, switches off
    /// per-page allocation tracking (savepoints cannot be created in a dirty
    /// transaction, so tracking is no longer needed).
    fn set_dirty(&mut self, transaction: &WriteTransaction) {
        transaction.dirty.store(true, Ordering::Release);
        if !transaction.transaction_tracker.any_savepoint_exists() {
            // No savepoints exist, and we don't allow savepoints to be created in a dirty transaction
            // so we can disable allocation tracking now
            *self.allocated_pages.lock().unwrap() = PageTrackerPolicy::Ignore;
        }
    }

    /// Replaces the data-tree root. Requires that no tables are open.
    fn set_root(&mut self, root: Option<BtreeHeader>) {
        assert!(self.open_tables.is_empty());
        self.table_tree.set_root(root);
    }

    /// Shared open path: rejects double-opens, creates the table if needed, and
    /// records the caller's location for future TableAlreadyOpen errors.
    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        let root = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        self.open_tables
            .insert(name.to_string(), panic::Location::caller());

        Ok(root)
    }

    /// Opens (creating if necessary) a multimap table and marks the transaction dirty.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {definition}");
        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
        self.set_dirty(transaction);

        Ok(MultimapTable::new(
            definition.name(),
            root,
            length,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Opens (creating if necessary) a normal table and marks the transaction dirty.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {definition}");
        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
        self.set_dirty(transaction);

        Ok(Table::new(
            definition.name(),
            root,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Shared rename path: fails if the table is currently open.
    #[track_caller]
    fn inner_rename(
        &mut self,
        name: &str,
        new_name: &str,
        table_type: TableType,
    ) -> Result<(), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.rename_table(name, new_name, table_type)
    }

    /// Renames a normal table; marks the transaction dirty.
    #[track_caller]
    fn rename_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming table: {name} to {new_name}");
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Normal)
    }

    /// Renames a multimap table; marks the transaction dirty.
    #[track_caller]
    fn rename_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming multimap table: {name} to {new_name}");
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Multimap)
    }

    /// Shared delete path: fails if the table is currently open. Returns whether
    /// a table was actually deleted.
    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.delete_table(name, table_type)
    }

    /// Deletes a normal table; marks the transaction dirty.
    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {name}");
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Normal)
    }

    /// Deletes a multimap table; marks the transaction dirty.
    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {name}");
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Multimap)
    }

    /// Unregisters an open table and stages its final root/length in the table tree.
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.open_tables.remove(name).unwrap();
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}
756
/// A read/write transaction
///
/// Only a single [`WriteTransaction`] may exist at a time
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // User (data) tables opened by this transaction
    tables: Mutex<TableNamespace<'static>>,
    // Internal system tables (savepoints, freed-page queues, ...)
    system_tables: Mutex<SystemNamespace<'static>>,
    // Set once the transaction has committed or aborted
    completed: bool,
    // Set as soon as any table is opened/renamed/deleted
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    shrink_policy: ShrinkPolicy,
    quick_repair: bool,
    // Persistent savepoints created during this transaction
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
    // WAL integration for column families
    wal_journal: Option<Arc<crate::column_family::wal::journal::WALJournal>>,
    cf_name: Option<String>,
    checkpoint_manager: Option<Arc<crate::column_family::wal::checkpoint::CheckpointManager>>,
}
781
782impl WriteTransaction {
783    pub(crate) fn new(
784        guard: TransactionGuard,
785        transaction_tracker: Arc<TransactionTracker>,
786        mem: Arc<TransactionalMemory>,
787    ) -> Result<Self> {
788        let transaction_id = guard.id();
789        let guard = Arc::new(guard);
790
791        let root_page = mem.get_data_root();
792        let system_page = mem.get_system_root();
793
794        let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
795        let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
796
797        Ok(Self {
798            transaction_tracker,
799            mem: mem.clone(),
800            transaction_guard: guard.clone(),
801            transaction_id,
802            tables: Mutex::new(tables),
803            system_tables: Mutex::new(system_tables),
804            completed: false,
805            dirty: AtomicBool::new(false),
806            durability: InternalDurability::Immediate,
807            two_phase_commit: false,
808            quick_repair: false,
809            shrink_policy: ShrinkPolicy::Default,
810            created_persistent_savepoints: Mutex::new(Default::default()),
811            deleted_persistent_savepoints: Mutex::new(vec![]),
812            wal_journal: None,
813            cf_name: None,
814            checkpoint_manager: None,
815        })
816    }
817
    /// Overrides the shrink policy that will be applied when this transaction commits.
    pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
        self.shrink_policy = shrink_policy;
    }
821
    /// Sets the WAL context for this transaction (used by column families).
    ///
    /// `checkpoint_manager` is optional; passing `None` clears any previous manager.
    pub(crate) fn set_wal_context(
        &mut self,
        cf_name: String,
        wal_journal: Arc<crate::column_family::wal::journal::WALJournal>,
        checkpoint_manager: Option<Arc<crate::column_family::wal::checkpoint::CheckpointManager>>,
    ) {
        self.cf_name = Some(cf_name);
        self.wal_journal = Some(wal_journal);
        self.checkpoint_manager = checkpoint_manager;
    }
833
    /// Disable WAL for this specific transaction.
    ///
    /// This is useful for bulk load operations where WAL overhead provides no benefit.
    /// When WAL is disabled for a transaction, it will use a direct durable commit instead.
    ///
    /// **Important**: Only use this for large bulk operations. For normal operations,
    /// WAL provides better performance through group commit batching.
    pub fn disable_wal(&mut self) {
        // Clearing both fields removes the WAL context set by `set_wal_context`
        self.wal_journal = None;
        self.checkpoint_manager = None;
    }
845
846    pub(crate) fn pending_free_pages(&self) -> Result<bool> {
847        let mut system_tables = self.system_tables.lock().unwrap();
848        if system_tables
849            .open_system_table(self, DATA_FREED_TABLE)?
850            .tree
851            .get_root()
852            .is_some()
853        {
854            return Ok(true);
855        }
856        if system_tables
857            .open_system_table(self, SYSTEM_FREED_TABLE)?
858            .tree
859            .get_root()
860            .is_some()
861        {
862            return Ok(true);
863        }
864
865        Ok(false)
866    }
867
    /// Debug-only helper: prints every allocated page grouped by who references it
    /// (data table tree, system table tree, the persisted freed tables, and the
    /// in-memory freed-page queues), then prints any remaining pages as leaks.
    ///
    /// Each page is removed from the "all allocated" set exactly once; the
    /// `assert!(all_allocated.remove(..))` calls therefore also verify that no page
    /// is referenced from two places at the same time.
    #[cfg(debug_assertions)]
    pub fn print_allocated_page_debug(&self) {
        // Start from the allocator's view of every live page; entries are removed
        // as each owner below claims them.
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        self.mem.debug_check_allocator_consistency();

        // Pages reachable from the data table tree
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Pages reachable from the system table tree
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Pages queued for freeing in the persisted data freed table
        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        // Pages queued for freeing in the persisted system freed table
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        // Pages freed in this transaction but not yet persisted (data side)
        {
            let tables = self.tables.lock().unwrap();
            let pages = tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        // Pages freed in this transaction but not yet persisted (system side)
        {
            let system_tables = self.system_tables.lock().unwrap();
            let pages = system_tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        // Anything left over is allocated but unreachable from every known owner
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
971
972    /// Creates a snapshot of the current database state, which can be used to rollback the database.
973    /// This savepoint will exist until it is deleted with `[delete_savepoint()]`.
974    ///
975    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
976    /// Therefore, the lifetime of a savepoint should be minimized.
977    ///
978    /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
979    /// or if the transaction's durability is less than `[Durability::Immediate]`
980    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
981        if self.durability != InternalDurability::Immediate {
982            return Err(SavepointError::InvalidSavepoint);
983        }
984
985        let mut savepoint = self.ephemeral_savepoint()?;
986
987        let mut system_tables = self.system_tables.lock().unwrap();
988
989        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
990        next_table.insert((), savepoint.get_id().next())?;
991        drop(next_table);
992
993        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
994        savepoint_table.insert(
995            savepoint.get_id(),
996            SerializedSavepoint::from_savepoint(&savepoint),
997        )?;
998
999        savepoint.set_persistent();
1000
1001        self.created_persistent_savepoints
1002            .lock()
1003            .unwrap()
1004            .insert(savepoint.get_id());
1005
1006        Ok(savepoint.get_id().0)
1007    }
1008
1009    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
1010        self.transaction_guard.clone()
1011    }
1012
1013    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
1014        let mut system_tables = self.system_tables.lock().unwrap();
1015        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1016        let value = next_table.get(())?;
1017        if let Some(next_id) = value {
1018            Ok(Some(next_id.value()))
1019        } else {
1020            Ok(None)
1021        }
1022    }
1023
1024    /// Get a persistent savepoint given its id
1025    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
1026        let mut system_tables = self.system_tables.lock().unwrap();
1027        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1028        let value = table.get(SavepointId(id))?;
1029
1030        value
1031            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
1032            .ok_or(SavepointError::InvalidSavepoint)
1033    }
1034
1035    /// Delete the given persistent savepoint.
1036    ///
1037    /// Note that if the transaction is `abort()`'ed this deletion will be rolled back.
1038    ///
1039    /// Returns `true` if the savepoint existed
1040    /// Returns `[SavepointError::InvalidSavepoint`] if the transaction's durability is less than `[Durability::Immediate]`
1041    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1042        if self.durability != InternalDurability::Immediate {
1043            return Err(SavepointError::InvalidSavepoint);
1044        }
1045        let mut system_tables = self.system_tables.lock().unwrap();
1046        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1047        let savepoint = table.remove(SavepointId(id))?;
1048        if let Some(serialized) = savepoint {
1049            let savepoint = serialized
1050                .value()
1051                .to_savepoint(self.transaction_tracker.clone());
1052            self.deleted_persistent_savepoints
1053                .lock()
1054                .unwrap()
1055                .push((savepoint.get_id(), savepoint.get_transaction_id()));
1056            Ok(true)
1057        } else {
1058            Ok(false)
1059        }
1060    }
1061
1062    /// List all persistent savepoints
1063    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1064        let mut system_tables = self.system_tables.lock().unwrap();
1065        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1066        let mut savepoints = vec![];
1067        for savepoint in table.range::<SavepointId>(..)? {
1068            savepoints.push(savepoint?.0.value().0);
1069        }
1070        Ok(savepoints.into_iter())
1071    }
1072
1073    // TODO: deduplicate this with the one in Database
1074    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1075        let id = self
1076            .transaction_tracker
1077            .register_read_transaction(&self.mem)?;
1078
1079        Ok(TransactionGuard::new_read(
1080            id,
1081            self.transaction_tracker.clone(),
1082        ))
1083    }
1084
1085    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
1086        let transaction_id = self.allocate_read_transaction()?.leak();
1087        let id = self.transaction_tracker.allocate_savepoint(transaction_id);
1088        Ok((id, transaction_id))
1089    }
1090
1091    /// Creates a snapshot of the current database state, which can be used to rollback the database
1092    ///
1093    /// This savepoint will be freed as soon as the returned `[Savepoint]` is dropped.
1094    ///
1095    /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened)
1096    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1097        if self.dirty.load(Ordering::Acquire) {
1098            return Err(SavepointError::InvalidSavepoint);
1099        }
1100
1101        let (id, transaction_id) = self.allocate_savepoint()?;
1102        #[cfg(feature = "logging")]
1103        debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");
1104
1105        let root = self.mem.get_data_root();
1106        let savepoint = Savepoint::new_ephemeral(
1107            &self.mem,
1108            self.transaction_tracker.clone(),
1109            id,
1110            transaction_id,
1111            root,
1112        );
1113
1114        Ok(savepoint)
1115    }
1116
    /// Restore the state of the database to the given [`Savepoint`]
    ///
    /// Calling this method invalidates all [`Savepoint`]s created after savepoint
    ///
    /// # Panics
    /// Panics if `savepoint` was created by a different database, or if the file format
    /// version changed since the savepoint was created.
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // Ensure that user does not try to restore a Savepoint that is from a different Database
        assert_eq!(
            std::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
        // the database
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        // Mark the transaction dirty: new savepoints may not be created from this
        // diverged state (see ephemeral_savepoint())
        self.dirty.store(true, Ordering::Release);

        // Restoring a savepoint needs to accomplish the following:
        // 1) restore the table tree. This is trivial, since we have the old root
        // 1a) we also filter the freed tree to remove any pages referenced by the old root
        // 2) free all pages that were allocated since the savepoint and are unreachable
        //    from the restored table tree root. Here we diff the reachable pages from the old
        //    and new roots
        // 3) update the system tree to remove invalid persistent savepoints.

        // 1) restore the table tree
        {
            self.tables
                .lock()
                .unwrap()
                .set_root(savepoint.get_user_root());
        }

        // 1a) purge all transactions that happened after the savepoint from the data freed tree
        let txn_id = savepoint.get_transaction_id().next().raw_id();
        {
            // Keys are ordered by (transaction_id, pagination_id), so this range
            // covers every entry recorded at or after the first post-savepoint txn
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let mut system_tables = self.system_tables.lock().unwrap();
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            // extract_from_if is lazy: drain the iterator to actually remove the entries
            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
                entry?;
            }
            // No need to process the system freed table, because it only rolls forward
        }

        // 2) queue all pages that became unreachable
        {
            let tables = self.tables.lock().unwrap();
            let mut data_freed_pages = tables.freed_pages.lock().unwrap();
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            // Every page allocated by a post-savepoint transaction is unreachable
            // from the restored root and can be queued for freeing
            for entry in data_allocated.range(lower..)? {
                let (_, value) = entry?;
                for i in 0..value.value().len() {
                    data_freed_pages.push(value.value().get(i));
                }
            }
        }

        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
        // from later trying to restore a savepoint "on another timeline"
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1205
1206    /// Set the desired durability level for writes made in this transaction
1207    /// Defaults to [`Durability::Immediate`]
1208    ///
1209    /// If a persistent savepoint has been created or deleted, in this transaction, the durability may not
1210    /// be reduced below [`Durability::Immediate`]
1211    pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
1212        let created = !self
1213            .created_persistent_savepoints
1214            .lock()
1215            .unwrap()
1216            .is_empty();
1217        let deleted = !self
1218            .deleted_persistent_savepoints
1219            .lock()
1220            .unwrap()
1221            .is_empty();
1222        if (created || deleted) && !matches!(durability, Durability::Immediate) {
1223            return Err(SetDurabilityError::PersistentSavepointModified);
1224        }
1225
1226        self.durability = match durability {
1227            Durability::None => InternalDurability::None,
1228            Durability::Immediate => InternalDurability::Immediate,
1229        };
1230
1231        Ok(())
1232    }
1233
1234    /// Enable or disable 2-phase commit (defaults to disabled)
1235    ///
1236    /// By default, data is written using the following 1-phase commit algorithm:
1237    ///
1238    /// 1. Update the inactive commit slot with the new database state
1239    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
1240    /// 3. Call `fsync` to ensure all writes have been persisted to disk
1241    ///
1242    /// All data is written with checksums. When opening the database after a crash, the most
1243    /// recent of the two commit slots with a valid checksum is used.
1244    ///
1245    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
1246    /// function with close to perfect collision resistance when used with non-malicious input. An
1247    /// attacker with an extremely high degree of control over the database's workload, including
1248    /// the ability to cause the database process to crash, can cause invalid data to be written
1249    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
1250    ///
1251    /// Alternatively, you can enable 2-phase commit, which writes data like this:
1252    ///
1253    /// 1. Update the inactive commit slot with the new database state
1254    /// 2. Call `fsync` to ensure the database slate and commit slot update have been persisted
1255    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
1256    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
1257    ///
1258    /// This mitigates a theoretical attack where an attacker who
1259    /// 1. can control the order in which pages are flushed to disk
1260    /// 2. can introduce crashes during `fsync`,
1261    /// 3. has knowledge of the database file contents, and
1262    /// 4. can include arbitrary data in a write transaction
1263    ///
1264    /// could cause a transaction to partially commit (some but not all of the data is written).
1265    /// This is described in the design doc in futher detail.
1266    ///
1267    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
1268    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
1269    /// a high degree of control over the database's workload, including the ability to cause the
1270    /// database process to crash, can cause the database to crash with the god byte primary bit
1271    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
1272    /// controlled state.
1273    pub fn set_two_phase_commit(&mut self, enabled: bool) {
1274        self.two_phase_commit = enabled;
1275    }
1276
    /// Enable or disable quick-repair (defaults to disabled)
    ///
    /// By default, when reopening the database after a crash, redb needs to do a full repair.
    /// This involves walking the entire database to verify the checksums and reconstruct the
    /// allocator state, so it can be very slow if the database is large.
    ///
    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
    /// (which guarantees that the primary commit slot is valid without needing to look at the
    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
    ///
    /// Note: while quick-repair is enabled, `commit()` forces 2-phase commit regardless of
    /// the [`Self::set_two_phase_commit`] setting.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }
1290
1291    /// Open the given table
1292    ///
1293    /// The table will be created if it does not exist
1294    #[track_caller]
1295    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1296        &'txn self,
1297        definition: TableDefinition<K, V>,
1298    ) -> Result<Table<'txn, K, V>, TableError> {
1299        self.tables.lock().unwrap().open_table(self, definition)
1300    }
1301
1302    /// Open the given table
1303    ///
1304    /// The table will be created if it does not exist
1305    #[track_caller]
1306    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1307        &'txn self,
1308        definition: MultimapTableDefinition<K, V>,
1309    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1310        self.tables
1311            .lock()
1312            .unwrap()
1313            .open_multimap_table(self, definition)
1314    }
1315
1316    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1317        &self,
1318        name: &str,
1319        table: &BtreeMut<K, V>,
1320        length: u64,
1321    ) {
1322        self.tables.lock().unwrap().close_table(name, table, length);
1323    }
1324
1325    /// Rename the given table
1326    pub fn rename_table(
1327        &self,
1328        definition: impl TableHandle,
1329        new_name: impl TableHandle,
1330    ) -> Result<(), TableError> {
1331        let name = definition.name().to_string();
1332        // Drop the definition so that callers can pass in a `Table` to rename, without getting a TableAlreadyOpen error
1333        drop(definition);
1334        self.tables
1335            .lock()
1336            .unwrap()
1337            .rename_table(self, &name, new_name.name())
1338    }
1339
1340    /// Rename the given multimap table
1341    pub fn rename_multimap_table(
1342        &self,
1343        definition: impl MultimapTableHandle,
1344        new_name: impl MultimapTableHandle,
1345    ) -> Result<(), TableError> {
1346        let name = definition.name().to_string();
1347        // Drop the definition so that callers can pass in a `MultimapTable` to rename, without getting a TableAlreadyOpen error
1348        drop(definition);
1349        self.tables
1350            .lock()
1351            .unwrap()
1352            .rename_multimap_table(self, &name, new_name.name())
1353    }
1354
1355    /// Delete the given table
1356    ///
1357    /// Returns a bool indicating whether the table existed
1358    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1359        let name = definition.name().to_string();
1360        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1361        drop(definition);
1362        self.tables.lock().unwrap().delete_table(self, &name)
1363    }
1364
1365    /// Delete the given table
1366    ///
1367    /// Returns a bool indicating whether the table existed
1368    pub fn delete_multimap_table(
1369        &self,
1370        definition: impl MultimapTableHandle,
1371    ) -> Result<bool, TableError> {
1372        let name = definition.name().to_string();
1373        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1374        drop(definition);
1375        self.tables
1376            .lock()
1377            .unwrap()
1378            .delete_multimap_table(self, &name)
1379    }
1380
1381    /// List all the tables
1382    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1383        self.tables
1384            .lock()
1385            .unwrap()
1386            .table_tree
1387            .list_tables(TableType::Normal)
1388            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1389    }
1390
1391    /// List all the multimap tables
1392    pub fn list_multimap_tables(
1393        &self,
1394    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1395        self.tables
1396            .lock()
1397            .unwrap()
1398            .table_tree
1399            .list_tables(TableType::Multimap)
1400            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1401    }
1402
    /// Commit the transaction
    ///
    /// All writes performed in this transaction will be visible to future transactions, and are
    /// durable as consistent with the [`Durability`] level set by [`Self::set_durability`]
    ///
    /// # Errors
    /// Returns a [`CommitError`] if the transaction could not be committed.
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
        self.completed = true;
        self.commit_inner()
    }
1412
    // Performs the actual commit. Ordering is significant throughout:
    // 1) flush the data table tree and persist its freed/allocated page bookkeeping,
    // 2) finalize the system root (checksums) BEFORE anything is written to the WAL,
    // 3) append to the WAL and wait for its fsync (when a WAL is configured),
    // 4) make the new roots visible in memory (non-durable path) or perform the
    //    traditional durable commit (no-WAL path),
    // 5) release tracker state for savepoints deleted in this transaction.
    fn commit_inner(&mut self) -> Result<(), CommitError> {
        // Quick-repair requires 2-phase commit
        if self.quick_repair {
            self.two_phase_commit = true;
        }

        let (user_root, allocated_pages, data_freed) =
            self.tables.lock().unwrap().table_tree.flush_and_close()?;

        // Clone data for WAL before storing
        let data_freed_clone = data_freed.clone();
        let allocated_pages_vec: Vec<_> = allocated_pages.iter().copied().collect();

        // Persist the freed/allocated page lists into the system tables
        self.store_data_freed_pages(data_freed)?;
        self.store_allocated_pages(allocated_pages.into_iter().collect())?;

        // Prepare system root with finalized checksums based on durability
        // This MUST happen before writing to WAL to avoid DEFERRED checksums
        let (system_root, post_commit_frees) = match self.durability {
            InternalDurability::None => {
                let (sys_root, frees) = self.prepare_system_root_for_non_durable_commit()?;
                (sys_root, Some(frees))
            }
            InternalDurability::Immediate => {
                if self.wal_journal.is_some() {
                    // WAL path: use non-durable preparation
                    let (sys_root, frees) = self.prepare_system_root_for_non_durable_commit()?;
                    (sys_root, Some(frees))
                } else {
                    // No WAL: use durable preparation
                    let sys_root = self.prepare_system_root_for_durable_commit()?;
                    (sys_root, None)
                }
            }
        };

        // Append to WAL if enabled (AFTER system root is finalized)
        if let (Some(wal_journal), Some(cf_name)) = (&self.wal_journal, &self.cf_name) {
            use crate::column_family::wal::entry::{WALEntry, WALTransactionPayload};

            let payload = WALTransactionPayload {
                user_root: user_root.map(|h| (h.root, h.checksum, h.length)),
                system_root: system_root.map(|h| (h.root, h.checksum, h.length)),
                freed_pages: data_freed_clone,
                allocated_pages: allocated_pages_vec,
                durability: match self.durability {
                    InternalDurability::None => Durability::None,
                    InternalDurability::Immediate => Durability::Immediate,
                },
            };

            let mut entry = WALEntry::new(cf_name.clone(), self.transaction_id.raw_id(), payload);

            // Append to WAL and wait for group commit fsync
            let sequence = wal_journal
                .append(&mut entry)
                .map_err(|e| CommitError::Storage(StorageError::from(e)))?;

            // Wait for background sync thread to fsync (group commit)
            wal_journal
                .wait_for_sync(sequence)
                .map_err(|e| CommitError::Storage(StorageError::from(e)))?;

            // Register for checkpoint
            if let Some(checkpoint_mgr) = &self.checkpoint_manager {
                checkpoint_mgr.register_pending(sequence);
            }
        }

        #[cfg(feature = "logging")]
        debug!(
            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
        );

        // Perform final commit to memory
        // System root is already finalized, so we can commit directly
        match self.durability {
            InternalDurability::None => {
                self.mem
                    .non_durable_commit(user_root, system_root, self.transaction_id)?;
                self.transaction_tracker.register_non_durable_commit(
                    self.transaction_id,
                    self.mem.get_last_durable_transaction_id()?,
                );
                // Release the pages the non-durable preparation queued for freeing
                if let Some(frees) = post_commit_frees {
                    for page in frees {
                        self.mem.free(page, &mut PageTrackerPolicy::Ignore);
                    }
                }
            }
            InternalDurability::Immediate => {
                if self.wal_journal.is_some() {
                    // WAL already fsynced, just make changes visible
                    self.mem
                        .non_durable_commit(user_root, system_root, self.transaction_id)?;
                    self.transaction_tracker.register_non_durable_commit(
                        self.transaction_id,
                        self.mem.get_last_durable_transaction_id()?,
                    );
                    if let Some(frees) = post_commit_frees {
                        for page in frees {
                            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
                        }
                    }
                } else {
                    // No WAL, use traditional durable commit
                    // System root already prepared, now do the final commit steps
                    self.mem.commit(
                        user_root,
                        system_root,
                        self.transaction_id,
                        self.two_phase_commit,
                        self.shrink_policy,
                    )?;
                    self.transaction_tracker.clear_pending_non_durable_commits();
                    // Free system pages immediately for durable commits
                    let system_freed_pages =
                        self.system_tables.lock().unwrap().system_freed_pages();
                    for page in system_freed_pages.lock().unwrap().drain(..) {
                        self.mem.free(page, &mut PageTrackerPolicy::Ignore);
                    }
                }
            }
        }

        // The commit succeeded, so the savepoint deletions recorded by
        // delete_persistent_savepoint() can now be released in the tracker
        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
            self.transaction_tracker
                .deallocate_savepoint(*savepoint, *transaction);
        }

        // Every queued page free must have been persisted or applied by this point
        assert!(
            self.system_tables
                .lock()
                .unwrap()
                .system_freed_pages()
                .lock()
                .unwrap()
                .is_empty()
        );
        assert!(
            self.tables
                .lock()
                .unwrap()
                .freed_pages
                .lock()
                .unwrap()
                .is_empty()
        );

        #[cfg(feature = "logging")]
        debug!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        Ok(())
    }
1571
1572    fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
1573        let mut system_tables = self.system_tables.lock().unwrap();
1574        let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1575        let mut pagination_counter = 0;
1576        while !freed_pages.is_empty() {
1577            let chunk_size = 400;
1578            let buffer_size = PageList::required_bytes(chunk_size);
1579            let key = TransactionIdWithPagination {
1580                transaction_id: self.transaction_id.raw_id(),
1581                pagination_id: pagination_counter,
1582            };
1583            let mut access_guard = freed_table.insert_reserve(&key, buffer_size)?;
1584
1585            let len = freed_pages.len();
1586            access_guard.as_mut().clear();
1587            for page in freed_pages.drain(len - min(len, chunk_size)..) {
1588                // Make sure that the page is currently allocated
1589                debug_assert!(
1590                    self.mem.is_allocated(page),
1591                    "Page is not allocated: {page:?}"
1592                );
1593                debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
1594                access_guard.as_mut().push_back(page);
1595            }
1596
1597            pagination_counter += 1;
1598        }
1599
1600        Ok(())
1601    }
1602
1603    fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
1604        let mut system_tables = self.system_tables.lock().unwrap();
1605        let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1606        let mut pagination_counter = 0;
1607        while !data_allocated_pages.is_empty() {
1608            let chunk_size = 400;
1609            let buffer_size = PageList::required_bytes(chunk_size);
1610            let key = TransactionIdWithPagination {
1611                transaction_id: self.transaction_id.raw_id(),
1612                pagination_id: pagination_counter,
1613            };
1614            let mut access_guard = allocated_table.insert_reserve(&key, buffer_size)?;
1615
1616            let len = data_allocated_pages.len();
1617            access_guard.as_mut().clear();
1618            for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
1619                // Make sure that the page is currently allocated. This is to catch scenarios like
1620                // a page getting allocated, and then deallocated within the same transaction,
1621                // but errantly being left in the allocated pages list
1622                debug_assert!(
1623                    self.mem.is_allocated(page),
1624                    "Page is not allocated: {page:?}"
1625                );
1626                debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
1627                access_guard.as_mut().push_back(page);
1628            }
1629
1630            pagination_counter += 1;
1631        }
1632
1633        // Purge any transactions that are no longer referenced
1634        let oldest = self
1635            .transaction_tracker
1636            .oldest_savepoint()
1637            .map_or(u64::MAX, |(_, x)| x.raw_id());
1638        let key = TransactionIdWithPagination {
1639            transaction_id: oldest,
1640            pagination_id: 0,
1641        };
1642        for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
1643            entry?;
1644        }
1645
1646        Ok(())
1647    }
1648
1649    /// Abort the transaction
1650    ///
1651    /// All writes performed in this transaction will be rolled back
1652    pub fn abort(mut self) -> Result {
1653        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
1654        self.completed = true;
1655        self.abort_inner()
1656    }
1657
1658    fn abort_inner(&mut self) -> Result {
1659        #[cfg(feature = "logging")]
1660        debug!("Aborting transaction id={:?}", self.transaction_id);
1661        self.tables
1662            .lock()
1663            .unwrap()
1664            .table_tree
1665            .clear_root_updates_and_close();
1666        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1667            match self.delete_persistent_savepoint(savepoint.0) {
1668                Ok(_) => {}
1669                Err(err) => match err {
1670                    SavepointError::InvalidSavepoint => {
1671                        unreachable!();
1672                    }
1673                    SavepointError::Storage(storage_err) => {
1674                        return Err(storage_err);
1675                    }
1676                },
1677            }
1678        }
1679        self.mem.rollback_uncommitted_writes()?;
1680        #[cfg(feature = "logging")]
1681        debug!("Finished abort of transaction id={:?}", self.transaction_id);
1682        Ok(())
1683    }
1684
    /// Prepares and finalizes the system root for commit.
    /// This must be called before writing to WAL to ensure checksums are finalized.
    /// Returns the finalized `system_root`.
    ///
    /// Also processes the freed-page tables and, when quick-repair is enabled, persists
    /// the allocator state into the system tree.
    fn prepare_system_root_for_durable_commit(&mut self) -> Result<Option<BtreeHeader>> {
        // Pages freed by transactions older than every live read transaction can be
        // reclaimed now; anything newer must be kept for those readers
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let mut system_tables = self.system_tables.lock().unwrap();
        let system_freed_pages = system_tables.system_freed_pages();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // Discard any previously saved allocator state; it is rewritten below when
        // quick_repair is enabled
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |system_tree_ref, tree: &mut AllocatorStateTreeMut| {
                    let mut pagination_counter = 0;

                    // Saving the allocator state can itself allocate/free pages, which
                    // invalidates the reserved snapshot; retry until a save succeeds
                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // We can't free pages after the commit, because that would invalidate our
                        // saved allocator state. Everything needs to go through the transactional
                        // free mechanism
                        self.store_system_freed_pages(
                            system_tree_ref,
                            system_freed_pages.clone(),
                            None,
                            &mut pagination_counter,
                        )?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Clear out the table before retrying, just in case the number of regions
                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
                        // free the pages immediately -- we need to reuse those pages to guarantee
                        // that our retry loop will eventually terminate
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        }

        // Compute checksums for all dirty system-tree pages and return the final root
        let system_root = system_tree.finalize_dirty_checksums()?;
        Ok(system_root)
    }
1744
    /// Prepares and finalizes the system root for non-durable commit.
    /// This must be called before writing to WAL to ensure checksums are finalized.
    /// Returns the finalized `system_root` and a vector of pages to free post-commit.
    fn prepare_system_root_for_non_durable_commit(
        &mut self,
    ) -> Result<(Option<BtreeHeader>, Vec<PageNumber>)> {
        // Only pages freed before the oldest live non-durable read transaction are eligible
        let mut free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_nondurable_transaction()
            .map_or(self.transaction_id, |x| x.next());
        // TODO: refactor the non-durable free'ed processing to remove this
        // The reason it is needed is that non-durable commits edit previous non-durable commits,
        // but they only edit the freed tree of unpersisted pages.
        // The allocated tree, which savepoints rely, is not edited for performance reasons
        // Therefore, we must not edit anything after a savepoint
        // It would be better for non-durable transaction's unpersisted pages to be kept in-memory
        // in a data structure where the allocated list can be efficiently edited
        if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint() {
            free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
        }
        self.process_freed_pages_nondurable(free_until_transaction)?;

        let mut post_commit_frees = vec![];

        let system_root = {
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed_pages = system_tables.system_freed_pages();
            system_tables.table_tree.flush_table_root_updates()?;
            // Pages that were never persisted to disk cannot be referenced by the last
            // durable state, so they are safe to free immediately after this commit
            for page in system_freed_pages
                .lock()
                .unwrap()
                .extract_if(.., |p| self.mem.unpersisted(*p))
            {
                post_commit_frees.push(page);
            }
            // Store all freed pages for a future commit(), since we can't free pages during a
            // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
            self.store_system_freed_pages(
                &mut system_tables.table_tree,
                system_freed_pages,
                Some(&mut post_commit_frees),
                &mut 0,
            )?;

            // Flush again to pick up roots modified by store_system_freed_pages(),
            // then finalize checksums for the system root
            system_tables
                .table_tree
                .flush_table_root_updates()?
                .finalize_dirty_checksums()?
        };

        Ok((system_root, post_commit_frees))
    }
1797
    // Relocate pages to lower number regions/pages
    // Returns true if a page(s) was moved
    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
        let mut progress = false;

        // Find the 1M highest pages
        let mut highest_pages = BTreeMap::new();
        let mut tables = self.tables.lock().unwrap();
        let table_tree = &mut tables.table_tree;
        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
        let mut system_tables = self.system_tables.lock().unwrap();
        let system_table_tree = &mut system_tables.table_tree;
        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;

        // Calculate how many of them can be relocated to lower pages, starting from the last page
        let mut relocation_map = HashMap::new();
        for path in highest_pages.into_values().rev() {
            // A page can appear in several paths; only relocate it once
            if relocation_map.contains_key(&path.page_number()) {
                continue;
            }
            let old_page = self.mem.get_page(path.page_number())?;
            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
            let new_page_number = new_page.get_page_number();
            // We have to copy at least the page type into the new page.
            // Otherwise its cache priority will be calculated incorrectly
            new_page.memory_mut()[0] = old_page.memory()[0];
            drop(new_page);
            // We're able to move this to a lower page, so insert it and rewrite all its parents
            if new_page_number < path.page_number() {
                relocation_map.insert(path.page_number(), new_page_number);
                for parent in path.parents() {
                    if relocation_map.contains_key(parent) {
                        continue;
                    }
                    let old_parent = self.mem.get_page(*parent)?;
                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
                    let new_page_number = new_page.get_page_number();
                    // We have to copy at least the page type into the new page.
                    // Otherwise its cache priority will be calculated incorrectly
                    new_page.memory_mut()[0] = old_parent.memory()[0];
                    drop(new_page);
                    relocation_map.insert(*parent, new_page_number);
                }
            } else {
                // The lowest available page is not lower than this one. Undo the trial
                // allocation and stop: pages are visited from highest to lowest, so no
                // later page can do better.
                self.mem
                    .free(new_page_number, &mut PageTrackerPolicy::Ignore);
                break;
            }
        }

        if !relocation_map.is_empty() {
            progress = true;
        }

        // Rewrite both trees so they reference the relocated page numbers
        table_tree.relocate_tables(&relocation_map)?;
        system_table_tree.relocate_tables(&relocation_map)?;

        Ok(progress)
    }
1857
1858    // NOTE: must be called before store_system_freed_pages() during commit, since this can create
1859    // more pages freed by the current transaction
1860    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1861        // We assume below that PageNumber is length 8
1862        assert_eq!(PageNumber::serialized_size(), 8);
1863
1864        // Handle the data freed tree
1865        let mut system_tables = self.system_tables.lock().unwrap();
1866        {
1867            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1868            let key = TransactionIdWithPagination {
1869                transaction_id: free_until.raw_id(),
1870                pagination_id: 0,
1871            };
1872            for entry in data_freed.extract_from_if(..key, |_, _| true)? {
1873                let (_, page_list) = entry?;
1874                for i in 0..page_list.value().len() {
1875                    self.mem
1876                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1877                }
1878            }
1879        }
1880
1881        // Handle the system freed tree
1882        {
1883            let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
1884            let key = TransactionIdWithPagination {
1885                transaction_id: free_until.raw_id(),
1886                pagination_id: 0,
1887            };
1888            for entry in system_freed.extract_from_if(..key, |_, _| true)? {
1889                let (_, page_list) = entry?;
1890                for i in 0..page_list.value().len() {
1891                    self.mem
1892                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1893                }
1894            }
1895        }
1896
1897        Ok(())
1898    }
1899
    /// Shared logic for process_freed_pages_nondurable(): scans `definition` (one of the
    /// freed-page system tables) for entries recorded by unprocessed non-durable commits
    /// older than `free_until`, frees every page in those entries that was never persisted
    /// to disk, and rewrites each entry to retain only the pages that could not be freed.
    ///
    /// Returns the transaction ids that were processed so the caller can mark them in the
    /// transaction tracker.
    fn process_freed_pages_nondurable_helper(
        &mut self,
        free_until: TransactionId,
        definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
    ) -> Result<Vec<TransactionId>> {
        let mut processed = vec![];
        let mut system_tables = self.system_tables.lock().unwrap();

        // Scan range: [oldest unprocessed non-durable commit, free_until)
        let last_key = TransactionIdWithPagination {
            transaction_id: free_until.raw_id(),
            pagination_id: 0,
        };
        let oldest_unprocessed = self
            .transaction_tracker
            .oldest_unprocessed_non_durable_commit()
            .map_or(free_until.raw_id(), |x| x.raw_id());
        let first_key = TransactionIdWithPagination {
            transaction_id: oldest_unprocessed,
            pagination_id: 0,
        };
        let mut data_freed = system_tables.open_system_table(self, definition)?;

        // First pass (read-only): collect candidate transaction ids, so the range borrow
        // of the table ends before the mutations below
        let mut candidate_transactions = vec![];
        for entry in data_freed.range(first_key..last_key)? {
            let (key, _) = entry?;
            let transaction_id = TransactionId::new(key.value().transaction_id);
            if self
                .transaction_tracker
                .is_unprocessed_non_durable_commit(transaction_id)
            {
                candidate_transactions.push(transaction_id);
            }
        }
        // Second pass: walk each candidate's pagination chunks until a gap is found
        for transaction_id in candidate_transactions {
            let mut key = TransactionIdWithPagination {
                transaction_id: transaction_id.raw_id(),
                pagination_id: 0,
            };
            loop {
                let Some(entry) = data_freed.get(&key)? else {
                    break;
                };
                let pages = entry.value();
                // Pages that free_if_unpersisted() declined to free are kept for later
                let mut new_pages = vec![];
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    if !self
                        .mem
                        .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
                    {
                        new_pages.push(page);
                    }
                }
                // Only rewrite the entry if at least one page was actually freed
                if new_pages.len() != pages.len() {
                    drop(entry);
                    if new_pages.is_empty() {
                        data_freed.remove(&key)?;
                    } else {
                        let required = PageList::required_bytes(new_pages.len());
                        let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
                        for page in new_pages {
                            page_list_mut.as_mut().push_back(page);
                        }
                    }
                }
                key.pagination_id += 1;
            }
            processed.push(transaction_id);
        }

        Ok(processed)
    }
1972
1973    // NOTE: must be called before store_system_freed_pages() during commit, since this can create
1974    // more pages freed by the current transaction
1975    //
1976    // This method only frees pages that are unpersisted, in non-durable transactions, since
1977    // it is called from a non-durable commit() and therefore can't modify anything that the
1978    // on-disk state in the last durable transaction might reference.
1979    fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
1980        // We assume below that PageNumber is length 8
1981        assert_eq!(PageNumber::serialized_size(), 8);
1982
1983        // Handle the data freed tree
1984        let mut processed =
1985            self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
1986
1987        // Handle the system freed tree
1988        processed
1989            .extend(self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?);
1990
1991        for transaction_id in processed {
1992            self.transaction_tracker
1993                .mark_unprocessed_non_durable_commit(transaction_id);
1994        }
1995
1996        Ok(())
1997    }
1998
    /// Persists the accumulated system freed pages into the system freed table, keyed by
    /// (this transaction's id, `pagination_counter`), in chunks of up to 200 pages.
    ///
    /// When `unpersisted_pages` is Some, pages reported unpersisted by `self.mem` are
    /// diverted into that vector instead of being stored. `pagination_counter` is owned by
    /// the caller so repeated calls within one commit keep generating unique keys.
    fn store_system_freed_pages(
        &self,
        system_tree: &mut TableTreeMut,
        system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
        pagination_counter: &mut u64,
    ) -> Result {
        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8

        system_tree.open_table_and_flush_table_root(
            SYSTEM_FREED_TABLE.name(),
            |system_freed_tree: &mut SystemFreedTree| {
                // The emptiness check re-locks every iteration: NOTE(review) presumably
                // because insert_reserve() below can itself append to system_freed_pages
                // (which is why the lock is not held across it) -- confirm
                while !system_freed_pages.lock().unwrap().is_empty() {
                    let chunk_size = 200;
                    let buffer_size = PageList::required_bytes(chunk_size);
                    let key = TransactionIdWithPagination {
                        transaction_id: self.transaction_id.raw_id(),
                        pagination_id: *pagination_counter,
                    };
                    let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;

                    let mut freed_pages = system_freed_pages.lock().unwrap();
                    let len = freed_pages.len();
                    access_guard.as_mut().clear();
                    // Drain up to chunk_size pages from the tail of the vec
                    for page in freed_pages.drain(len - min(len, chunk_size)..) {
                        if let Some(ref mut unpersisted_pages) = unpersisted_pages
                            && self.mem.unpersisted(page)
                        {
                            unpersisted_pages.push(page);
                        } else {
                            access_guard.as_mut().push_back(page);
                        }
                    }
                    drop(access_guard);

                    *pagination_counter += 1;
                }
                Ok(())
            },
        )?;

        Ok(())
    }
2042
2043    /// Retrieves information about storage usage in the database
2044    pub fn stats(&self) -> Result<DatabaseStats> {
2045        let tables = self.tables.lock().unwrap();
2046        let table_tree = &tables.table_tree;
2047        let data_tree_stats = table_tree.stats()?;
2048
2049        let system_tables = self.system_tables.lock().unwrap();
2050        let system_table_tree = &system_tables.table_tree;
2051        let system_tree_stats = system_table_tree.stats()?;
2052
2053        let total_metadata_bytes = data_tree_stats.metadata_bytes()
2054            + system_tree_stats.metadata_bytes
2055            + system_tree_stats.stored_leaf_bytes;
2056        let total_fragmented = data_tree_stats.fragmented_bytes()
2057            + system_tree_stats.fragmented_bytes
2058            + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
2059
2060        Ok(DatabaseStats {
2061            tree_height: data_tree_stats.tree_height(),
2062            allocated_pages: self.mem.count_allocated_pages()?,
2063            leaf_pages: data_tree_stats.leaf_pages(),
2064            branch_pages: data_tree_stats.branch_pages(),
2065            stored_leaf_bytes: data_tree_stats.stored_bytes(),
2066            metadata_bytes: total_metadata_bytes,
2067            fragmented_bytes: total_fragmented,
2068            page_size: self.mem.get_page_size(),
2069        })
2070    }
2071
2072    #[allow(dead_code)]
2073    pub(crate) fn print_debug(&self) -> Result {
2074        // Flush any pending updates to make sure we get the latest root
2075        let mut tables = self.tables.lock().unwrap();
2076        if let Some(page) = tables
2077            .table_tree
2078            .flush_table_root_updates()
2079            .unwrap()
2080            .finalize_dirty_checksums()
2081            .unwrap()
2082        {
2083            eprintln!("Master tree:");
2084            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
2085                Some(page),
2086                PageHint::None,
2087                self.transaction_guard.clone(),
2088                self.mem.clone(),
2089            )?;
2090            master_tree.print_debug(true)?;
2091        }
2092
2093        // Flush any pending updates to make sure we get the latest root
2094        let mut system_tables = self.system_tables.lock().unwrap();
2095        if let Some(page) = system_tables
2096            .table_tree
2097            .flush_table_root_updates()
2098            .unwrap()
2099            .finalize_dirty_checksums()
2100            .unwrap()
2101        {
2102            eprintln!("System tree:");
2103            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
2104                Some(page),
2105                PageHint::None,
2106                self.transaction_guard.clone(),
2107                self.mem.clone(),
2108            )?;
2109            master_tree.print_debug(true)?;
2110        }
2111
2112        Ok(())
2113    }
2114}
2115
2116impl Drop for WriteTransaction {
2117    fn drop(&mut self) {
2118        if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
2119            #[allow(unused_variables)]
2120            if let Err(error) = self.abort_inner() {
2121                #[cfg(feature = "logging")]
2122                warn!("Failure automatically aborting transaction: {error}");
2123            }
2124        } else if !self.completed && self.mem.storage_failure() {
2125            self.tables
2126                .lock()
2127                .unwrap()
2128                .table_tree
2129                .clear_root_updates_and_close();
2130        }
2131    }
2132}
2133
/// A read-only transaction
///
/// Read-only transactions may exist concurrently with writes
pub struct ReadTransaction {
    // Shared handle to the underlying storage; cloned into tables opened from this transaction
    mem: Arc<TransactionalMemory>,
    // Table tree rooted at the data root captured in new(), giving a stable snapshot view
    tree: TableTree,
}
2141
2142impl ReadTransaction {
2143    pub(crate) fn new(
2144        mem: Arc<TransactionalMemory>,
2145        guard: TransactionGuard,
2146    ) -> Result<Self, TransactionError> {
2147        let root_page = mem.get_data_root();
2148        let guard = Arc::new(guard);
2149        Ok(Self {
2150            mem: mem.clone(),
2151            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
2152                .map_err(TransactionError::Storage)?,
2153        })
2154    }
2155
2156    /// Open the given table
2157    pub fn open_table<K: Key + 'static, V: Value + 'static>(
2158        &self,
2159        definition: TableDefinition<K, V>,
2160    ) -> Result<ReadOnlyTable<K, V>, TableError> {
2161        let header = self
2162            .tree
2163            .get_table::<K, V>(definition.name(), TableType::Normal)?
2164            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2165
2166        match header {
2167            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
2168                definition.name().to_string(),
2169                table_root,
2170                PageHint::Clean,
2171                self.tree.transaction_guard().clone(),
2172                self.mem.clone(),
2173            )?),
2174            InternalTableDefinition::Multimap { .. } => unreachable!(),
2175        }
2176    }
2177
2178    /// Open the given table without a type
2179    pub fn open_untyped_table(
2180        &self,
2181        handle: impl TableHandle,
2182    ) -> Result<ReadOnlyUntypedTable, TableError> {
2183        let header = self
2184            .tree
2185            .get_table_untyped(handle.name(), TableType::Normal)?
2186            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2187
2188        match header {
2189            InternalTableDefinition::Normal {
2190                table_root,
2191                fixed_key_size,
2192                fixed_value_size,
2193                ..
2194            } => Ok(ReadOnlyUntypedTable::new(
2195                table_root,
2196                fixed_key_size,
2197                fixed_value_size,
2198                self.mem.clone(),
2199            )),
2200            InternalTableDefinition::Multimap { .. } => unreachable!(),
2201        }
2202    }
2203
2204    /// Open the given table
2205    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
2206        &self,
2207        definition: MultimapTableDefinition<K, V>,
2208    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
2209        let header = self
2210            .tree
2211            .get_table::<K, V>(definition.name(), TableType::Multimap)?
2212            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2213
2214        match header {
2215            InternalTableDefinition::Normal { .. } => unreachable!(),
2216            InternalTableDefinition::Multimap {
2217                table_root,
2218                table_length,
2219                ..
2220            } => Ok(ReadOnlyMultimapTable::new(
2221                table_root,
2222                table_length,
2223                PageHint::Clean,
2224                self.tree.transaction_guard().clone(),
2225                self.mem.clone(),
2226            )?),
2227        }
2228    }
2229
2230    /// Open the given table without a type
2231    pub fn open_untyped_multimap_table(
2232        &self,
2233        handle: impl MultimapTableHandle,
2234    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
2235        let header = self
2236            .tree
2237            .get_table_untyped(handle.name(), TableType::Multimap)?
2238            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2239
2240        match header {
2241            InternalTableDefinition::Normal { .. } => unreachable!(),
2242            InternalTableDefinition::Multimap {
2243                table_root,
2244                table_length,
2245                fixed_key_size,
2246                fixed_value_size,
2247                ..
2248            } => Ok(ReadOnlyUntypedMultimapTable::new(
2249                table_root,
2250                table_length,
2251                fixed_key_size,
2252                fixed_value_size,
2253                self.mem.clone(),
2254            )),
2255        }
2256    }
2257
2258    /// List all the tables
2259    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
2260        self.tree
2261            .list_tables(TableType::Normal)
2262            .map(|x| x.into_iter().map(UntypedTableHandle::new))
2263    }
2264
2265    /// List all the multimap tables
2266    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
2267        self.tree
2268            .list_tables(TableType::Multimap)
2269            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
2270    }
2271
2272    /// Close the transaction
2273    ///
2274    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
2275    /// so this method does not normally need to be called.
2276    /// This method can be used to ensure that there are no outstanding objects remaining.
2277    ///
2278    /// Returns `ReadTransactionStillInUse` error if a table or other object retrieved from the transaction still references this transaction
2279    pub fn close(self) -> Result<(), TransactionError> {
2280        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
2281            return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
2282        }
2283        // No-op, just drop ourself
2284        Ok(())
2285    }
2286}
2287
2288impl Debug for ReadTransaction {
2289    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2290        f.write_str("ReadTransaction")
2291    }
2292}
2293
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        // First instance: commit one write and remember its transaction id
        let db = Database::create(tmpfile.path()).unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        let first_txn_id = txn.transaction_id;
        txn.commit().unwrap();
        drop(db);

        // Reopening the database must resume with a strictly larger transaction id
        let db2 = Database::create(tmpfile.path()).unwrap();
        let txn2 = db2.begin_write().unwrap();
        assert!(txn2.transaction_id > first_txn_id);
    }
}
2317}