// shodh_redb/transactions.rs

use crate::blob_store::reader::BlobReader;
use crate::blob_store::types::{
    BlobDedupConfig, BlobId, BlobMeta, BlobRef, BlobStats, CausalEdge, CausalEdgeKey, CausalPath,
    ContentType, DedupStats, DedupVal, MAX_TAGS_PER_BLOB, NamespaceKey, NamespaceVal, Sha256Key,
    StoreOptions, TagKey, TemporalKey,
};
use crate::blob_store::writer::BlobWriter;
use crate::cdc::CdcConfig;
use crate::cdc::types::{CdcEvent, CdcKey, CdcRecord, ChangeStream};
use crate::compat::{HashMap, HashSet, Mutex};
use crate::db::TransactionGuard;
use crate::error::CommitError;
use crate::multimap_table::ReadOnlyUntypedMultimapTable;
use crate::sealed::Sealed;
use crate::table::ReadOnlyUntypedTable;
use crate::temporal::HybridLogicalClock;
use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
    Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
    PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
    TableTree, TableTreeMut, TableType, TransactionalMemory, hash64_with_seed, hash128_with_seed,
};
use crate::types::{Key, Value};
use crate::{
    AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
    MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
    Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
    TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
    UntypedTableHandle,
};
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::cmp::min;
use core::fmt::{Debug, Display, Formatter};
use core::marker::PhantomData;
use core::mem::size_of;
use core::ops::{RangeBounds, RangeFull};
use core::panic;
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "logging")]
use log::{debug, warn};
use sha2::{Digest, Sha256};

const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;

fn xxh3_hash64(data: &[u8]) -> u64 {
    hash64_with_seed(data, 0)
}

fn xxh3_hash128(data: &[u8]) -> u128 {
    hash128_with_seed(data, 0)
}

const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// Pages that were allocated in the data tree by a given transaction. Only updated when a savepoint
// exists
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList<'static>,
> = SystemTableDefinition::new("data_pages_allocated");
// Pages in the data tree that are in the pending free state: i.e., they are unreachable from the
// root as of the given transaction.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList<'static>,
> = SystemTableDefinition::new("data_pages_unreachable");
// Pages in the system tree that are in the pending free state: i.e., they are unreachable from the
// root as of the given transaction.
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList<'static>,
> = SystemTableDefinition::new("system_pages_unreachable");
// Blob store system tables
const BLOB_TABLE: SystemTableDefinition<BlobId, BlobMeta> =
    SystemTableDefinition::new("blob_store");
const BLOB_TEMPORAL_INDEX: SystemTableDefinition<TemporalKey, ()> =
    SystemTableDefinition::new("blob_temporal_idx");
const BLOB_CAUSAL_CHILDREN: SystemTableDefinition<BlobId, BlobId> =
    SystemTableDefinition::new("blob_causal_children");
const BLOB_CAUSAL_EDGES: SystemTableDefinition<CausalEdgeKey, CausalEdge> =
    SystemTableDefinition::new("blob_causal_edges_v2");
const BLOB_TAG_INDEX: SystemTableDefinition<TagKey, ()> =
    SystemTableDefinition::new("blob_tag_idx");
const BLOB_NAMESPACE: SystemTableDefinition<BlobId, NamespaceVal> =
    SystemTableDefinition::new("blob_namespace");
const BLOB_NAMESPACE_INDEX: SystemTableDefinition<NamespaceKey, ()> =
    SystemTableDefinition::new("blob_namespace_idx");
const BLOB_DEDUP_INDEX: SystemTableDefinition<Sha256Key, DedupVal> =
    SystemTableDefinition::new("blob_dedup_idx");
const BLOB_DEDUP_MAP: SystemTableDefinition<BlobId, Sha256Key> =
    SystemTableDefinition::new("blob_dedup_map");
const CDC_LOG_TABLE: SystemTableDefinition<CdcKey, CdcRecord> =
    SystemTableDefinition::new("cdc_log");
const CDC_CURSOR_TABLE: SystemTableDefinition<&str, u64> =
    SystemTableDefinition::new("cdc_cursors");
const HISTORY_TABLE: SystemTableDefinition<u64, HistorySnapshot> =
    SystemTableDefinition::new("transaction_history");
// ---------------------------------------------------------------------------
// HistorySnapshot -- fixed-width Value for time-travel history table
// ---------------------------------------------------------------------------

/// Fixed-width snapshot of a committed transaction's state, stored in the
/// `transaction_history` system table for time-travel reads.
///
/// Binary layout (73 bytes):
/// ```text
/// [user_root_non_null: u8]           offset 0
/// [user_root: 32 bytes BtreeHeader]  offset 1   (PageNumber[8] + Checksum[16] + length[8])
/// [timestamp_ms: u64 LE]             offset 33
/// [blob_region_offset: u64 LE]       offset 41
/// [blob_region_length: u64 LE]       offset 49
/// [blob_next_sequence: u64 LE]       offset 57
/// [blob_hlc_state: u64 LE]           offset 65
/// ```
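///
/// The total width is 1 + 32 + 5 * 8 = 73 bytes, which is what `Self::SIZE` below
/// computes from `BtreeHeader::serialized_size()`.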
#[derive(Debug, Clone)]
pub(crate) struct HistorySnapshot {
    data: [u8; Self::SIZE],
}

impl HistorySnapshot {
    const SIZE: usize = 1 + BtreeHeader::serialized_size() + 5 * size_of::<u64>();

    const USER_ROOT_FLAG: usize = 0;
    const USER_ROOT: usize = 1;
    const TIMESTAMP: usize = 1 + BtreeHeader::serialized_size();
    const BLOB_OFFSET: usize = Self::TIMESTAMP + size_of::<u64>();
    const BLOB_LENGTH: usize = Self::BLOB_OFFSET + size_of::<u64>();
    const BLOB_SEQUENCE: usize = Self::BLOB_LENGTH + size_of::<u64>();
    const BLOB_HLC: usize = Self::BLOB_SEQUENCE + size_of::<u64>();

    const USER_ROOT_END: usize = Self::USER_ROOT + BtreeHeader::serialized_size();

    pub(crate) fn new(
        user_root: Option<BtreeHeader>,
        timestamp_ms: u64,
        blob_region_offset: u64,
        blob_region_length: u64,
        blob_next_sequence: u64,
        blob_hlc_state: u64,
    ) -> Self {
        let mut data = [0u8; Self::SIZE];
        if let Some(root) = user_root {
            data[Self::USER_ROOT_FLAG] = 1;
            data[Self::USER_ROOT..Self::USER_ROOT_END].copy_from_slice(&root.to_le_bytes());
        }
        data[Self::TIMESTAMP..Self::TIMESTAMP + 8].copy_from_slice(&timestamp_ms.to_le_bytes());
        data[Self::BLOB_OFFSET..Self::BLOB_OFFSET + 8]
            .copy_from_slice(&blob_region_offset.to_le_bytes());
        data[Self::BLOB_LENGTH..Self::BLOB_LENGTH + 8]
            .copy_from_slice(&blob_region_length.to_le_bytes());
        data[Self::BLOB_SEQUENCE..Self::BLOB_SEQUENCE + 8]
            .copy_from_slice(&blob_next_sequence.to_le_bytes());
        data[Self::BLOB_HLC..Self::BLOB_HLC + 8].copy_from_slice(&blob_hlc_state.to_le_bytes());
        Self { data }
    }

    pub(crate) fn user_root(&self) -> Option<BtreeHeader> {
        if self.data[Self::USER_ROOT_FLAG] != 0 {
            self.data[Self::USER_ROOT..Self::USER_ROOT_END]
                .try_into()
                .ok()
                .map(BtreeHeader::from_le_bytes)
        } else {
            None
        }
    }

    pub(crate) fn timestamp_ms(&self) -> u64 {
        self.data[Self::TIMESTAMP..Self::TIMESTAMP + 8]
            .try_into()
            .map(u64::from_le_bytes)
            .unwrap_or(0)
    }
}

impl Value for HistorySnapshot {
    type SelfType<'a>
        = HistorySnapshot
    where
        Self: 'a;
    type AsBytes<'a>
        = [u8; HistorySnapshot::SIZE]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        Some(Self::SIZE)
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        let mut buf = [0u8; Self::SIZE];
        buf.copy_from_slice(&data[..Self::SIZE]);
        Self { data: buf }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::HistorySnapshot")
    }
}

// The allocator state table is stored in the system table tree, but it's accessed using
// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;

// Format:
// 2 bytes: length
// length * size_of(PageNumber): array of page numbers
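// For example (illustrative, not taken from the file): a list holding two pages is
// serialized as [0x02, 0x00] (u16 little-endian length) followed by the two
// PageNumber encodings back to back.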
#[derive(Debug)]
pub(crate) struct PageList<'a> {
    data: &'a [u8],
}

impl PageList<'_> {
    fn required_bytes(len: usize) -> usize {
        2 + PageNumber::serialized_size() * len
    }

    pub(crate) fn len(&self) -> usize {
        self.data[..size_of::<u16>()]
            .try_into()
            .map(|b| u16::from_le_bytes(b) as usize)
            .unwrap_or(0)
    }

    pub(crate) fn get(&self, index: usize) -> PageNumber {
        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
        self.data[start..(start + PageNumber::serialized_size())]
            .try_into()
            .map(PageNumber::from_le_bytes)
            .unwrap_or(PageNumber::new(0, 0, 0))
    }
}

impl Value for PageList<'_> {
    type SelfType<'a>
        = PageList<'a>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        PageList { data }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::PageList")
    }
}

impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    fn initialize(data: &mut [u8]) {
        debug_assert!(data.len() >= 8);
        // Set the length to zero
        if data.len() >= 8 {
            data[..8].fill(0);
        }
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY: PageListMut is #[repr(transparent)] over [u8], so it has the
        // same size, alignment, and memory layout. The pointer cast preserves
        // the slice length metadata and the mutable borrow guarantees exclusive
        // access for the lifetime of the returned reference.
        unsafe { &mut *(core::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}

#[derive(Debug)]
pub(crate) struct TransactionIdWithPagination {
    pub(crate) transaction_id: u64,
    pub(crate) pagination_id: u64,
}

impl Value for TransactionIdWithPagination {
    type SelfType<'a>
        = TransactionIdWithPagination
    where
        Self: 'a;
    type AsBytes<'a>
        = [u8; 2 * size_of::<u64>()]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        Some(2 * size_of::<u64>())
    }

    #[allow(clippy::big_endian_bytes)]
    fn from_bytes<'a>(data: &'a [u8]) -> Self
    where
        Self: 'a,
    {
        let transaction_id = data[..size_of::<u64>()]
            .try_into()
            .map(u64::from_be_bytes)
            .unwrap_or(0);
        let pagination_id = data[size_of::<u64>()..]
            .try_into()
            .map(u64::from_be_bytes)
            .unwrap_or(0);
        Self {
            transaction_id,
            pagination_id,
        }
    }

    #[allow(clippy::big_endian_bytes)]
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
    where
        Self: 'b,
    {
        let mut result = [0u8; 2 * size_of::<u64>()];
        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_be_bytes());
        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_be_bytes());
        result
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::TransactionIdWithPagination")
    }
}

impl Key for TransactionIdWithPagination {
    fn compare(data1: &[u8], data2: &[u8]) -> core::cmp::Ordering {
        // Big-endian serialization means raw byte comparison is correct.
        let len = (2 * size_of::<u64>()).min(data1.len()).min(data2.len());
        data1[..len]
            .cmp(&data2[..len])
            .then_with(|| data1.len().cmp(&data2.len()))
    }
}

#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    Deprecated,
    Region(u32),
    RegionTracker,
    TransactionId,
}

impl Value for AllocatorStateKey {
    type SelfType<'a> = Self;
    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];

    fn fixed_width() -> Option<usize> {
        Some(1 + size_of::<u32>())
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        match data[0] {
            3 => Self::Region(data[1..].try_into().map(u32::from_le_bytes).unwrap_or(0)),
            4 => Self::RegionTracker,
            5 => Self::TransactionId,
            // 0, 1, 2 were used in redb 2.x; unknown discriminants are also
            // treated as deprecated to avoid panicking on corrupt data.
            _ => Self::Deprecated,
        }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        let mut result = Self::AsBytes::default();
        match value {
            Self::Region(region) => {
                result[0] = 3;
                result[1..].copy_from_slice(&u32::to_le_bytes(*region));
            }
            Self::RegionTracker => {
                result[0] = 4;
            }
            Self::TransactionId => {
                result[0] = 5;
            }
            Self::Deprecated => {
                result[0] = 0;
            }
        }

        result
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::AllocatorStateKey")
    }
}

impl Key for AllocatorStateKey {
    fn compare(data1: &[u8], data2: &[u8]) -> core::cmp::Ordering {
        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
    }
}

pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}

/// Informational storage stats about the database
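///
/// # Example (sketch)
///
/// A hedged sketch of reading these stats; it assumes this fork keeps redb's
/// `WriteTransaction::stats()` accessor, which is not shown in this file.
///
/// ```ignore
/// let txn = db.begin_write()?;
/// let stats: DatabaseStats = txn.stats()?;
/// println!("allocated pages: {}", stats.allocated_pages());
/// println!("page size: {} bytes", stats.page_size());
/// ```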
#[derive(Debug, Clone, Copy)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) free_pages: u64,
    pub(crate) trailing_free_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Maximum traversal distance to reach the deepest (key, value) pair, across all tables
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of pages allocated
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of pages currently free in the buddy allocator, available for immediate reuse
    pub fn free_pages(&self) -> u64 {
        self.free_pages
    }

    /// Number of contiguous free pages at the end of the database file.
    /// These can be reclaimed by compaction to shrink the file.
    pub fn trailing_free_pages(&self) -> u64 {
        self.trailing_free_pages
    }

    /// Number of leaf pages that store user data
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages in btrees that store user data
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Number of bytes consumed by keys and values that have been inserted.
    /// Does not include indexing overhead
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Number of bytes consumed by keys in internal branch pages, plus other metadata
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Number of bytes consumed by fragmentation, both in data pages and internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Number of bytes per page
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}

#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commits with this durability level will not be persisted to disk unless followed by a
    /// commit with [`Durability::Immediate`].
    None,
    /// Commits with this durability level are guaranteed to be persistent as soon as
    /// [`WriteTransaction::commit`] returns.
    Immediate,
}

// These are the actual durability levels used internally. The public `Durability` variants map
// directly onto these; the legacy `Durability::Paranoid` level was translated to
// `InternalDurability::Immediate` and additionally enabled 2-phase commit.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Immediate,
}

// Like a Table but only one may be open at a time to avoid possible races
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}

impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        // No need to track allocations in the system tree. Savepoint restoration only relies on
        // freeing in the data tree
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        SystemTable {
            name: name.to_string(),
            namespace,
            tree: BtreeMut::new_uncompressed(table_root, guard.clone(), mem, freed_pages, ignore),
            transaction_guard: guard,
        }
    }

    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .range(&range)
            .map(|x| Range::new(x, self.transaction_guard.clone()))
    }

    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        range: impl RangeBounds<KR> + 'a,
        predicate: F,
    ) -> Result<ExtractIf<'_, K, V, F>>
    where
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .extract_from_if(&range, predicate)
            .map(ExtractIf::new)
    }

    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<'_, V>>> {
        let value_len = V::as_bytes(value.borrow()).as_ref().len();
        if value_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_len + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len + key_len));
        }
        self.tree.insert(key.borrow(), value.borrow())
    }

    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        self.tree.remove(key.borrow())
    }
}

impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
    pub fn insert_reserve<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
        value_length: usize,
    ) -> Result<AccessGuardMutInPlace<'_, V>> {
        if value_length > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_length + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length + key_len));
        }
        self.tree.insert_reserve(key.borrow(), value_length)
    }
}

impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
    fn drop(&mut self) {
        self.namespace.close_table(
            &self.name,
            &self.tree,
            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
        );
    }
}

struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}

impl<'db> SystemNamespace<'db> {
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        // No need to track allocations in the system tree. Savepoint restoration only relies on
        // freeing in the data tree
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        let freed_pages = Arc::new(Mutex::new(vec![]));
        Self {
            table_tree: TableTreeMut::new(
                root_page,
                guard.clone(),
                mem,
                freed_pages.clone(),
                ignore,
            ),
            freed_pages,
            transaction_guard: guard.clone(),
        }
    }

    fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
        self.freed_pages.clone()
    }

    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
        &'s mut self,
        transaction: &'txn WriteTransaction,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<SystemTable<'db, 's, K, V>> {
        let (root, _) = self
            .table_tree
            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
            .map_err(|e| {
                e.into_storage_error_or_internal("Internal error. System table is corrupted")
            })?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(SystemTable::new(
            definition.name(),
            root,
            self.freed_pages.clone(),
            self.transaction_guard.clone(),
            transaction.mem.clone(),
            self,
        ))
    }

    fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}

struct TableNamespace<'db> {
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    table_tree: TableTreeMut<'db>,
}

impl TableNamespace<'_> {
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
        let freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            root_page,
            guard,
            mem,
            // Committed pages which are no longer reachable and will be queued for freeing.
            // These are separated from the system freed pages
            freed_pages.clone(),
            allocated.clone(),
        );
        Self {
            open_tables: Default::default(),
            table_tree,
            freed_pages,
            allocated_pages: allocated,
        }
    }

    fn set_dirty(&mut self, transaction: &WriteTransaction) {
        transaction.dirty.store(true, Ordering::Release);
        if !transaction
            .transaction_tracker
            .any_savepoint_exists()
            .unwrap_or(true)
        {
            // No savepoints exist, and we don't allow savepoints to be created in a dirty transaction
            // so we can disable allocation tracking now
            *self.allocated_pages.lock() = PageTrackerPolicy::Ignore;
        }
    }

    fn set_root(&mut self, root: Option<BtreeHeader>) -> Result<(), StorageError> {
        if !self.open_tables.is_empty() {
            return Err(StorageError::Internal(
                "set_root called with open tables".into(),
            ));
        }
        // Clear pending table root updates accumulated by close_table() calls
        // that occurred after the savepoint was taken. Without this, commit()
        // would re-stage post-savepoint roots into the restored tree, producing
        // a mix of pre- and post-savepoint state.
        self.table_tree.clear_pending_updates();
        self.table_tree.set_root(root);
        Ok(())
    }

    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        let root = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        self.open_tables
            .insert(name.to_string(), panic::Location::caller());

        Ok(root)
    }

    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {definition}");
        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
        self.set_dirty(transaction);

        Ok(MultimapTable::new(
            definition.name(),
            root,
            length,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {definition}");
        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
        self.set_dirty(transaction);

        Ok(Table::new(
            definition.name(),
            root,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    #[track_caller]
    fn inner_rename(
        &mut self,
        name: &str,
        new_name: &str,
        table_type: TableType,
    ) -> Result<(), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.rename_table(name, new_name, table_type)
    }

    #[track_caller]
    fn rename_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming table: {name} to {new_name}");
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Normal)
    }

    #[track_caller]
    fn rename_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming multimap table: {name} to {new_name}");
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Multimap)
    }

    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.delete_table(name, table_type)
    }

    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {name}");
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Normal)
    }

    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {name}");
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Multimap)
    }

    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        // Table should always be present when closing, but gracefully handle the case
        // where it is not to avoid panicking in production
        let _ = self.open_tables.remove(name);
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}

/// A read/write transaction
///
/// Only a single [`WriteTransaction`] may exist at a time
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    tables: Mutex<TableNamespace<'static>>,
    system_tables: Mutex<SystemNamespace<'static>>,
    completed: bool,
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    shrink_policy: ShrinkPolicy,
    quick_repair: bool,
    // Persistent savepoints created during this transaction
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
    // Guard: true while a BlobWriter is active, preventing concurrent blob ops
    blob_writer_active: AtomicBool,
    // Content-addressable blob dedup configuration
    blob_dedup_config: BlobDedupConfig,
    // CDC: in-memory change log, Some when CDC enabled
    pub(crate) cdc_log: Option<Mutex<Vec<CdcEvent>>>,
    cdc_config: CdcConfig,
    history_retention: u64,
    observer: Arc<dyn crate::observer::DatabaseObserver>,
    #[cfg(feature = "metrics")]
    db_metrics: Arc<crate::observer::DbMetrics>,
}

impl WriteTransaction {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        guard: TransactionGuard,
        transaction_tracker: Arc<TransactionTracker>,
        mem: Arc<TransactionalMemory>,
        blob_dedup_config: BlobDedupConfig,
        cdc_config: CdcConfig,
        history_retention: u64,
        observer: Arc<dyn crate::observer::DatabaseObserver>,
        #[cfg(feature = "metrics")] db_metrics: Arc<crate::observer::DbMetrics>,
    ) -> Result<Self> {
        let transaction_id = guard.id()?;
        let guard = Arc::new(guard);

        let root_page = mem.get_data_root();
        let system_page = mem.get_system_root();

        let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
        let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());

        Ok(Self {
            transaction_tracker,
            mem: mem.clone(),
            transaction_guard: guard.clone(),
            transaction_id,
            tables: Mutex::new(tables),
            system_tables: Mutex::new(system_tables),
            completed: false,
            dirty: AtomicBool::new(false),
            durability: InternalDurability::Immediate,
            two_phase_commit: false,
            quick_repair: false,
            shrink_policy: ShrinkPolicy::Default,
            created_persistent_savepoints: Mutex::new(Default::default()),
            deleted_persistent_savepoints: Mutex::new(vec![]),
            blob_writer_active: AtomicBool::new(false),
            blob_dedup_config,
            cdc_log: if cdc_config.enabled {
                Some(Mutex::new(Vec::new()))
            } else {
                None
            },
            cdc_config,
            history_retention,
            observer,
            #[cfg(feature = "metrics")]
            db_metrics,
        })
    }

    /// Record a CDC event. No-op when CDC is disabled.
    pub(crate) fn record_cdc(&self, event: CdcEvent) {
        if let Some(ref log) = self.cdc_log {
            log.lock().push(event);
        }
    }

    pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
        self.shrink_policy = shrink_policy;
    }

    pub(crate) fn pending_free_pages(&self) -> Result<bool> {
        let mut system_tables = self.system_tables.lock();
        if system_tables
            .open_system_table(self, DATA_FREED_TABLE)?
            .tree
            .get_root()
            .is_some()
        {
            return Ok(true);
        }
        if system_tables
            .open_system_table(self, SYSTEM_FREED_TABLE)?
            .tree
            .get_root()
            .is_some()
        {
            return Ok(true);
        }

        Ok(false)
    }

    #[cfg(all(debug_assertions, feature = "std"))]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        self.mem.debug_check_allocator_consistency();

        let mut table_pages = vec![];
        self.tables
            .lock()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        {
            let tables = self.tables.lock();
            let pages = tables.freed_pages.lock();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        {
            let system_tables = self.system_tables.lock();
            let pages = system_tables.freed_pages.lock();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }

    /// Creates a snapshot of the current database state, which can be used to roll back the database.
    /// This savepoint will exist until it is deleted with [`Self::delete_persistent_savepoint()`].
    ///
    /// Note that while a savepoint exists, pages that become unused after it was created are not freed.
    /// Therefore, the lifetime of a savepoint should be minimized.
    ///
    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened),
    /// or if the transaction's durability is less than [`Durability::Immediate`].
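    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `db` is assumed to be an already-open database handle
    /// whose constructor is not shown in this file.
    ///
    /// ```ignore
    /// let txn = db.begin_write()?;
    /// // Capture the current state; the returned id survives process restarts.
    /// let savepoint_id = txn.persistent_savepoint()?;
    /// txn.commit()?;
    ///
    /// // Later, in another write transaction, the savepoint can be looked up by id.
    /// let txn = db.begin_write()?;
    /// let savepoint = txn.get_persistent_savepoint(savepoint_id)?;
    /// ```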
    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }

        let mut savepoint = self.ephemeral_savepoint()?;

        let mut system_tables = self.system_tables.lock();

        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint.get_id().next()?)?;
        drop(next_table);

        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        savepoint_table.insert(
            savepoint.get_id(),
            SerializedSavepoint::from_savepoint(&savepoint),
        )?;

        savepoint.set_persistent();

        self.created_persistent_savepoints
            .lock()
            .insert(savepoint.get_id());

        Ok(savepoint.get_id().0)
    }

    pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
        self.transaction_guard.clone()
    }

    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
        let mut system_tables = self.system_tables.lock();
        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        let value = next_table.get(())?;
        if let Some(next_id) = value {
            Ok(Some(next_id.value()))
        } else {
            Ok(None)
        }
    }

    /// Get a persistent savepoint given its id
    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
        let mut system_tables = self.system_tables.lock();
        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let value = table.get(SavepointId(id))?;

        match value {
            Some(x) => Ok(x.value().to_savepoint(self.transaction_tracker.clone())?),
            None => Err(SavepointError::InvalidSavepoint),
        }
    }

    /// Delete the given persistent savepoint.
    ///
    /// Note that if the transaction is aborted with `abort()`, this deletion will be rolled back.
    ///
    /// Returns `true` if the savepoint existed.
    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction's durability is less than [`Durability::Immediate`].
    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }
        let mut system_tables = self.system_tables.lock();
        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let savepoint = table.remove(SavepointId(id))?;
        if let Some(serialized) = savepoint {
            let savepoint = serialized
                .value()
                .to_savepoint(self.transaction_tracker.clone())?;
            self.deleted_persistent_savepoints
                .lock()
                .push((savepoint.get_id(), savepoint.get_transaction_id()));
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// List all persistent savepoints
    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
        let mut system_tables = self.system_tables.lock();
        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let mut savepoints = vec![];
        for savepoint in table.range::<SavepointId>(..)? {
            savepoints.push(savepoint?.0.value().0);
        }
        Ok(savepoints.into_iter())
    }

    fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;

        Ok(TransactionGuard::new_read(
            id,
            self.transaction_tracker.clone(),
        ))
    }

    fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
        let transaction_id = self.allocate_read_transaction()?.leak()?;
        let id = self
            .transaction_tracker
            .allocate_savepoint(transaction_id)?;
        Ok((id, transaction_id))
    }

    /// Creates a snapshot of the current database state, which can be used to roll back the database.
    ///
    /// This savepoint will be freed as soon as the returned [`Savepoint`] is dropped.
    ///
    /// Returns [`SavepointError::InvalidSavepoint`] if the transaction is "dirty" (any tables have been opened).
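    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `db` is assumed to be an already-open database handle.
    ///
    /// ```ignore
    /// let mut txn = db.begin_write()?;
    /// // Must be taken before any table is opened in this transaction.
    /// let savepoint = txn.ephemeral_savepoint()?;
    /// // ... make changes through tables opened on `txn` ...
    /// txn.restore_savepoint(&savepoint)?; // undo the changes made above
    /// txn.commit()?;
    /// ```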
    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
        if self.dirty.load(Ordering::Acquire) {
            return Err(SavepointError::InvalidSavepoint);
        }

        let (id, transaction_id) = self.allocate_savepoint()?;
        #[cfg(feature = "logging")]
        debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");

        let root = self.mem.get_data_root();
        let savepoint = Savepoint::new_ephemeral(
            &self.mem,
            self.transaction_tracker.clone(),
            id,
            transaction_id,
            root,
        );

        Ok(savepoint)
    }

    /// Restore the state of the database to the given [`Savepoint`].
    ///
    /// Calling this method invalidates all [`Savepoint`]s created after the given savepoint.
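    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `db` is assumed to be an already-open database handle.
    ///
    /// ```ignore
    /// let mut txn = db.begin_write()?;
    /// let first = txn.ephemeral_savepoint()?;
    /// let second = txn.ephemeral_savepoint()?;
    /// txn.restore_savepoint(&first)?;
    /// // `second` was created after `first`, so it is now invalid.
    /// assert!(txn.restore_savepoint(&second).is_err());
    /// ```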
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // Ensure that user does not try to restore a Savepoint that is from a different Database
        assert_eq!(
            core::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())?
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring a savepoint that reverted a file format or checksum type change could corrupt
        // the database
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        // Restoring a savepoint needs to accomplish the following:
        // 1) restore the table tree. This is trivial, since we have the old root
        // 1a) we also filter the freed tree to remove any pages referenced by the old root
        // 2) free all pages that were allocated since the savepoint and are unreachable
        //    from the restored table tree root. Here we diff the reachable pages from the old
        //    and new roots
        // 3) update the system tree to remove invalid persistent savepoints.

        // 1) restore the table tree
        {
            self.tables.lock().set_root(savepoint.get_user_root())?;
        }

        // 1a) purge all transactions that happened after the savepoint from the data freed tree
        let txn_id = savepoint.get_transaction_id().next()?.raw_id();
        {
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let mut system_tables = self.system_tables.lock();
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
                entry?;
            }
            // No need to process the system freed table, because it only rolls forward
        }

        // 2) queue all pages that became unreachable
        {
            let tables = self.tables.lock();
            let mut data_freed_pages = tables.freed_pages.lock();
            let mut system_tables = self.system_tables.lock();
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            for entry in data_allocated.range(lower..)? {
                let (_, value) = entry?;
                for i in 0..value.value().len() {
                    data_freed_pages.push(value.value().get(i));
                }
            }
        }

        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
        // from later trying to restore a savepoint "on another timeline"
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id())?;
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }

    /// Set the desired durability level for writes made in this transaction.
    /// Defaults to [`Durability::Immediate`].
    ///
    /// If a persistent savepoint has been created or deleted in this transaction, the durability may not
    /// be reduced below [`Durability::Immediate`].
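    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `db` is assumed to be an already-open database handle.
    ///
    /// ```ignore
    /// let mut txn = db.begin_write()?;
    /// // Batch-load data without forcing an fsync on this commit; a later
    /// // `Durability::Immediate` commit will persist it.
    /// txn.set_durability(Durability::None)?;
    /// ```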
    pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
        let created = !self.created_persistent_savepoints.lock().is_empty();
        let deleted = !self.deleted_persistent_savepoints.lock().is_empty();
        if (created || deleted) && !matches!(durability, Durability::Immediate) {
            return Err(SetDurabilityError::PersistentSavepointModified);
        }

        self.durability = match durability {
            Durability::None => InternalDurability::None,
            Durability::Immediate => InternalDurability::Immediate,
        };

        Ok(())
    }

    /// Enable or disable 2-phase commit (defaults to disabled)
    ///
    /// By default, data is written using the following 1-phase commit algorithm:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Flip the god byte primary bit to activate the newly updated commit slot
    /// 3. Call `fsync` to ensure all writes have been persisted to disk
    ///
    /// All data is written with checksums. When opening the database after a crash, the most
    /// recent of the two commit slots with a valid checksum is used.
    ///
    /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash
    /// function with close to perfect collision resistance when used with non-malicious input. An
    /// attacker with an extremely high degree of control over the database's workload, including
    /// the ability to cause the database process to crash, can cause invalid data to be written
    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
    ///
    /// Alternatively, you can enable 2-phase commit, which writes data like this:
    ///
    /// 1. Update the inactive commit slot with the new database state
    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
    ///
    /// This mitigates a theoretical attack where an attacker who
    /// 1. can control the order in which pages are flushed to disk,
    /// 2. can introduce crashes during `fsync`,
    /// 3. has knowledge of the database file contents, and
    /// 4. can include arbitrary data in a write transaction
    ///
    /// could cause a transaction to partially commit (some but not all of the data is written).
    /// This is described in the design doc in further detail.
    ///
    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
    /// a high degree of control over the database's workload, including the ability to cause the
    /// database process to crash, can cause the database to crash with the god byte primary bit
    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-
    /// controlled state.
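    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `db` is assumed to be an already-open database handle.
    ///
    /// ```ignore
    /// let mut txn = db.begin_write()?;
    /// // Trade a second fsync per commit for a stronger guarantee that the
    /// // primary commit slot is always valid.
    /// txn.set_two_phase_commit(true);
    /// ```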
1469    pub fn set_two_phase_commit(&mut self, enabled: bool) {
1470        self.two_phase_commit = enabled;
1471    }
1472
1473    /// Enable or disable quick-repair (defaults to disabled)
1474    ///
1475    /// By default, when reopening the database after a crash, redb needs to do a full repair.
1476    /// This involves walking the entire database to verify the checksums and reconstruct the
1477    /// allocator state, so it can be very slow if the database is large.
1478    ///
1479    /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state
1480    /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit
1481    /// (which guarantees that the primary commit slot is valid without needing to look at the
1482    /// checksums). This means commits are slower, but recovery after a crash is almost instant.
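    ///
    /// Hypothetical usage, assuming the same transaction-level API as the examples above:
    ///
    /// ```ignore
    /// let mut txn = db.begin_write()?;
    /// // Slower commits, near-instant crash recovery; also enables 2-phase commit.
    /// txn.set_quick_repair(true);
    /// txn.commit()?;
    /// ```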
1483    pub fn set_quick_repair(&mut self, enabled: bool) {
1484        self.quick_repair = enabled;
1485    }
1486
1487    /// Open the given table
1488    ///
1489    /// The table will be created if it does not exist
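    ///
    /// Minimal sketch, assuming a write transaction `txn` and a redb-style const
    /// table definition:
    ///
    /// ```ignore
    /// const COUNTS: TableDefinition<&str, u64> = TableDefinition::new("counts");
    ///
    /// let mut table = txn.open_table(COUNTS)?;
    /// table.insert("visits", &1)?;
    /// drop(table); // release the table borrow before committing
    /// txn.commit()?;
    /// ```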
1490    #[track_caller]
1491    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1492        &'txn self,
1493        definition: TableDefinition<K, V>,
1494    ) -> Result<Table<'txn, K, V>, TableError> {
1495        self.tables.lock().open_table(self, definition)
1496    }
1497
1498    /// Open a TTL-enabled table.
1499    ///
1500    /// The table will be created if it does not exist. Values are stored with an
1501    /// 8-byte expiry header; use `insert_with_ttl()` to set per-key lifetimes.
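    ///
    /// Hypothetical sketch -- the `TtlTableDefinition` constructor and the
    /// `insert_with_ttl` signature (key, value, TTL as a `Duration`) are assumptions,
    /// not confirmed by this file:
    ///
    /// ```ignore
    /// let sessions = TtlTableDefinition::<&str, u64>::new("sessions"); // assumed constructor
    /// let mut table = txn.open_ttl_table(sessions)?;
    /// // Assumed signature: the entry expires roughly 30 minutes after insertion.
    /// table.insert_with_ttl("session-1", &42, core::time::Duration::from_secs(30 * 60))?;
    /// ```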
1502    #[cfg(feature = "std")]
1503    #[track_caller]
1504    pub fn open_ttl_table<K: Key + 'static, V: Value + 'static>(
1505        &self,
1506        definition: crate::ttl_table::TtlTableDefinition<K, V>,
1507    ) -> Result<crate::ttl_table::TtlTable<'_, K, V>, TableError> {
1508        let inner = self.open_table(definition.inner_def())?;
1509        Ok(crate::ttl_table::TtlTable::new(inner))
1510    }
1511
1512    /// Open or create an IVF-PQ vector index for writing.
1513    ///
1514    /// The index tables will be created if they do not exist.
1515    #[track_caller]
1516    pub fn open_ivfpq_index(
1517        &self,
1518        definition: &crate::ivfpq::config::IvfPqIndexDefinition,
1519    ) -> Result<crate::ivfpq::index::IvfPqIndex<'_, Self>, TableError> {
1520        crate::ivfpq::index::IvfPqIndex::open(
1521            self,
1522            definition,
1523            Arc::clone(&self.observer),
1524            #[cfg(feature = "metrics")]
1525            Arc::clone(&self.db_metrics),
1526        )
1527        .map_err(TableError::Storage)
1528    }
1529
1530    /// Open the given multimap table
1531    ///
1532    /// The table will be created if it does not exist
1533    #[track_caller]
1534    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1535        &'txn self,
1536        definition: MultimapTableDefinition<K, V>,
1537    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1538        self.tables.lock().open_multimap_table(self, definition)
1539    }
1540
1541    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1542        &self,
1543        name: &str,
1544        table: &BtreeMut<K, V>,
1545        length: u64,
1546    ) {
1547        self.tables.lock().close_table(name, table, length);
1548    }
1549
1550    /// Rename the given table
1551    pub fn rename_table(
1552        &self,
1553        definition: impl TableHandle,
1554        new_name: impl TableHandle,
1555    ) -> Result<(), TableError> {
1556        let name = definition.name().to_string();
1557        // Drop the definition so that callers can pass in a `Table` to rename, without getting a TableAlreadyOpen error
1558        drop(definition);
1559        self.tables
1560            .lock()
1561            .rename_table(self, &name, new_name.name())
1562    }
1563
1564    /// Rename the given multimap table
1565    pub fn rename_multimap_table(
1566        &self,
1567        definition: impl MultimapTableHandle,
1568        new_name: impl MultimapTableHandle,
1569    ) -> Result<(), TableError> {
1570        let name = definition.name().to_string();
1571        // Drop the definition so that callers can pass in a `MultimapTable` to rename, without getting a TableAlreadyOpen error
1572        drop(definition);
1573        self.tables
1574            .lock()
1575            .rename_multimap_table(self, &name, new_name.name())
1576    }
1577
1578    /// Delete the given table
1579    ///
1580    /// Returns a bool indicating whether the table existed
1581    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1582        let name = definition.name().to_string();
1583        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1584        drop(definition);
1585        self.tables.lock().delete_table(self, &name)
1586    }
1587
1588    /// Delete the given multimap table
1589    ///
1590    /// Returns a bool indicating whether the table existed
1591    pub fn delete_multimap_table(
1592        &self,
1593        definition: impl MultimapTableHandle,
1594    ) -> Result<bool, TableError> {
1595        let name = definition.name().to_string();
1596        // Drop the definition so that callers can pass in a `Table` or `MultimapTable` to delete, without getting a TableAlreadyOpen error
1597        drop(definition);
1598        self.tables.lock().delete_multimap_table(self, &name)
1599    }
1600
1601    /// List all the tables
1602    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1603        self.tables
1604            .lock()
1605            .table_tree
1606            .list_tables(TableType::Normal)
1607            .map(|x| x.into_iter().map(UntypedTableHandle::new))
1608    }
1609
1610    /// List all the multimap tables
1611    pub fn list_multimap_tables(
1612        &self,
1613    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1614        self.tables
1615            .lock()
1616            .table_tree
1617            .list_tables(TableType::Multimap)
1618            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1619    }
1620
1621    // -----------------------------------------------------------------------
1622    // Blob store operations
1623    // -----------------------------------------------------------------------
1624
1625    /// Store a blob with temporal, causal, tag, and namespace metadata.
1626    ///
1627    /// The blob data is written to the append-only blob region, and indexed in
1628    /// the `BLOB_TABLE`, `BLOB_TEMPORAL_INDEX`, and optionally
1629    /// `BLOB_CAUSAL_EDGES`, `BLOB_TAG_INDEX`, `BLOB_NAMESPACE` system tables.
1630    ///
1631    /// Returns the assigned `BlobId`.
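    ///
    /// Illustrative sketch; the `ContentType` variant and `StoreOptions::default()`
    /// are assumptions about types defined elsewhere in the crate:
    ///
    /// ```ignore
    /// let blob_id = txn.store_blob(
    ///     b"hello blob",
    ///     ContentType::Binary,     // assumed variant
    ///     "greeting",              // human-readable label stored in BlobMeta
    ///     StoreOptions::default(), // no tags, namespace, or causal link
    /// )?;
    /// txn.commit()?;
    /// ```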
1632    pub fn store_blob(
1633        &self,
1634        data: &[u8],
1635        content_type: ContentType,
1636        label: &str,
1637        opts: StoreOptions,
1638    ) -> Result<BlobId> {
1639        if self.blob_writer_active.load(Ordering::Acquire) {
1640            return Err(StorageError::BlobWriterActive);
1641        }
1642        // 1. Get current blob state
1643        let mut blob_state = self.mem.get_blob_state();
1644
1645        // 2. Initialize blob region offset on first use
1646        if blob_state.region_offset == 0 {
1647            let file_len = self.mem.file_len()?;
1648            blob_state.region_offset = file_len;
1649        }
1650
1651        // 3. Assign sequence number and compute content prefix hash
1652        let sequence = blob_state.next_sequence;
1653        blob_state.next_sequence = sequence + 1;
1654
1655        let prefix_len = data.len().min(4096);
1656        let content_prefix_hash = xxh3_hash64(&data[..prefix_len]);
1657        let blob_id = BlobId::new(sequence, content_prefix_hash);
1658
1659        // 4. Compute full checksum
1660        let checksum = xxh3_hash128(data);
1661
1662        // 5. Dedup check: compute SHA-256 and look for existing identical blob
1663        let dedup_eligible =
1664            self.blob_dedup_config.enabled && data.len() >= self.blob_dedup_config.min_size;
1665        let sha_key = if dedup_eligible {
1666            let hash: [u8; 32] = Sha256::digest(data).into();
1667            Some(Sha256Key(hash))
1668        } else {
1669            None
1670        };
1671
1672        let dedup_hit = if let Some(ref sha_key) = sha_key {
1673            let mut system_tables = self.system_tables.lock();
1674            let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
1675            dedup_table.get(sha_key)?.map(|g| g.value())
1676        } else {
1677            None
1678        };
1679
1680        let mut dedup_saved = 0u64;
1681        let blob_ref = if let Some(existing) = dedup_hit
1682            && existing.checksum == checksum
1683            && existing.length == data.len() as u64
1684        {
1685            // Reuse existing physical data -- SHA-256 matched AND xxh3-128
1686            // checksum + length confirmed. Without the secondary check a
1687            // SHA-256 collision (or corrupted dedup index entry) would
1688            // silently bind the new blob_id to wrong physical data.
1689            dedup_saved = existing.length;
1690            BlobRef {
1691                offset: existing.offset,
1692                length: existing.length,
1693                checksum,
1694                ref_count: 1,
1695                content_type: content_type.as_byte(),
1696                compression: 0,
1697            }
1698        } else {
1699            // 5b. Write blob data to the blob region
1700            let blob_offset = blob_state.region_length;
1701            let file_offset = blob_state.region_offset + blob_offset;
1702            self.mem.blob_write(file_offset, data)?;
1703            blob_state.region_length += data.len() as u64;
1704
1705            BlobRef {
1706                offset: blob_offset,
1707                length: data.len() as u64,
1708                checksum,
1709                ref_count: 1,
1710                content_type: content_type.as_byte(),
1711                compression: 0,
1712            }
1713        };
1714
1715        // 6. Advance HLC
1716        let hlc = HybridLogicalClock::from_raw(blob_state.hlc_state).advance();
1717        blob_state.hlc_state = hlc.to_raw();
1718
1719        // as_nanos() returns u128, but u64 nanoseconds covers ~584 years from epoch.
1720        // Truncation is intentional and safe for any realistic timestamp.
1721        #[allow(clippy::cast_possible_truncation)]
1722        let wall_clock_ns = {
1723            #[cfg(feature = "std")]
1724            {
1725                // If the system clock is before UNIX epoch, fall back to zero;
1726                // HLC still provides causal ordering in that degenerate case.
1727                std::time::SystemTime::now()
1728                    .duration_since(std::time::UNIX_EPOCH)
1729                    .unwrap_or_default()
1730                    .as_nanos() as u64
1731            }
1732            #[cfg(not(feature = "std"))]
1733            {
1734                // no_std: wall clock unavailable; HLC provides causal ordering
1735                0u64
1736            }
1737        };
1738
1739        // 7. Build BlobMeta
1740        let causal_parent = opts.causal_link.as_ref().map(|l| l.parent);
1741        let meta = BlobMeta::new(blob_ref, wall_clock_ns, hlc.to_raw(), causal_parent, label);
1742
1743        // 8. Index in system tables
1744        {
1745            let mut system_tables = self.system_tables.lock();
1746
1747            let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
1748            blob_table.insert(&blob_id, &meta)?;
1749            drop(blob_table);
1750
1751            let temporal_key = TemporalKey::new(wall_clock_ns, hlc, blob_id);
1752            let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
1753            temporal_table.insert(&temporal_key, &())?;
1754            drop(temporal_table);
1755
1756            if let Some(link) = &opts.causal_link {
1757                let edge = CausalEdge::new(blob_id, link.relation, &link.context);
1758                let edge_key = CausalEdgeKey::new(link.parent, blob_id);
1759                let mut causal_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
1760                causal_table.insert(&edge_key, &edge)?;
1761                drop(causal_table);
1762            }
1763
1764            Self::index_tags_and_namespace(
1765                &mut system_tables,
1766                self,
1767                blob_id,
1768                &opts.tags,
1769                opts.namespace.as_deref(),
1770            )?;
1771
1772            // 8b. Update dedup index
1773            if let Some(sha_key) = sha_key {
1774                if let Some(existing) = dedup_hit {
1775                    // Increment ref_count on existing dedup entry
1776                    let mut dedup_table =
1777                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
1778                    let updated = DedupVal {
1779                        ref_count: existing.ref_count + 1,
1780                        ..existing
1781                    };
1782                    dedup_table.insert(&sha_key, &updated)?;
1783                    drop(dedup_table);
1784                } else {
1785                    // New dedup entry
1786                    let mut dedup_table =
1787                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
1788                    let entry = DedupVal {
1789                        offset: blob_ref.offset,
1790                        length: blob_ref.length,
1791                        checksum: blob_ref.checksum,
1792                        ref_count: 1,
1793                    };
1794                    dedup_table.insert(&sha_key, &entry)?;
1795                    drop(dedup_table);
1796                }
1797
1798                // Reverse map: BlobId -> Sha256Key
1799                let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
1800                dedup_map.insert(&blob_id, &sha_key)?;
1801                drop(dedup_map);
1802            }
1803        }
1804
1805        // 9. Update pending blob state for commit
1806        self.mem.set_pending_blob_state(blob_state);
1807        self.dirty.store(true, Ordering::Release);
1808
1809        // 10. Observer / metrics
1810        if dedup_saved > 0 {
1811            self.observer.on_blob_dedup(blob_id.sequence, dedup_saved);
1812            #[cfg(feature = "metrics")]
1813            self.db_metrics
1814                .blob_dedup_hits
1815                .fetch_add(1, portable_atomic::Ordering::Relaxed);
1816        } else {
1817            self.observer
1818                .on_blob_write(blob_id.sequence, blob_ref.length);
1819            #[cfg(feature = "metrics")]
1820            self.db_metrics
1821                .blob_writes
1822                .fetch_add(1, portable_atomic::Ordering::Relaxed);
1823        }
1824
1825        Ok(blob_id)
1826    }
1827
1828    /// Create a streaming blob writer that writes data in arbitrary-sized
1829    /// chunks with constant memory overhead.
1830    ///
1831    /// Only one `BlobWriter` may be active at a time. Calling `blob_writer()`
1832    /// or `store_blob()` while a writer is active returns
1833    /// [`StorageError::BlobWriterActive`].
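    ///
    /// Hypothetical streaming sketch; only `finish()` is referenced elsewhere in this
    /// file, so the chunk-writing method name used here is an assumption:
    ///
    /// ```ignore
    /// let chunks: &[&[u8]] = &[b"part one, ", b"part two"];
    /// let mut writer = txn.blob_writer(ContentType::Binary, "big-file", StoreOptions::default())?;
    /// for chunk in chunks {
    ///     writer.write_chunk(chunk)?; // assumed method: appends bytes with constant memory
    /// }
    /// let blob_id = writer.finish()?; // indexes the blob and clears the active-writer flag
    /// ```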
1834    pub fn blob_writer(
1835        &self,
1836        content_type: ContentType,
1837        label: &str,
1838        opts: StoreOptions,
1839    ) -> Result<BlobWriter<'_>> {
1840        if self
1841            .blob_writer_active
1842            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1843            .is_err()
1844        {
1845            return Err(StorageError::BlobWriterActive);
1846        }
1847
1848        let mut blob_state = self.mem.get_blob_state();
1849
1850        if blob_state.region_offset == 0 {
1851            let file_len = self.mem.file_len()?;
1852            blob_state.region_offset = file_len;
1853        }
1854
1855        let sequence = blob_state.next_sequence;
1856        blob_state.next_sequence = sequence + 1;
1857
1858        let blob_region_start = blob_state.region_length;
1859        let blob_file_offset = blob_state.region_offset + blob_region_start;
1860
1861        // Persist the incremented sequence immediately so that a subsequent
1862        // store_blob (after this writer finishes) picks up the right counter.
1863        self.mem.set_pending_blob_state(blob_state);
1864
1865        Ok(BlobWriter::new(
1866            self,
1867            sequence,
1868            content_type,
1869            label,
1870            opts,
1871            blob_file_offset,
1872            blob_region_start,
1873            self.blob_dedup_config.enabled,
1874        ))
1875    }
1876
1877    /// Low-level: write bytes directly to the blob region (bypasses page cache).
1878    /// Used by `BlobWriter`.
1879    pub(crate) fn blob_write_raw(&self, file_offset: u64, data: &[u8]) -> Result {
1880        self.mem.blob_write(file_offset, data)
1881    }
1882
1883    /// Low-level: called by `BlobWriter::finish()` to index the completed blob
1884    /// in system tables and update pending blob state.
1885    pub(crate) fn finalize_blob_writer(
1886        &self,
1887        blob_id: BlobId,
1888        mut meta: BlobMeta,
1889        bytes_written: u64,
1890        opts: StoreOptions,
1891        sha_key: Option<Sha256Key>,
1892    ) -> Result {
1893        let mut blob_state = self.mem.get_blob_state();
1894
1895        // Advance HLC
1896        let hlc = HybridLogicalClock::from_raw(blob_state.hlc_state).advance();
1897        blob_state.hlc_state = hlc.to_raw();
1898
1899        // Update the HLC in the meta
1900        meta.hlc = hlc.to_raw();
1901
1902        // Update region length to account for the written data
1903        blob_state.region_length = meta.blob_ref.offset + bytes_written;
1904
1905        // Index in system tables
1906        {
1907            let mut system_tables = self.system_tables.lock();
1908
1909            let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
1910            blob_table.insert(&blob_id, &meta)?;
1911            drop(blob_table);
1912
1913            let temporal_key = TemporalKey::new(meta.wall_clock_ns, hlc, blob_id);
1914            let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
1915            temporal_table.insert(&temporal_key, &())?;
1916            drop(temporal_table);
1917
1918            if let Some(link) = &opts.causal_link {
1919                let edge = CausalEdge::new(blob_id, link.relation, &link.context);
1920                let edge_key = CausalEdgeKey::new(link.parent, blob_id);
1921                let mut causal_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
1922                causal_table.insert(&edge_key, &edge)?;
1923                drop(causal_table);
1924            }
1925
1926            Self::index_tags_and_namespace(
1927                &mut system_tables,
1928                self,
1929                blob_id,
1930                &opts.tags,
1931                opts.namespace.as_deref(),
1932            )?;
1933
1934            // Update dedup index for streaming writes
1935            if let Some(sha_key) = sha_key {
1936                let existing = {
1937                    let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
1938                    dedup_table.get(&sha_key)?.map(|g| g.value())
1939                };
1940
1941                if let Some(existing) = existing {
1942                    // Another blob with same content already exists -- increment ref_count
1943                    let mut dedup_table =
1944                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
1945                    let updated = DedupVal {
1946                        ref_count: existing.ref_count + 1,
1947                        ..existing
1948                    };
1949                    dedup_table.insert(&sha_key, &updated)?;
1950                    drop(dedup_table);
1951                } else {
1952                    // First occurrence -- create new dedup entry
1953                    let mut dedup_table =
1954                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
1955                    let entry = DedupVal {
1956                        offset: meta.blob_ref.offset,
1957                        length: meta.blob_ref.length,
1958                        checksum: meta.blob_ref.checksum,
1959                        ref_count: 1,
1960                    };
1961                    dedup_table.insert(&sha_key, &entry)?;
1962                    drop(dedup_table);
1963                }
1964
1965                let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
1966                dedup_map.insert(&blob_id, &sha_key)?;
1967                drop(dedup_map);
1968            }
1969        }
1970
1971        self.mem.set_pending_blob_state(blob_state);
1972        self.dirty.store(true, Ordering::Release);
1973
1974        Ok(())
1975    }
1976
1977    /// Access the blob-writer-active flag. Used by `BlobWriter::drop`.
1978    pub(crate) fn blob_writer_active(&self) -> &AtomicBool {
1979        &self.blob_writer_active
1980    }
1981
1982    /// Index tags and namespace for a blob. Called from both `store_blob` and
1983    /// `finalize_blob_writer`.
1984    fn index_tags_and_namespace(
1985        system_tables: &mut SystemNamespace<'_>,
1986        txn: &WriteTransaction,
1987        blob_id: BlobId,
1988        tags: &[String],
1989        namespace: Option<&str>,
1990    ) -> Result {
1991        let tag_count = tags.len().min(MAX_TAGS_PER_BLOB);
1992        if tag_count > 0 {
1993            let mut tag_table = system_tables.open_system_table(txn, BLOB_TAG_INDEX)?;
1994            for tag in &tags[..tag_count] {
1995                let tag_key = TagKey::new(tag, blob_id);
1996                tag_table.insert(&tag_key, &())?;
1997            }
1998            drop(tag_table);
1999        }
2000
2001        if let Some(ns) = namespace {
2002            let ns_val = NamespaceVal::new(ns);
2003            let mut ns_table = system_tables.open_system_table(txn, BLOB_NAMESPACE)?;
2004            ns_table.insert(&blob_id, &ns_val)?;
2005            drop(ns_table);
2006
2007            let ns_key = NamespaceKey::new(ns, blob_id);
2008            let mut ns_idx = system_tables.open_system_table(txn, BLOB_NAMESPACE_INDEX)?;
2009            ns_idx.insert(&ns_key, &())?;
2010            drop(ns_idx);
2011        }
2012
2013        Ok(())
2014    }
2015
2016    /// Get tags for a blob within a write transaction.
2017    pub fn blob_tags(&self, blob_id: &BlobId) -> Result<Vec<String>> {
2018        let mut system_tables = self.system_tables.lock();
2019        let tag_table = system_tables.open_system_table(self, BLOB_TAG_INDEX)?;
2020
2021        let mut tags = Vec::new();
2022        // Scan all tag keys -- we need to find entries where blob_id matches.
2023        // Since TagKey is ordered (tag, blob_id), we scan the full table.
2024        // This is acceptable for the write-path read (low frequency).
2025        let range = tag_table.range::<TagKey>(..)?;
2026        for entry in range {
2027            let (key_guard, _) = entry?;
2028            let key = key_guard.value();
2029            if key.blob_id == *blob_id {
2030                tags.push(key.tag_str().to_string());
2031            }
2032        }
2033        Ok(tags)
2034    }
2035
2036    /// Get namespace for a blob within a write transaction.
2037    pub fn blob_namespace(&self, blob_id: &BlobId) -> Result<Option<String>> {
2038        let mut system_tables = self.system_tables.lock();
2039        let ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
2040        match ns_table.get(blob_id)? {
2041            Some(g) => Ok(Some(g.value().namespace_str().to_string())),
2042            None => Ok(None),
2043        }
2044    }
2045
2046    /// Retrieve a blob's data and metadata by its `BlobId`.
2047    ///
2048    /// The returned data is verified against the stored xxh3-128 checksum.
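    ///
    /// Minimal retrieval sketch (pairs with the `store_blob` example above):
    ///
    /// ```ignore
    /// if let Some((data, meta)) = txn.get_blob(&blob_id)? {
    ///     // `data` has already been verified against meta.blob_ref.checksum.
    ///     assert_eq!(data.len() as u64, meta.blob_ref.length);
    /// }
    /// ```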
2049    pub fn get_blob(&self, blob_id: &BlobId) -> Result<Option<(Vec<u8>, BlobMeta)>> {
2050        let meta = {
2051            let mut system_tables = self.system_tables.lock();
2052            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2053            match blob_table.get(blob_id)? {
2054                Some(g) => g.value(),
2055                None => return Ok(None),
2056            }
2057        };
2058
2059        let blob_state = self.mem.get_blob_state();
2060        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
2061        #[allow(clippy::cast_possible_truncation)]
2062        let data = self
2063            .mem
2064            .blob_read(file_offset, meta.blob_ref.length as usize)?;
2065
2066        let actual = xxh3_hash128(&data);
2067        if actual != meta.blob_ref.checksum {
2068            return Err(StorageError::BlobChecksumMismatch {
2069                sequence: blob_id.sequence,
2070                expected: meta.blob_ref.checksum,
2071                actual,
2072            });
2073        }
2074
2075        Ok(Some((data, meta)))
2076    }
2077
2078    /// Retrieve only a blob's metadata (no data read).
2079    pub fn get_blob_meta(&self, blob_id: &BlobId) -> Result<Option<BlobMeta>> {
2080        let mut system_tables = self.system_tables.lock();
2081        let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2082
2083        let guard = blob_table.get(blob_id)?;
2084        Ok(guard.map(|g| g.value()))
2085    }
2086
2087    /// Read a byte range from a blob without checksum verification.
2088    ///
2089    /// Returns `None` if the blob does not exist. Returns
2090    /// [`StorageError::BlobRangeOutOfBounds`] if `offset + length` exceeds the
2091    /// blob's total size.
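    ///
    /// Sketch of a bounded read (partial reads skip checksum verification):
    ///
    /// ```ignore
    /// // Read bytes [100, 100 + 4096) of the blob; errors if the range
    /// // extends past the blob's total length.
    /// let page = txn.read_blob_range(&blob_id, 100, 4096)?;
    /// ```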
2092    pub fn read_blob_range(
2093        &self,
2094        blob_id: &BlobId,
2095        offset: u64,
2096        length: u64,
2097    ) -> Result<Option<Vec<u8>>> {
2098        let meta = {
2099            let mut system_tables = self.system_tables.lock();
2100            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2101            match blob_table.get(blob_id)? {
2102                Some(g) => g.value(),
2103                None => return Ok(None),
2104            }
2105        };
2106
2107        if length == 0 {
2108            return Ok(Some(Vec::new()));
2109        }
2110
2111        let end = offset.saturating_add(length);
2112        if end > meta.blob_ref.length {
2113            return Err(StorageError::BlobRangeOutOfBounds {
2114                blob_length: meta.blob_ref.length,
2115                requested_offset: offset,
2116                requested_length: length,
2117            });
2118        }
2119
2120        let blob_state = self.mem.get_blob_state();
2121        let file_offset = blob_state.region_offset + meta.blob_ref.offset + offset;
2122        #[allow(clippy::cast_possible_truncation)]
2123        let data = self.mem.blob_read(file_offset, length as usize)?;
2124
2125        Ok(Some(data))
2126    }
2127
2128    /// Get a seekable reader for a blob's data.
2129    ///
2130    /// Returns `None` if the blob does not exist. The returned [`BlobReader`]
2131    /// implements [`std::io::Read`] and [`std::io::Seek`] for streaming access.
2132    ///
2133    /// Range reads bypass checksum verification since the stored checksum
2134    /// covers the entire blob.
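    ///
    /// Streaming-read sketch using the `std::io` traits the reader implements:
    ///
    /// ```ignore
    /// use std::io::{Read, Seek, SeekFrom};
    ///
    /// if let Some(mut reader) = txn.blob_reader(&blob_id)? {
    ///     reader.seek(SeekFrom::Start(1024))?;
    ///     let mut buf = [0u8; 512];
    ///     reader.read_exact(&mut buf)?;
    /// }
    /// ```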
2135    pub fn blob_reader(&self, blob_id: &BlobId) -> Result<Option<BlobReader>> {
2136        let meta = {
2137            let mut system_tables = self.system_tables.lock();
2138            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2139            match blob_table.get(blob_id)? {
2140                Some(g) => g.value(),
2141                None => return Ok(None),
2142            }
2143        };
2144
2145        let blob_state = self.mem.get_blob_state();
2146        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
2147
2148        Ok(Some(BlobReader::new(
2149            Arc::clone(&self.mem),
2150            file_offset,
2151            meta.blob_ref.length,
2152        )))
2153    }
2154
2155    /// Delete a blob and remove it from all indexes.
2156    pub fn delete_blob(&self, blob_id: &BlobId) -> Result<bool> {
2157        let mut system_tables = self.system_tables.lock();
2158
2159        // Read metadata to find temporal key and causal parent
2160        let meta = {
2161            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2162            match blob_table.get(blob_id)? {
2163                Some(g) => g.value(),
2164                None => return Ok(false),
2165            }
2166        };
2167
2168        // Remove from temporal index
2169        let temporal_key = TemporalKey::new(
2170            meta.wall_clock_ns,
2171            HybridLogicalClock::from_raw(meta.hlc),
2172            *blob_id,
2173        );
2174
2175        let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
2176        temporal_table.remove(&temporal_key)?;
2177        drop(temporal_table);
2178
2179        // Remove incoming causal edge (parent -> this blob).
2180        if let Some(parent) = meta.causal_parent {
2181            let mut edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
2182            edges_table.remove(&CausalEdgeKey::new(parent, *blob_id))?;
2183            drop(edges_table);
2184
2185            // Also remove from legacy table only if it points to this blob
2186            let mut legacy_table = system_tables.open_system_table(self, BLOB_CAUSAL_CHILDREN)?;
2187            let should_remove_legacy = legacy_table
2188                .get(&parent)?
2189                .is_some_and(|g| g.value() == *blob_id);
2190            if should_remove_legacy {
2191                legacy_table.remove(&parent)?;
2192            }
2193            drop(legacy_table);
2194        }
2195
2196        // Remove outgoing causal edges (this blob -> children). Without this,
2197        // deleting a parent blob leaves dangling edge entries that waste space
2198        // and could confuse causal graph traversals.
2199        {
2200            let edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
2201            let start = CausalEdgeKey::new(*blob_id, BlobId::MIN);
2202            let end = CausalEdgeKey::new(*blob_id, BlobId::MAX);
2203            let mut outgoing = Vec::new();
2204            if let Ok(range) = edges_table.range(start..=end) {
2205                for entry in range {
2206                    let (key_guard, _) = entry?;
2207                    outgoing.push(key_guard.value());
2208                }
2209            }
2210            drop(edges_table);
2211
2212            if !outgoing.is_empty() {
2213                let mut edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
2214                for key in &outgoing {
2215                    edges_table.remove(key)?;
2216                }
2217                drop(edges_table);
2218            }
2219        }
2220
2221        // Remove tag index entries -- scan for all TagKeys that reference this blob
2222        {
2223            let mut tag_table = system_tables.open_system_table(self, BLOB_TAG_INDEX)?;
2224            let mut to_remove = Vec::new();
2225            let range = tag_table.range::<TagKey>(..)?;
2226            for entry in range {
2227                let (key_guard, _) = entry?;
2228                let key = key_guard.value();
2229                if key.blob_id == *blob_id {
2230                    to_remove.push(key);
2231                }
2232            }
2233            for key in &to_remove {
2234                tag_table.remove(key)?;
2235            }
2236            drop(tag_table);
2237        }
2238
2239        // Remove namespace entries
2240        {
2241            let ns_key = {
2242                let ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
2243                match ns_table.get(blob_id)? {
2244                    Some(ns_guard) => {
2245                        let ns_str = ns_guard.value().namespace_str().to_string();
2246                        Some(NamespaceKey::new(&ns_str, *blob_id))
2247                    }
2248                    None => None,
2249                }
2250            };
2251
2252            if let Some(ns_key) = ns_key {
2253                let mut ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
2254                ns_table.remove(blob_id)?;
2255                drop(ns_table);
2256
2257                let mut ns_idx = system_tables.open_system_table(self, BLOB_NAMESPACE_INDEX)?;
2258                ns_idx.remove(&ns_key)?;
2259                drop(ns_idx);
2260            }
2261        }
2262
2263        // Remove dedup entries (if this blob was dedup-indexed)
2264        let sha_key = {
2265            let dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
2266            let result = dedup_map.get(blob_id)?.map(|g| g.value());
2267            drop(dedup_map);
2268            result
2269        };
2270
2271        if let Some(sha_key) = sha_key {
2272            // Read current dedup entry to decide whether to decrement or remove
2273            let dedup_val = {
2274                let dedup_idx = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
2275                let result = dedup_idx.get(&sha_key)?.map(|g| g.value());
2276                drop(dedup_idx);
2277                result
2278            };
2279
2280            if let Some(val) = dedup_val {
2281                let mut dedup_idx = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
2282                if val.ref_count > 1 {
2283                    let updated = DedupVal {
2284                        offset: val.offset,
2285                        length: val.length,
2286                        checksum: val.checksum,
2287                        ref_count: val.ref_count - 1,
2288                    };
2289                    dedup_idx.insert(&sha_key, &updated)?;
2290                } else {
2291                    dedup_idx.remove(&sha_key)?;
2292                }
2293                drop(dedup_idx);
2294            }
2295
2296            let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
2297            dedup_map.remove(blob_id)?;
2298            drop(dedup_map);
2299        }
2300
2301        // Remove from primary table
2302        let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2303        blob_table.remove(blob_id)?;
2304        drop(blob_table);
2305
2306        self.dirty.store(true, Ordering::Release);
2307        Ok(true)
2308    }
2309
2310    /// Returns statistics about blob region space usage.
2311    ///
2312    /// Scans the primary blob table to compute live bytes, then compares with
2313    /// the total region length to determine dead space and fragmentation.
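    ///
    /// Sketch of using the stats to decide whether compaction is worthwhile; the 30%
    /// threshold is an arbitrary example, not a crate recommendation:
    ///
    /// ```ignore
    /// let stats = txn.blob_stats()?;
    /// if stats.fragmentation_ratio > 0.3 {
    ///     // dead_bytes / region_bytes is high; schedule blob compaction.
    /// }
    /// ```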
2314    pub fn blob_stats(&self) -> Result<BlobStats> {
2315        let blob_state = self.mem.get_blob_state();
2316        let region_bytes = blob_state.region_length;
2317
2318        if region_bytes == 0 {
2319            return Ok(BlobStats {
2320                blob_count: 0,
2321                live_bytes: 0,
2322                region_bytes: 0,
2323                dead_bytes: 0,
2324                fragmentation_ratio: 0.0,
2325            });
2326        }
2327
2328        let mut system_tables = self.system_tables.lock();
2329        let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2330
2331        let mut blob_count: u64 = 0;
2332        let mut unique_offsets = crate::compat::HashSet::new();
2333        let mut live_bytes: u64 = 0;
2334
2335        let range = blob_table.range::<BlobId>(..)?;
2336        for entry in range {
2337            let (_, value_guard) = entry?;
2338            let meta = value_guard.value();
2339            blob_count += 1;
2340            // Dedup: multiple BlobIds may share the same physical offset.
2341            // Only count each physical region once.
2342            if unique_offsets.insert(meta.blob_ref.offset) {
2343                live_bytes += meta.blob_ref.length;
2344            }
2345        }
2346        drop(blob_table);
2347
2348        let dead_bytes = region_bytes.saturating_sub(live_bytes);
2349        #[allow(clippy::cast_precision_loss)]
2350        let fragmentation_ratio = if region_bytes > 0 {
2351            dead_bytes as f64 / region_bytes as f64
2352        } else {
2353            0.0
2354        };
2355
2356        Ok(BlobStats {
2357            blob_count,
2358            live_bytes,
2359            region_bytes,
2360            dead_bytes,
2361            fragmentation_ratio,
2362        })
2363    }
2364
2365    /// Single pass of blob compaction: reads all live blobs, copies them
2366    /// contiguously to a destination offset, updates all offsets in
2367    /// `BLOB_TABLE` and `BLOB_DEDUP_INDEX`, and updates the pending blob state.
2368    ///
2369    /// When `write_from_zero` is false (Pass 1), data is appended after the
2370    /// current region end -- safe even on crash since old data is untouched.
2371    /// When `write_from_zero` is true (Pass 2), data is written from offset 0 --
2372    /// safe because committed offsets point to the appended area from Pass 1.
2373    ///
2374    /// Returns `(unique_blobs_relocated, total_live_bytes)`.
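    ///
    /// Sketch of the implied two-pass flow, assuming the caller commits between passes
    /// so that Pass 2 only overwrites bytes whose committed offsets already point at
    /// the Pass 1 copies:
    ///
    /// ```ignore
    /// let txn = db.begin_write()?;
    /// txn.compact_blobs_pass(false)?; // Pass 1: append live blobs after the region end
    /// txn.commit()?;
    /// let txn = db.begin_write()?;
    /// txn.compact_blobs_pass(true)?;  // Pass 2: rewrite live blobs from offset 0
    /// txn.commit()?;
    /// ```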
2375    pub(crate) fn compact_blobs_pass(&self, write_from_zero: bool) -> Result<(u64, u64)> {
2376        let mut blob_state = self.mem.get_blob_state();
2377        let region_offset = blob_state.region_offset;
2378        let old_region_length = blob_state.region_length;
2379
2380        if old_region_length == 0 {
2381            return Ok((0, 0));
2382        }
2383
2384        // Step 1: Collect all live blobs from BLOB_TABLE
2385        let mut live_blobs: Vec<(BlobId, BlobMeta)> = Vec::new();
2386        {
2387            let mut system_tables = self.system_tables.lock();
2388            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2389            let range = blob_table.range::<BlobId>(..)?;
2390            for entry in range {
2391                let (key_guard, value_guard) = entry?;
2392                live_blobs.push((key_guard.value(), value_guard.value()));
2393            }
2394            drop(blob_table);
2395        }
2396
2397        if live_blobs.is_empty() {
2398            // All blobs deleted -- reset region
2399            blob_state.region_length = 0;
2400            self.mem.set_pending_blob_state(blob_state);
2401            self.dirty.store(true, Ordering::Release);
2402            return Ok((0, 0));
2403        }
2404
2405        // Step 2: Deduplicate physical locations and sort by offset
2406        let mut unique_physical: Vec<(u64, u64)> = Vec::new(); // (offset, length)
2407        {
2408            let mut seen = crate::compat::HashSet::new();
2409            for (_, meta) in &live_blobs {
2410                if seen.insert(meta.blob_ref.offset) {
2411                    unique_physical.push((meta.blob_ref.offset, meta.blob_ref.length));
2412                }
2413            }
2414        }
2415        unique_physical.sort_by_key(|&(offset, _)| offset);
2416
2417        // Step 3: Copy each unique physical blob to new contiguous position
2418        let write_base = if write_from_zero {
2419            0
2420        } else {
2421            old_region_length
2422        };
2423        let mut offset_map = crate::compat::HashMap::new();
2424        let mut write_cursor: u64 = 0;
2425
2426        for &(old_offset, length) in &unique_physical {
2427            let src_file_offset = region_offset + old_offset;
2428            let new_offset = write_base + write_cursor;
2429            let dst_file_offset = region_offset + new_offset;
2430
2431            #[allow(clippy::cast_possible_truncation)]
2432            let data = self.mem.blob_read(src_file_offset, length as usize)?;
2433            self.mem.blob_write(dst_file_offset, &data)?;
2434
2435            offset_map.insert(old_offset, new_offset);
2436            write_cursor += length;
2437        }
2438
2439        let total_live_size = write_cursor;
2440        let blobs_relocated = unique_physical.len() as u64;
2441
2442        // Step 4: Update all BlobRef offsets in BLOB_TABLE
2443        {
2444            let mut system_tables = self.system_tables.lock();
2445            let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2446            for (blob_id, meta) in &live_blobs {
2447                if let Some(&new_offset) = offset_map.get(&meta.blob_ref.offset)
2448                    && new_offset != meta.blob_ref.offset
2449                {
2450                    let mut updated_meta = meta.clone();
2451                    updated_meta.blob_ref.offset = new_offset;
2452                    blob_table.insert(blob_id, &updated_meta)?;
2453                }
2454            }
2455            drop(blob_table);
2456
2457            // Step 5: Update BLOB_DEDUP_INDEX offsets
2458            let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
2459            let mut dedup_entries: Vec<(Sha256Key, DedupVal)> = Vec::new();
2460            let range = dedup_table.range::<Sha256Key>(..)?;
2461            for entry in range {
2462                let (key_guard, value_guard) = entry?;
2463                dedup_entries.push((key_guard.value(), value_guard.value()));
2464            }
2465            drop(dedup_table);
2466
2467            if !dedup_entries.is_empty() {
2468                let mut dedup_table_mut =
2469                    system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
2470                for (sha_key, val) in &dedup_entries {
2471                    if let Some(&new_offset) = offset_map.get(&val.offset)
2472                        && new_offset != val.offset
2473                    {
2474                        let updated = DedupVal {
2475                            offset: new_offset,
2476                            length: val.length,
2477                            checksum: val.checksum,
2478                            ref_count: val.ref_count,
2479                        };
2480                        dedup_table_mut.insert(sha_key, &updated)?;
2481                    }
2482                }
2483                drop(dedup_table_mut);
2484            }
2485        }
2486
2487        // Step 6: Update blob state
2488        if write_from_zero {
2489            blob_state.region_length = total_live_size;
2490        } else {
2491            blob_state.region_length = old_region_length + total_live_size;
2492        }
2493        self.mem.set_pending_blob_state(blob_state);
2494        self.dirty.store(true, Ordering::Release);
2495
2496        Ok((blobs_relocated, total_live_size))
2497    }
2498
2499    /// Commit the transaction
2500    ///
2501    /// All writes performed in this transaction will be visible to future transactions, and are
2502    /// durable to the degree guaranteed by the [`Durability`] level set by [`Self::set_durability`]
2503    pub fn commit(mut self) -> Result<(), CommitError> {
2504        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
2505        self.completed = true;
2506        self.commit_inner()
2507    }
2508
2509    fn commit_inner(&mut self) -> Result<(), CommitError> {
2510        #[cfg(feature = "std")]
2511        let commit_start = std::time::Instant::now();
2512
2513        // Quick-repair requires 2-phase commit
2514        if self.quick_repair {
2515            self.two_phase_commit = true;
2516        }
2517
2518        // Early freed page processing: reclaim pages from previous transactions BEFORE
2519        // this transaction's B-tree mutations, so the buddy allocator can reuse them.
2520        // Without this, freed pages from transaction N are only returned to the allocator
2521        // during transaction N+1's commit -- after N+1 has already allocated fresh pages.
2522        //
2523        // Only done for durable commits. Non-durable commits have complex savepoint
2524        // interactions that require freed page processing to stay in the commit path.
2525        if matches!(self.durability, InternalDurability::Immediate) {
2526            let free_until_transaction = self
2527                .transaction_tracker
2528                .oldest_live_read_transaction()?
2529                .map(|x| x.next())
2530                .transpose()?
2531                .unwrap_or(self.transaction_id);
2532            if let Err(err) = self.process_freed_pages(free_until_transaction) {
2533                self.tables.lock().table_tree.clear_root_updates_and_close();
2534                return Err(err.into());
2535            }
2536        }
2537
2538        let (user_root, allocated_pages, data_freed) =
2539            self.tables.lock().table_tree.flush_and_close()?;
2540
2541        let dirty_page_count = allocated_pages.len() as u64;
2542        self.store_data_freed_pages(data_freed)?;
2543        self.store_allocated_pages(allocated_pages.into_iter().collect())?;
2544        self.flush_cdc_log()?;
2545
2546        #[cfg(feature = "logging")]
2547        debug!(
2548            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
2549            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
2550        );
2551        match self.durability {
2552            InternalDurability::None => self.non_durable_commit(user_root)?,
2553            InternalDurability::Immediate => self.durable_commit(user_root)?,
2554        }
2555
2556        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().iter() {
2557            self.transaction_tracker
2558                .deallocate_savepoint(*savepoint, *transaction)?;
2559        }
2560
2561        debug_assert!(
2562            self.system_tables
2563                .lock()
2564                .system_freed_pages()
2565                .lock()
2566                .is_empty()
2567        );
2568        debug_assert!(self.tables.lock().freed_pages.lock().is_empty());
2569
2570        #[cfg(feature = "logging")]
2571        debug!(
2572            "Finished commit of transaction id={:?}",
2573            self.transaction_id
2574        );
2575
2576        let info = crate::observer::CommitInfo {
2577            transaction_id: self.transaction_id.raw_id(),
2578            dirty_page_count,
2579            two_phase: self.two_phase_commit,
2580            #[cfg(feature = "std")]
2581            commit_duration: commit_start.elapsed(),
2582        };
2583        self.observer.on_write_commit(&info);
2584        #[cfg(feature = "metrics")]
2585        self.db_metrics
2586            .write_txn_committed
2587            .fetch_add(1, portable_atomic::Ordering::Relaxed);
2588
2589        Ok(())
2590    }
2591
2592    fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
2593        let mut system_tables = self.system_tables.lock();
2594        let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
2595        let mut pagination_counter = 0;
2596        while !freed_pages.is_empty() {
2597            let chunk_size = 400;
2598            let buffer_size = PageList::required_bytes(chunk_size);
2599            let key = TransactionIdWithPagination {
2600                transaction_id: self.transaction_id.raw_id(),
2601                pagination_id: pagination_counter,
2602            };
2603            let mut access_guard = freed_table.insert_reserve(&key, buffer_size)?;
2604
2605            let len = freed_pages.len();
2606            access_guard.as_mut().clear();
2607            for page in freed_pages.drain(len - min(len, chunk_size)..) {
2608                // Make sure that the page is currently allocated
2609                debug_assert!(
2610                    self.mem.is_allocated(page),
2611                    "Page is not allocated: {page:?}"
2612                );
2613                debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
2614                access_guard.as_mut().push_back(page);
2615            }
2616
2617            pagination_counter += 1;
2618        }
2619
2620        Ok(())
2621    }
2622
2623    fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
2624        let mut system_tables = self.system_tables.lock();
2625        let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
2626        let mut pagination_counter = 0;
2627        while !data_allocated_pages.is_empty() {
2628            let chunk_size = 400;
2629            let buffer_size = PageList::required_bytes(chunk_size);
2630            let key = TransactionIdWithPagination {
2631                transaction_id: self.transaction_id.raw_id(),
2632                pagination_id: pagination_counter,
2633            };
2634            let mut access_guard = allocated_table.insert_reserve(&key, buffer_size)?;
2635
2636            let len = data_allocated_pages.len();
2637            access_guard.as_mut().clear();
2638            for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
2639                // Make sure that the page is currently allocated. This is to catch scenarios like
2640                // a page getting allocated, and then deallocated within the same transaction,
2641                // but errantly being left in the allocated pages list
2642                debug_assert!(
2643                    self.mem.is_allocated(page),
2644                    "Page is not allocated: {page:?}"
2645                );
2646                debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
2647                access_guard.as_mut().push_back(page);
2648            }
2649
2650            pagination_counter += 1;
2651        }
2652
2653        // Purge any transactions that are no longer referenced
2654        let oldest = self
2655            .transaction_tracker
2656            .oldest_savepoint()?
2657            .map_or(u64::MAX, |(_, x)| x.raw_id());
2658        let key = TransactionIdWithPagination {
2659            transaction_id: oldest,
2660            pagination_id: 0,
2661        };
2662        for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
2663            entry?;
2664        }
2665
2666        Ok(())
2667    }
2668
2669    fn flush_cdc_log(&self) -> Result {
2670        let events = match self.cdc_log {
2671            Some(ref log) => {
2672                let mut guard = log.lock();
2673                if guard.is_empty() {
2674                    return Ok(());
2675                }
2676                core::mem::take(&mut *guard)
2677            }
2678            None => return Ok(()),
2679        };
2680
2681        let txn_id = self.transaction_id.raw_id();
2682        let mut system_tables = self.system_tables.lock();
2683        let mut cdc_table = system_tables.open_system_table(self, CDC_LOG_TABLE)?;
2684
2685        for (seq, event) in events.iter().enumerate() {
2686            let key = CdcKey::new(txn_id, u32::try_from(seq).unwrap_or(u32::MAX));
2687            let record = CdcRecord::from_event(event)?;
2688            cdc_table.insert(&key, &record)?;
2689        }
2690
2691        // Retention pruning -- respect active consumer cursors to prevent
2692        // silent data loss. The effective cutoff is the SMALLER of (txn_id -
2693        // retention_max_txns) and the oldest cursor position, so we never
2694        // prune events that a registered consumer hasn't consumed yet.
2695        if self.cdc_config.retention_max_txns > 0 && txn_id > self.cdc_config.retention_max_txns {
2696            let retention_cutoff = txn_id - self.cdc_config.retention_max_txns;
2697
2698            // Drop cdc_table to release the borrow on system_tables before
2699            // opening the cursor table.
2700            drop(cdc_table);
2701
2702            // Find the oldest active cursor position (if any cursors exist)
2703            let oldest_cursor = {
2704                let cursor_table = system_tables.open_system_table(self, CDC_CURSOR_TABLE)?;
2705                let mut oldest: Option<u64> = None;
2706                for entry in cursor_table.range::<&str>(..)? {
2707                    let (_, val_guard) = entry?;
2708                    let pos = val_guard.value();
2709                    oldest = Some(oldest.map_or(pos, |o: u64| o.min(pos)));
2710                }
2711                drop(cursor_table);
2712                oldest
2713            };
2714
2715            // Don't prune past the oldest cursor -- a slow consumer would
2716            // silently miss mutations if we deleted entries it hasn't read.
2717            // Use min() so the cutoff never advances beyond what the slowest
2718            // consumer has already read.
2719            let effective_cutoff = match oldest_cursor {
2720                Some(cursor_pos) => retention_cutoff.min(cursor_pos),
2721                None => retention_cutoff,
2722            };
2723
2724            if effective_cutoff > 0 {
2725                let mut cdc_table = system_tables.open_system_table(self, CDC_LOG_TABLE)?;
2726                let end_key = CdcKey::new(effective_cutoff, u32::MAX);
2727                for entry in cdc_table.extract_from_if(..=end_key, |_, _| true)? {
2728                    entry?;
2729                }
2730            }
2731        }
2732
2733        Ok(())
2734    }
2735
2736    /// Advance a named CDC cursor to the given transaction ID.
2737    ///
2738    /// Cursors persist across transactions and allow consumers to track
2739    /// their position in the CDC log. Only the named consumer should
2740    /// advance its own cursor.
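    ///
    /// Consumer-side sketch; `"analytics"` is just an example consumer name:
    ///
    /// ```ignore
    /// // After the consumer has processed all CDC events up to transaction 42,
    /// // persist its position so retention pruning cannot outrun it.
    /// txn.advance_cdc_cursor("analytics", 42)?;
    /// txn.commit()?;
    /// ```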
2741    pub fn advance_cdc_cursor(&self, name: &str, up_to_txn: u64) -> Result {
2742        let mut system_tables = self.system_tables.lock();
2743        let mut cursor_table = system_tables.open_system_table(self, CDC_CURSOR_TABLE)?;
2744        cursor_table.insert(name, &up_to_txn)?;
2745        Ok(())
2746    }
2747
2748    pub(crate) fn list_history_snapshot_ids(&self) -> Result<Vec<u64>> {
2749        let mut system_tables = self.system_tables.lock();
2750        let history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
2751        let mut ids = Vec::new();
2752        for entry in history_table.range::<u64>(..)? {
2753            let (key_guard, _) = entry?;
2754            ids.push(key_guard.value());
2755        }
2756        Ok(ids)
2757    }
2758
2759    pub(crate) fn purge_all_history_snapshots(&self) -> Result {
2760        let mut system_tables = self.system_tables.lock();
2761        let mut history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
2762        let keys: Vec<u64> = history_table
2763            .range::<u64>(..)?
2764            .map(|entry| entry.map(|(k, _)| k.value()))
2765            .collect::<Result<Vec<_>>>()?;
2766        for key in &keys {
2767            history_table.remove(key)?;
2768        }
2769        Ok(())
2770    }
2771
2772    /// Abort the transaction
2773    ///
2774    /// All writes performed in this transaction will be rolled back
2775    pub fn abort(mut self) -> Result {
2776        // Set completed flag first, so that we don't go through the abort() path on drop, if this fails
2777        self.completed = true;
2778        self.abort_inner()
2779    }
2780
2781    fn abort_inner(&mut self) -> Result {
2782        #[cfg(feature = "logging")]
2783        debug!("Aborting transaction id={:?}", self.transaction_id);
2784        self.tables.lock().table_tree.clear_root_updates_and_close();
2785        for savepoint in self.created_persistent_savepoints.lock().iter() {
2786            match self.delete_persistent_savepoint(savepoint.0) {
2787                Ok(_) => {}
2788                Err(err) => match err {
2789                    SavepointError::InvalidSavepoint => {
2790                        return Err(StorageError::Internal(
2791                            "invalid savepoint encountered during transaction abort".to_string(),
2792                        ));
2793                    }
2794                    SavepointError::Storage(storage_err) => {
2795                        return Err(storage_err);
2796                    }
2797                },
2798            }
2799        }
2800        self.mem.rollback_uncommitted_writes()?;
2801        #[cfg(feature = "logging")]
2802        debug!("Finished abort of transaction id={:?}", self.transaction_id);
2803
2804        self.observer.on_write_abort(self.transaction_id.raw_id());
2805        #[cfg(feature = "metrics")]
2806        self.db_metrics
2807            .write_txn_aborted
2808            .fetch_add(1, portable_atomic::Ordering::Relaxed);
2809
2810        Ok(())
2811    }
2812
2813    pub(crate) fn durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
2814        // NOTE: process_freed_pages() has already run in commit_inner() before flush_and_close(),
2815        // so previously freed pages are already returned to the buddy allocator.
2816
2817        let mut system_tables = self.system_tables.lock();
2818
2819        // Save history snapshot for time-travel reads.
2820        // Must happen before flush_table_root_updates() so the history table
2821        // changes are included in the system root.
2822        if self.history_retention > 0 && !self.quick_repair {
2823            #[cfg(feature = "std")]
2824            #[allow(clippy::cast_possible_truncation)]
2825            let timestamp_ms = std::time::SystemTime::now()
2826                .duration_since(std::time::UNIX_EPOCH)
2827                .unwrap_or(std::time::Duration::ZERO)
2828                .as_millis() as u64;
2829            #[cfg(not(feature = "std"))]
2830            let timestamp_ms = 0u64;
2831
2832            let blob_state = self.mem.get_blob_state();
2833            let snapshot = HistorySnapshot::new(
2834                user_root,
2835                timestamp_ms,
2836                blob_state.region_offset,
2837                blob_state.region_length,
2838                blob_state.next_sequence,
2839                blob_state.hlc_state,
2840            );
2841            let mut history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
2842            history_table.insert(&self.transaction_id.raw_id(), &snapshot)?;
2843            self.transaction_tracker
2844                .register_history_hold(self.transaction_id)?;
2845
2846            // Prune old snapshots beyond retention limit.
2847            let mut all_keys = Vec::new();
2848            for entry in history_table.range::<u64>(..)? {
2849                let (key_guard, _) = entry?;
2850                all_keys.push(key_guard.value());
2851            }
2852            let retention = usize::try_from(self.history_retention).unwrap_or(usize::MAX);
2853            if all_keys.len() > retention {
2854                let to_remove = all_keys.len() - retention;
2855                for key in &all_keys[..to_remove] {
2856                    history_table.remove(key)?;
2857                    self.transaction_tracker
2858                        .deallocate_history_hold(TransactionId::new(*key))?;
2859                }
2860            }
2861        }
2862
2863        let system_freed_pages = system_tables.system_freed_pages();
2864        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
2865        system_tree
2866            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
2867            .map_err(|e| e.into_storage_error_or_internal("Unexpected TableError"))?;
2868
2869        if self.quick_repair {
2870            system_tree.create_table_and_flush_table_root(
2871                ALLOCATOR_STATE_TABLE_NAME,
2872                |system_tree_ref, tree: &mut AllocatorStateTreeMut| {
2873                    let mut pagination_counter = 0;
2874
2875                    loop {
2876                        let num_regions = self
2877                            .mem
2878                            .reserve_allocator_state(tree, self.transaction_id)?;
2879
2880                        // We can't free pages after the commit, because that would invalidate our
2881                        // saved allocator state. Everything needs to go through the transactional
2882                        // free mechanism
2883                        self.store_system_freed_pages(
2884                            system_tree_ref,
2885                            system_freed_pages.clone(),
2886                            None,
2887                            &mut pagination_counter,
2888                        )?;
2889
2890                        if self.mem.try_save_allocator_state(tree, num_regions)? {
2891                            return Ok(());
2892                        }
2893
2894                        // Clear out the table before retrying, just in case the number of regions
2895                        // has somehow shrunk. Don't use retain_in() for this, since it doesn't
2896                        // free the pages immediately -- we need to reuse those pages to guarantee
2897                        // that our retry loop will eventually terminate
2898                        while let Some(guards) = tree.last()? {
2899                            let key = guards.0.value();
2900                            drop(guards);
2901                            tree.remove(&key)?;
2902                        }
2903                    }
2904                },
2905            )?;
2906        }
2907
2908        let system_root = system_tree.finalize_dirty_checksums()?;
2909
2910        self.mem.commit(
2911            user_root,
2912            system_root,
2913            self.transaction_id,
2914            self.two_phase_commit,
2915            self.shrink_policy,
2916        )?;
2917
2918        // Mark any pending non-durable commits as fully committed.
2919        self.transaction_tracker
2920            .clear_pending_non_durable_commits()?;
2921
2922        // Immediately free the pages that were freed from the system-tree. These are only
2923        // accessed by write transactions, so it's safe to free them as soon as the commit is done.
2924        for page in system_freed_pages.lock().drain(..) {
2925            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
2926        }
2927
2928        Ok(())
2929    }
2930
2931    // Commit without a durability guarantee
2932    pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
2933        let mut free_until_transaction = self
2934            .transaction_tracker
2935            .oldest_live_read_nondurable_transaction()?
2936            .map(|x| x.next())
2937            .transpose()?
2938            .unwrap_or(self.transaction_id);
2939        if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint()? {
2940            free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
2941        }
2942        self.process_freed_pages_nondurable(free_until_transaction)?;
2943
2944        let mut post_commit_frees = vec![];
2945
2946        let system_root = {
2947            let mut system_tables = self.system_tables.lock();
2948            let system_freed_pages = system_tables.system_freed_pages();
2949            system_tables.table_tree.flush_table_root_updates()?;
2950            for page in system_freed_pages
2951                .lock()
2952                .extract_if(.., |p| self.mem.unpersisted(*p))
2953            {
2954                post_commit_frees.push(page);
2955            }
2956            // Store all freed pages for a future commit(), since we can't free pages during a
2957            // non-durable commit (it's non-durable, so could be rolled back anytime in the future)
2958            self.store_system_freed_pages(
2959                &mut system_tables.table_tree,
2960                system_freed_pages,
2961                Some(&mut post_commit_frees),
2962                &mut 0,
2963            )?;
2964
2965            system_tables
2966                .table_tree
2967                .flush_table_root_updates()?
2968                .finalize_dirty_checksums()?
2969        };
2970
2971        self.mem
2972            .non_durable_commit(user_root, system_root, self.transaction_id)?;
2973        // Register this as a non-durable transaction to ensure that the freed pages we just pushed
2974        // are only processed after this has been persisted
2975        self.transaction_tracker.register_non_durable_commit(
2976            self.transaction_id,
2977            self.mem.get_last_durable_transaction_id()?,
2978        )?;
2979
2980        for page in post_commit_frees {
2981            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
2982        }
2983
2984        Ok(())
2985    }
2986
2987    // Relocate pages to lower-numbered regions/pages
2988    // Returns true if any pages were moved
2989    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
2990        let mut progress = false;
2991
2992        // Find the highest-numbered pages, up to MAX_PAGES_PER_COMPACTION (1M)
2993        let mut highest_pages = BTreeMap::new();
2994        let mut tables = self.tables.lock();
2995        let table_tree = &mut tables.table_tree;
2996        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
2997        let mut system_tables = self.system_tables.lock();
2998        let system_table_tree = &mut system_tables.table_tree;
2999        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
3000
3001        // Calculate how many of them can be relocated to lower pages, starting from the last page
3002        let mut relocation_map = HashMap::new();
3003        for path in highest_pages.into_values().rev() {
3004            if relocation_map.contains_key(&path.page_number()) {
3005                continue;
3006            }
3007            let old_page = self.mem.get_page(path.page_number())?;
3008            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
3009            let new_page_number = new_page.get_page_number();
3010            // We have to copy at least the page type into the new page.
3011            // Otherwise its cache priority will be calculated incorrectly
3012            new_page.memory_mut()?[0] = old_page.memory()[0];
3013            drop(new_page);
3014            // We're able to move this to a lower page, so insert it and rewrite all its parents
3015            if new_page_number < path.page_number() {
3016                relocation_map.insert(path.page_number(), new_page_number);
3017                for parent in path.parents() {
3018                    if relocation_map.contains_key(parent) {
3019                        continue;
3020                    }
3021                    let old_parent = self.mem.get_page(*parent)?;
3022                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
3023                    let new_page_number = new_page.get_page_number();
3024                    // We have to copy at least the page type into the new page.
3025                    // Otherwise its cache priority will be calculated incorrectly
3026                    new_page.memory_mut()?[0] = old_parent.memory()[0];
3027                    drop(new_page);
3028                    relocation_map.insert(*parent, new_page_number);
3029                }
3030            } else {
3031                self.mem
3032                    .free(new_page_number, &mut PageTrackerPolicy::Ignore);
3033                break;
3034            }
3035        }
3036
3037        if !relocation_map.is_empty() {
3038            progress = true;
3039        }
3040
3041        table_tree.relocate_tables(&relocation_map)?;
3042        system_table_tree.relocate_tables(&relocation_map)?;
3043
3044        Ok(progress)
3045    }
3046
3047    // NOTE: must be called before store_system_freed_pages() during commit, since this can create
3048    // more pages freed by the current transaction
3049    fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
3050        // We assume below that PageNumber is length 8
3051        assert_eq!(PageNumber::serialized_size(), 8);
3052
3053        // Handle the data freed tree
3054        let mut system_tables = self.system_tables.lock();
3055        {
3056            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
3057            let key = TransactionIdWithPagination {
3058                transaction_id: free_until.raw_id(),
3059                pagination_id: 0,
3060            };
3061            for entry in data_freed.extract_from_if(..key, |_, _| true)? {
3062                let (_, page_list) = entry?;
3063                for i in 0..page_list.value().len() {
3064                    self.mem
3065                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
3066                }
3067            }
3068        }
3069
3070        // Handle the system freed tree
3071        {
3072            let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
3073            let key = TransactionIdWithPagination {
3074                transaction_id: free_until.raw_id(),
3075                pagination_id: 0,
3076            };
3077            for entry in system_freed.extract_from_if(..key, |_, _| true)? {
3078                let (_, page_list) = entry?;
3079                for i in 0..page_list.value().len() {
3080                    self.mem
3081                        .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
3082                }
3083            }
3084        }
3085
3086        Ok(())
3087    }
3088
3089    fn process_freed_pages_nondurable_helper(
3090        &mut self,
3091        free_until: TransactionId,
3092        definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
3093    ) -> Result<Vec<TransactionId>> {
3094        let mut processed = vec![];
3095        let mut system_tables = self.system_tables.lock();
3096
3097        let last_key = TransactionIdWithPagination {
3098            transaction_id: free_until.raw_id(),
3099            pagination_id: 0,
3100        };
3101        let oldest_unprocessed = self
3102            .transaction_tracker
3103            .oldest_unprocessed_non_durable_commit()?
3104            .map_or(free_until.raw_id(), |x| x.raw_id());
3105        let first_key = TransactionIdWithPagination {
3106            transaction_id: oldest_unprocessed,
3107            pagination_id: 0,
3108        };
3109        let mut data_freed = system_tables.open_system_table(self, definition)?;
3110
3111        let mut candidate_transactions = vec![];
3112        for entry in data_freed.range(first_key..last_key)? {
3113            let (key, _) = entry?;
3114            let transaction_id = TransactionId::new(key.value().transaction_id);
3115            if self
3116                .transaction_tracker
3117                .is_unprocessed_non_durable_commit(transaction_id)?
3118            {
3119                candidate_transactions.push(transaction_id);
3120            }
3121        }
3122        for transaction_id in candidate_transactions {
3123            let mut key = TransactionIdWithPagination {
3124                transaction_id: transaction_id.raw_id(),
3125                pagination_id: 0,
3126            };
3127            loop {
3128                let Some(entry) = data_freed.get(&key)? else {
3129                    break;
3130                };
3131                let pages = entry.value();
3132                let mut new_pages = vec![];
3133                for i in 0..pages.len() {
3134                    let page = pages.get(i);
3135                    if !self
3136                        .mem
3137                        .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
3138                    {
3139                        new_pages.push(page);
3140                    }
3141                }
3142                if new_pages.len() != pages.len() {
3143                    drop(entry);
3144                    if new_pages.is_empty() {
3145                        data_freed.remove(&key)?;
3146                    } else {
3147                        let required = PageList::required_bytes(new_pages.len());
3148                        let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
3149                        for page in new_pages {
3150                            page_list_mut.as_mut().push_back(page);
3151                        }
3152                    }
3153                }
3154                key.pagination_id += 1;
3155            }
3156            processed.push(transaction_id);
3157        }
3158
3159        Ok(processed)
3160    }
3161
3162    // NOTE: must be called before store_system_freed_pages() during commit, since this can create
3163    // more pages freed by the current transaction
3164    //
3165    // This method only frees pages that are still unpersisted (i.e. written by non-durable
3166    // transactions), since it is called from a non-durable commit() and therefore must not touch
3167    // anything that the on-disk state of the last durable transaction might still reference.
3168    fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
3169        // We assume below that PageNumber is length 8
3170        assert_eq!(PageNumber::serialized_size(), 8);
3171
3172        // Handle the data freed tree
3173        let mut processed =
3174            self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
3175
3176        // Handle the system freed tree
3177        processed
3178            .extend(self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?);
3179
3180        for transaction_id in processed {
3181            self.transaction_tracker
3182                .mark_unprocessed_non_durable_commit(transaction_id)?;
3183        }
3184
3185        Ok(())
3186    }
3187
3188    fn store_system_freed_pages(
3189        &self,
3190        system_tree: &mut TableTreeMut,
3191        system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
3192        mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
3193        pagination_counter: &mut u64,
3194    ) -> Result {
3195        assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8
3196
3197        system_tree.open_table_and_flush_table_root(
3198            SYSTEM_FREED_TABLE.name(),
3199            |system_freed_tree: &mut SystemFreedTree| {
3200                while !system_freed_pages.lock().is_empty() {
3201                    let chunk_size = 200;
3202                    let buffer_size = PageList::required_bytes(chunk_size);
3203                    let key = TransactionIdWithPagination {
3204                        transaction_id: self.transaction_id.raw_id(),
3205                        pagination_id: *pagination_counter,
3206                    };
3207                    let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;
3208
3209                    let mut freed_pages = system_freed_pages.lock();
3210                    let len = freed_pages.len();
3211                    access_guard.as_mut().clear();
3212                    for page in freed_pages.drain(len - min(len, chunk_size)..) {
3213                        if let Some(ref mut unpersisted_pages) = unpersisted_pages
3214                            && self.mem.unpersisted(page)
3215                        {
3216                            unpersisted_pages.push(page);
3217                        } else {
3218                            access_guard.as_mut().push_back(page);
3219                        }
3220                    }
3221                    drop(access_guard);
3222
3223                    *pagination_counter += 1;
3224                }
3225                Ok(())
3226            },
3227        )?;
3228
3229        Ok(())
3230    }
3231
3232    /// Retrieves information about storage usage in the database
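    ///
    /// A minimal sketch (assuming a redb-style `Database::create`/`begin_write` setup;
    /// the `Debug` formatting of `DatabaseStats` is assumed here rather than specific
    /// accessor names):
    ///
    /// ```ignore
    /// let db = Database::create("example.redb")?;
    /// let write_txn = db.begin_write()?;
    /// let stats = write_txn.stats()?;
    /// // DatabaseStats carries page counts, stored/metadata/fragmented byte totals,
    /// // tree height, and the page size.
    /// println!("{stats:?}");
    /// ```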
3233    pub fn stats(&self) -> Result<DatabaseStats> {
3234        let tables = self.tables.lock();
3235        let table_tree = &tables.table_tree;
3236        let data_tree_stats = table_tree.stats()?;
3237
3238        let system_tables = self.system_tables.lock();
3239        let system_table_tree = &system_tables.table_tree;
3240        let system_tree_stats = system_table_tree.stats()?;
3241
3242        let total_metadata_bytes = data_tree_stats.metadata_bytes()
3243            + system_tree_stats.metadata_bytes
3244            + system_tree_stats.stored_leaf_bytes;
3245        let total_fragmented = data_tree_stats.fragmented_bytes()
3246            + system_tree_stats.fragmented_bytes
3247            + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
3248
3249        Ok(DatabaseStats {
3250            tree_height: data_tree_stats.tree_height(),
3251            allocated_pages: self.mem.count_allocated_pages()?,
3252            free_pages: self.mem.count_free_pages()?,
3253            trailing_free_pages: self.mem.trailing_free_pages()?,
3254            leaf_pages: data_tree_stats.leaf_pages(),
3255            branch_pages: data_tree_stats.branch_pages(),
3256            stored_leaf_bytes: data_tree_stats.stored_bytes(),
3257            metadata_bytes: total_metadata_bytes,
3258            fragmented_bytes: total_fragmented,
3259            page_size: self.mem.get_page_size(),
3260        })
3261    }
3262
3263    #[allow(dead_code)]
3264    #[cfg(feature = "std")]
3265    pub(crate) fn print_debug(&self) -> Result {
3266        // Flush any pending updates to make sure we get the latest root
3267        let mut tables = self.tables.lock();
3268        if let Some(page) = tables
3269            .table_tree
3270            .flush_table_root_updates()?
3271            .finalize_dirty_checksums()?
3272        {
3273            eprintln!("Master tree:");
3274            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new_uncompressed(
3275                Some(page),
3276                PageHint::None,
3277                self.transaction_guard.clone(),
3278                self.mem.clone(),
3279            )?;
3280            master_tree.print_debug(true)?;
3281        }
3282
3283        // Flush any pending updates to make sure we get the latest root
3284        let mut system_tables = self.system_tables.lock();
3285        if let Some(page) = system_tables
3286            .table_tree
3287            .flush_table_root_updates()?
3288            .finalize_dirty_checksums()?
3289        {
3290            eprintln!("System tree:");
3291            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new_uncompressed(
3292                Some(page),
3293                PageHint::None,
3294                self.transaction_guard.clone(),
3295                self.mem.clone(),
3296            )?;
3297            master_tree.print_debug(true)?;
3298        }
3299
3300        Ok(())
3301    }
3302}
3303
3304impl Drop for WriteTransaction {
3305    fn drop(&mut self) {
3306        let is_panicking = {
3307            #[cfg(feature = "std")]
3308            {
3309                std::thread::panicking()
3310            }
3311            #[cfg(not(feature = "std"))]
3312            {
3313                false
3314            }
3315        };
3316        if !self.completed && !is_panicking && !self.mem.storage_failure() {
3317            #[allow(unused_variables)]
3318            if let Err(error) = self.abort_inner() {
3319                #[cfg(feature = "logging")]
3320                warn!("Failure automatically aborting transaction: {error}");
3321            }
3322        } else if !self.completed && self.mem.storage_failure() {
3323            self.tables.lock().table_tree.clear_root_updates_and_close();
3324        }
3325    }
3326}
3327
3328/// A read-only transaction
3329///
3330/// Read-only transactions may exist concurrently with writes
3331pub struct ReadTransaction {
3332    mem: Arc<TransactionalMemory>,
3333    tree: TableTree,
3334    transaction_id: Option<u64>,
3335    observer: Arc<dyn crate::observer::DatabaseObserver>,
3336    #[cfg(feature = "metrics")]
3337    db_metrics: Arc<crate::observer::DbMetrics>,
3338}
3339
3340impl ReadTransaction {
3341    pub(crate) fn new(
3342        mem: Arc<TransactionalMemory>,
3343        guard: TransactionGuard,
3344        observer: Arc<dyn crate::observer::DatabaseObserver>,
3345        #[cfg(feature = "metrics")] db_metrics: Arc<crate::observer::DbMetrics>,
3346    ) -> Result<Self, TransactionError> {
3347        let root_page = mem.get_data_root();
3348        let txn_id = guard.id().ok().map(|id| id.raw_id());
3349        let guard = Arc::new(guard);
3350        Ok(Self {
3351            mem: mem.clone(),
3352            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
3353                .map_err(TransactionError::Storage)?,
3354            transaction_id: txn_id,
3355            observer,
3356            #[cfg(feature = "metrics")]
3357            db_metrics,
3358        })
3359    }
3360
3361    /// Create a read transaction that reads from a historical `user_root` snapshot.
3362    pub(crate) fn new_historical(
3363        mem: Arc<TransactionalMemory>,
3364        guard: TransactionGuard,
3365        user_root: Option<BtreeHeader>,
3366        observer: Arc<dyn crate::observer::DatabaseObserver>,
3367        #[cfg(feature = "metrics")] db_metrics: Arc<crate::observer::DbMetrics>,
3368    ) -> Result<Self, TransactionError> {
3369        let txn_id = guard.id().ok().map(|id| id.raw_id());
3370        let guard = Arc::new(guard);
3371        Ok(Self {
3372            mem: mem.clone(),
3373            tree: TableTree::new(user_root, PageHint::Clean, guard, mem)
3374                .map_err(TransactionError::Storage)?,
3375            transaction_id: txn_id,
3376            observer,
3377            #[cfg(feature = "metrics")]
3378            db_metrics,
3379        })
3380    }
3381
3382    #[cfg(feature = "std")]
3383    pub(crate) fn table_tree(&self) -> &TableTree {
3384        &self.tree
3385    }
3386
3387    /// Open the given table
3388    pub fn open_table<K: Key + 'static, V: Value + 'static>(
3389        &self,
3390        definition: TableDefinition<K, V>,
3391    ) -> Result<ReadOnlyTable<K, V>, TableError> {
3392        let header = self
3393            .tree
3394            .get_table::<K, V>(definition.name(), TableType::Normal)?
3395            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
3396
3397        match header {
3398            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
3399                definition.name().to_string(),
3400                table_root,
3401                PageHint::Clean,
3402                self.tree.transaction_guard().clone(),
3403                self.mem.clone(),
3404            )?),
3405            InternalTableDefinition::Multimap { .. } => {
3406                Err(TableError::Storage(StorageError::Internal(
3407                    "unexpected multimap table type when opening normal table".to_string(),
3408                )))
3409            }
3410        }
3411    }
3412
3413    /// Open a TTL-enabled table for reading.
3414    ///
3415    /// Returns an error if the table does not exist.
3416    #[cfg(feature = "std")]
3417    pub fn open_ttl_table<K: Key + 'static, V: Value + 'static>(
3418        &self,
3419        definition: crate::ttl_table::TtlTableDefinition<K, V>,
3420    ) -> Result<crate::ttl_table::ReadOnlyTtlTable<K, V>, TableError> {
3421        let inner = self.open_table(definition.inner_def())?;
3422        Ok(crate::ttl_table::ReadOnlyTtlTable::new(inner))
3423    }
3424
3425    /// Open an IVF-PQ vector index for reading.
3426    ///
3427    /// Returns an error if the index does not exist.
3428    pub fn open_ivfpq_index(
3429        &self,
3430        definition: &crate::ivfpq::config::IvfPqIndexDefinition,
3431    ) -> Result<crate::ivfpq::index::ReadOnlyIvfPqIndex, TableError> {
3432        crate::ivfpq::index::ReadOnlyIvfPqIndex::open(
3433            self,
3434            definition,
3435            #[cfg(feature = "metrics")]
3436            Arc::clone(&self.db_metrics),
3437        )
3438        .map_err(TableError::Storage)
3439    }
3440
3441    /// Open the given table without a type
3442    pub fn open_untyped_table(
3443        &self,
3444        handle: impl TableHandle,
3445    ) -> Result<ReadOnlyUntypedTable, TableError> {
3446        let header = self
3447            .tree
3448            .get_table_untyped(handle.name(), TableType::Normal)?
3449            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
3450
3451        match header {
3452            InternalTableDefinition::Normal {
3453                table_root,
3454                fixed_key_size,
3455                fixed_value_size,
3456                ..
3457            } => Ok(ReadOnlyUntypedTable::new(
3458                table_root,
3459                fixed_key_size,
3460                fixed_value_size,
3461                self.mem.clone(),
3462            )),
3463            InternalTableDefinition::Multimap { .. } => {
3464                Err(TableError::Storage(StorageError::Internal(
3465                    "unexpected multimap table type when opening untyped normal table".to_string(),
3466                )))
3467            }
3468        }
3469    }
3470
3471    /// Open the given table
3472    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
3473        &self,
3474        definition: MultimapTableDefinition<K, V>,
3475    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
3476        let header = self
3477            .tree
3478            .get_table::<K, V>(definition.name(), TableType::Multimap)?
3479            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
3480
3481        match header {
3482            InternalTableDefinition::Normal { .. } => {
3483                Err(TableError::Storage(StorageError::Internal(
3484                    "unexpected normal table type when opening multimap table".to_string(),
3485                )))
3486            }
3487            InternalTableDefinition::Multimap {
3488                table_root,
3489                table_length,
3490                ..
3491            } => Ok(ReadOnlyMultimapTable::new(
3492                table_root,
3493                table_length,
3494                PageHint::Clean,
3495                self.tree.transaction_guard().clone(),
3496                self.mem.clone(),
3497            )?),
3498        }
3499    }
3500
3501    /// Open the given table without a type
3502    pub fn open_untyped_multimap_table(
3503        &self,
3504        handle: impl MultimapTableHandle,
3505    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
3506        let header = self
3507            .tree
3508            .get_table_untyped(handle.name(), TableType::Multimap)?
3509            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
3510
3511        match header {
3512            InternalTableDefinition::Normal { .. } => {
3513                Err(TableError::Storage(StorageError::Internal(
3514                    "unexpected normal table type when opening untyped multimap table".to_string(),
3515                )))
3516            }
3517            InternalTableDefinition::Multimap {
3518                table_root,
3519                table_length,
3520                fixed_key_size,
3521                fixed_value_size,
3522                ..
3523            } => Ok(ReadOnlyUntypedMultimapTable::new(
3524                table_root,
3525                table_length,
3526                fixed_key_size,
3527                fixed_value_size,
3528                self.mem.clone(),
3529            )),
3530        }
3531    }
3532
3533    /// List all the tables
3534    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
3535        self.tree
3536            .list_tables(TableType::Normal)
3537            .map(|x| x.into_iter().map(UntypedTableHandle::new))
3538    }
3539
3540    /// List all the multimap tables
3541    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
3542        self.tree
3543            .list_tables(TableType::Multimap)
3544            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
3545    }
3546
3547    /// Open a read-only B-tree over a system table.
3548    ///
3549    /// Returns `None` if the system table does not exist (e.g. no blobs stored yet).
3550    fn open_system_btree<K: Key + 'static, V: Value + 'static>(
3551        &self,
3552        definition: SystemTableDefinition<K, V>,
3553    ) -> Result<Option<Btree<K, V>>> {
3554        let system_root = self.mem.get_system_root();
3555        let system_tree = TableTree::new(
3556            system_root,
3557            PageHint::Clean,
3558            self.tree.transaction_guard().clone(),
3559            self.mem.clone(),
3560        )?;
3561
3562        let header = system_tree.get_table::<K, V>(definition.name(), TableType::Normal);
3563        match header {
3564            Ok(Some(InternalTableDefinition::Normal { table_root, .. })) => {
3565                let btree = Btree::new_uncompressed(
3566                    table_root,
3567                    PageHint::Clean,
3568                    self.tree.transaction_guard().clone(),
3569                    self.mem.clone(),
3570                )?;
3571                Ok(Some(btree))
3572            }
3573            Ok(Some(InternalTableDefinition::Multimap { .. })) => Err(StorageError::Internal(
3574                "unexpected multimap table type in system table lookup".to_string(),
3575            )),
3576            Ok(None) => Ok(None),
3577            Err(e) => {
3578                Err(e.into_storage_error_or_internal("Internal error: blob system table corrupted"))
3579            }
3580        }
3581    }
3582
3583    /// Look up a single history snapshot by transaction ID (read-only).
3584    pub(crate) fn get_history_snapshot_ro(
3585        &self,
3586        transaction_id: u64,
3587    ) -> Result<Option<HistorySnapshot>> {
3588        let Some(btree) = self.open_system_btree(HISTORY_TABLE)? else {
3589            return Ok(None);
3590        };
3591        Ok(btree.get(&transaction_id)?.map(|guard| guard.value()))
3592    }
3593
3594    /// List all retained history snapshot IDs in ascending order (read-only).
3595    pub(crate) fn list_history_snapshot_ids_ro(&self) -> Result<Vec<u64>> {
3596        let Some(btree) = self.open_system_btree(HISTORY_TABLE)? else {
3597            return Ok(Vec::new());
3598        };
3599        let mut ids = Vec::new();
3600        for entry in btree.range::<core::ops::RangeFull, u64>(&(..))? {
3601            let guard = entry?;
3602            ids.push(guard.key());
3603        }
3604        Ok(ids)
3605    }
3606
3607    /// Retrieve a blob's data and metadata by ID.
3608    ///
3609    /// The returned data is verified against the stored xxh3-128 checksum.
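    ///
    /// A minimal read-path sketch (assumptions: `Database::open`/`begin_read` follow the
    /// upstream redb API shape; sequence number 1 is illustrative):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// if let Some((blob_id, _)) = read_txn.blob_by_sequence(1)? {
    ///     // Full read: the data is checksum-verified against the stored xxh3-128.
    ///     let (data, meta) = read_txn.get_blob(&blob_id)?.expect("blob was just looked up");
    ///     assert_eq!(data.len() as u64, meta.blob_ref.length);
    /// }
    /// ```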
3610    pub fn get_blob(&self, blob_id: &BlobId) -> Result<Option<(Vec<u8>, BlobMeta)>> {
3611        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3612            return Ok(None);
3613        };
3614
3615        let Some(guard) = btree.get(blob_id)? else {
3616            return Ok(None);
3617        };
3618        let meta = guard.value();
3619
3620        let blob_state = self.mem.get_committed_blob_state();
3621        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
3622        #[allow(clippy::cast_possible_truncation)]
3623        let data = self
3624            .mem
3625            .blob_read(file_offset, meta.blob_ref.length as usize)?;
3626
3627        let actual = xxh3_hash128(&data);
3628        if actual != meta.blob_ref.checksum {
3629            return Err(StorageError::BlobChecksumMismatch {
3630                sequence: blob_id.sequence,
3631                expected: meta.blob_ref.checksum,
3632                actual,
3633            });
3634        }
3635
3636        Ok(Some((data, meta)))
3637    }
3638
3639    /// Retrieve only a blob's metadata (no data read).
3640    pub fn get_blob_meta(&self, blob_id: &BlobId) -> Result<Option<BlobMeta>> {
3641        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3642            return Ok(None);
3643        };
3644
3645        Ok(btree.get(blob_id)?.map(|g| g.value()))
3646    }
3647
3648    /// Look up a blob by its sequence number.
3649    ///
3650    /// IVF-PQ indexes store vector IDs as `u64` sequence numbers. This method
3651    /// resolves a sequence number back to the full `(BlobId, BlobMeta)` pair by
3652    /// performing a range scan on the blob table from `BlobId::new(seq, 0)` to
3653    /// `BlobId::new(seq, u64::MAX)`.
3654    ///
3655    /// Returns the first matching blob, or `None` if no blob has that sequence.
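    ///
    /// A minimal sketch showing how a `u64` hit from a vector index search can be resolved
    /// back to a blob (assumes a redb-style `Database::open`/`begin_read` setup; `vector_hit`
    /// is an illustrative value):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// let vector_hit: u64 = 7;
    /// if let Some((blob_id, meta)) = read_txn.blob_by_sequence(vector_hit)? {
    ///     println!("sequence {} -> blob at offset {}", blob_id.sequence, meta.blob_ref.offset);
    /// }
    /// ```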
3656    pub fn blob_by_sequence(&self, seq: u64) -> Result<Option<(BlobId, BlobMeta)>> {
3657        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3658            return Ok(None);
3659        };
3660
3661        let start = BlobId::new(seq, 0);
3662        let end = BlobId::new(seq, u64::MAX);
3663        let mut range = btree.range::<core::ops::RangeInclusive<BlobId>, BlobId>(&(start..=end))?;
3664        match range.next() {
3665            Some(entry) => {
3666                let entry = entry?;
3667                Ok(Some((entry.key(), entry.value())))
3668            }
3669            None => Ok(None),
3670        }
3671    }
3672
3673    /// Start building a composite multi-signal query.
3674    ///
3675    /// Fuses vector similarity, temporal recency, and causal proximity into
3676    /// a single ranked result set. See [`CompositeQuery`](crate::composite::CompositeQuery)
3677    /// for the full builder API.
3678    pub fn composite_query(&self) -> crate::composite::CompositeQuery<'_> {
3679        crate::composite::CompositeQuery::new(self)
3680    }
3681
3682    /// Read a byte range from a blob without checksum verification.
3683    ///
3684    /// Returns `None` if the blob does not exist. Returns
3685    /// [`StorageError::BlobRangeOutOfBounds`] if `offset + length` exceeds the
3686    /// blob's total size.
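    ///
    /// A minimal sketch (redb-style `Database::open`/`begin_read` setup assumed; offsets and
    /// the chunk size are illustrative):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// if let Some((blob_id, meta)) = read_txn.blob_by_sequence(1)? {
    ///     // Read the first 4 KiB (or the whole blob if it is smaller).
    ///     let len = meta.blob_ref.length.min(4096);
    ///     let chunk = read_txn.read_blob_range(&blob_id, 0, len)?.expect("blob exists");
    ///     assert_eq!(chunk.len() as u64, len);
    /// }
    /// ```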
3687    pub fn read_blob_range(
3688        &self,
3689        blob_id: &BlobId,
3690        offset: u64,
3691        length: u64,
3692    ) -> Result<Option<Vec<u8>>> {
3693        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3694            return Ok(None);
3695        };
3696
3697        let Some(guard) = btree.get(blob_id)? else {
3698            return Ok(None);
3699        };
3700        let meta = guard.value();
3701
3702        if length == 0 {
3703            return Ok(Some(Vec::new()));
3704        }
3705
3706        let end = offset.saturating_add(length);
3707        if end > meta.blob_ref.length {
3708            return Err(StorageError::BlobRangeOutOfBounds {
3709                blob_length: meta.blob_ref.length,
3710                requested_offset: offset,
3711                requested_length: length,
3712            });
3713        }
3714
3715        let blob_state = self.mem.get_committed_blob_state();
3716        let file_offset = blob_state.region_offset + meta.blob_ref.offset + offset;
3717        #[allow(clippy::cast_possible_truncation)]
3718        let data = self.mem.blob_read(file_offset, length as usize)?;
3719
3720        Ok(Some(data))
3721    }
3722
3723    /// Get a seekable reader for a blob's data.
3724    ///
3725    /// Returns `None` if the blob does not exist. The returned [`BlobReader`]
3726    /// implements [`std::io::Read`] and [`std::io::Seek`] for streaming access.
3727    ///
3728    /// Range reads bypass checksum verification since the stored checksum
3729    /// covers the entire blob.
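    ///
    /// A minimal streaming sketch (redb-style setup assumed; the seek offset and buffer size
    /// are illustrative and assume the blob is at least 192 bytes long):
    ///
    /// ```ignore
    /// use std::io::{Read, Seek, SeekFrom};
    ///
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// if let Some((blob_id, _)) = read_txn.blob_by_sequence(1)? {
    ///     let mut reader = read_txn.blob_reader(&blob_id)?.expect("blob exists");
    ///     reader.seek(SeekFrom::Start(128))?;
    ///     let mut buf = [0u8; 64];
    ///     reader.read_exact(&mut buf)?;
    /// }
    /// ```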
3730    pub fn blob_reader(&self, blob_id: &BlobId) -> Result<Option<BlobReader>> {
3731        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3732            return Ok(None);
3733        };
3734
3735        let Some(guard) = btree.get(blob_id)? else {
3736            return Ok(None);
3737        };
3738        let meta = guard.value();
3739
3740        let blob_state = self.mem.get_committed_blob_state();
3741        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
3742
3743        Ok(Some(BlobReader::new(
3744            Arc::clone(&self.mem),
3745            file_offset,
3746            meta.blob_ref.length,
3747        )))
3748    }
3749
3750    /// Query deduplication statistics.
3751    ///
3752    /// Scans the dedup index and returns the number of unique content entries,
3753    /// total reference count across all entries, and estimated bytes saved by dedup.
3754    /// Returns zeroed stats if dedup has never been used.
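    ///
    /// A minimal sketch (redb-style setup assumed; the `DedupStats` fields are shown as
    /// constructed by this method):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// let stats = read_txn.dedup_stats()?;
    /// println!(
    ///     "unique contents: {}, total refs: {}, bytes saved: {}",
    ///     stats.total_dedup_entries, stats.total_ref_count, stats.bytes_saved
    /// );
    /// ```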
3755    pub fn dedup_stats(&self) -> Result<DedupStats> {
3756        let Some(btree) = self.open_system_btree(BLOB_DEDUP_INDEX)? else {
3757            return Ok(DedupStats {
3758                total_dedup_entries: 0,
3759                total_ref_count: 0,
3760                bytes_saved: 0,
3761            });
3762        };
3763
3764        let mut total_dedup_entries: u64 = 0;
3765        let mut total_ref_count: u64 = 0;
3766        let mut bytes_saved: u64 = 0;
3767
3768        let range = btree.range::<RangeFull, Sha256Key>(&(..))?;
3769        for entry in range {
3770            let entry = entry?;
3771            let val = entry.value();
3772            total_dedup_entries += 1;
3773            total_ref_count += u64::from(val.ref_count);
3774            // Each extra reference beyond the first saves `length` bytes
3775            if val.ref_count > 1 {
3776                bytes_saved += val.length * u64::from(val.ref_count - 1);
3777            }
3778        }
3779
3780        Ok(DedupStats {
3781            total_dedup_entries,
3782            total_ref_count,
3783            bytes_saved,
3784        })
3785    }
3786
3787    /// Returns statistics about blob region space usage (committed state).
3788    ///
3789    /// Scans the primary blob table to compute live bytes, then compares with
3790    /// the committed region length to determine dead space and fragmentation.
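    ///
    /// A minimal sketch (redb-style setup assumed; field names are shown as constructed by
    /// this method):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// let stats = read_txn.blob_stats()?;
    /// // A high fragmentation ratio means the blob region is mostly dead space.
    /// println!(
    ///     "{} blobs, {} live / {} region bytes, fragmentation {:.2}",
    ///     stats.blob_count, stats.live_bytes, stats.region_bytes, stats.fragmentation_ratio
    /// );
    /// ```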
3791    pub fn blob_stats(&self) -> Result<BlobStats> {
3792        let blob_state = self.mem.get_committed_blob_state();
3793        let region_bytes = blob_state.region_length;
3794
3795        if region_bytes == 0 {
3796            return Ok(BlobStats {
3797                blob_count: 0,
3798                live_bytes: 0,
3799                region_bytes: 0,
3800                dead_bytes: 0,
3801                fragmentation_ratio: 0.0,
3802            });
3803        }
3804
3805        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3806            return Ok(BlobStats {
3807                blob_count: 0,
3808                live_bytes: 0,
3809                region_bytes,
3810                dead_bytes: region_bytes,
3811                fragmentation_ratio: 1.0,
3812            });
3813        };
3814
3815        let mut blob_count: u64 = 0;
3816        let mut unique_offsets = crate::compat::HashSet::new();
3817        let mut live_bytes: u64 = 0;
3818
3819        let range = btree.range::<RangeFull, BlobId>(&(..))?;
3820        for entry in range {
3821            let entry = entry?;
3822            let meta = entry.value();
3823            blob_count += 1;
3824            if unique_offsets.insert(meta.blob_ref.offset) {
3825                live_bytes += meta.blob_ref.length;
3826            }
3827        }
3828
3829        let dead_bytes = region_bytes.saturating_sub(live_bytes);
3830        #[allow(clippy::cast_precision_loss)]
3831        let fragmentation_ratio = if region_bytes > 0 {
3832            dead_bytes as f64 / region_bytes as f64
3833        } else {
3834            0.0
3835        };
3836
3837        Ok(BlobStats {
3838            blob_count,
3839            live_bytes,
3840            region_bytes,
3841            dead_bytes,
3842            fragmentation_ratio,
3843        })
3844    }
3845
3846    /// Query blobs within a wall-clock time range (nanoseconds).
3847    ///
3848    /// Returns `(TemporalKey, BlobMeta)` pairs ordered by timestamp.
3849    /// Complexity: O(log N + K) where K is the number of results.
3850    /// Returns an empty vec if `start_ns > end_ns`.
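    ///
    /// A minimal sketch (redb-style setup assumed; `now_ns` is an illustrative wall-clock
    /// timestamp and the window below is its last second):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// let now_ns: u64 = 1_700_000_000_000_000_000;
    /// for (key, meta) in read_txn.blobs_in_time_range(now_ns - 1_000_000_000, now_ns)? {
    ///     println!("blob {:?} captured at {} ns", key.blob_id, meta.wall_clock_ns);
    /// }
    /// ```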
3851    pub fn blobs_in_time_range(
3852        &self,
3853        start_ns: u64,
3854        end_ns: u64,
3855    ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
3856        if start_ns > end_ns {
3857            return Ok(Vec::new());
3858        }
3859        let Some(temporal_btree) = self.open_system_btree(BLOB_TEMPORAL_INDEX)? else {
3860            return Ok(Vec::new());
3861        };
3862        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
3863            return Ok(Vec::new());
3864        };
3865
3866        let start = TemporalKey::range_start(start_ns);
3867        let end = TemporalKey::range_end(end_ns);
3868        let range = temporal_btree.range(&(start..=end))?;
3869
3870        let mut results = Vec::new();
3871        for entry in range {
3872            let entry = entry?;
3873            let temporal_key = entry.key();
3874            if let Some(meta_guard) = blob_btree.get(&temporal_key.blob_id)? {
3875                results.push((temporal_key, meta_guard.value()));
3876            }
3877        }
3878        Ok(results)
3879    }
3880
3881    /// Find blobs near a reference blob within a time window.
3882    ///
3883    /// Looks up the reference blob's timestamp, then scans +/-`window_ns`/2.
3884    /// Used for sensor fusion queries ("all inputs within 100ms of X").
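    ///
    /// A minimal sensor-fusion sketch (redb-style setup assumed; the reference blob and the
    /// 100ms window are illustrative):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// if let Some((camera_frame, _)) = read_txn.blob_by_sequence(1)? {
    ///     // Everything recorded within +/-50ms of the camera frame's timestamp.
    ///     let nearby = read_txn.blobs_near(&camera_frame, 100_000_000)?;
    ///     println!("{} blobs within the window", nearby.len());
    /// }
    /// ```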
3885    pub fn blobs_near(
3886        &self,
3887        reference: &BlobId,
3888        window_ns: u64,
3889    ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
3890        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
3891            return Ok(Vec::new());
3892        };
3893
3894        let Some(guard) = blob_btree.get(reference)? else {
3895            return Ok(Vec::new());
3896        };
3897        let ref_meta = guard.value();
3898
3899        let half_window = window_ns / 2;
3900        let start_ns = ref_meta.wall_clock_ns.saturating_sub(half_window);
3901        let end_ns = ref_meta.wall_clock_ns.saturating_add(half_window);
3902
3903        self.blobs_in_time_range(start_ns, end_ns)
3904    }
3905
3906    /// Traverse the causal chain backwards from a blob.
3907    ///
3908    /// Follows `causal_parent` links up to `max_hops` steps.
3909    /// Returns blobs from newest to oldest (starting with the given blob).
3910    /// The `Option<CausalEdge>` is the edge from the blob's parent to itself
3911    /// (`None` for the root of the chain or if no edge metadata exists).
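    ///
    /// A minimal sketch walking a chain backwards from one blob (redb-style setup assumed;
    /// the sequence number and hop limit are illustrative):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// if let Some((blob_id, _)) = read_txn.blob_by_sequence(10)? {
    ///     for (id, meta, edge) in read_txn.causal_chain(&blob_id, 8)? {
    ///         // `edge` is None at the chain root or when no edge metadata was recorded.
    ///         println!("{:?} at {} ns (edge recorded: {})", id, meta.wall_clock_ns, edge.is_some());
    ///     }
    /// }
    /// ```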
3912    pub fn causal_chain(
3913        &self,
3914        blob_id: &BlobId,
3915        max_hops: usize,
3916    ) -> Result<Vec<(BlobId, BlobMeta, Option<CausalEdge>)>> {
3917        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
3918            return Ok(Vec::new());
3919        };
3920
3921        let edges_btree = self.open_system_btree(BLOB_CAUSAL_EDGES)?;
3922        let legacy_btree = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?;
3923
3924        let mut chain = Vec::new();
3925        let mut current = *blob_id;
3926
3927        for _ in 0..=max_hops {
3928            let Some(g) = blob_btree.get(&current)? else {
3929                break;
3930            };
3931            let meta = g.value();
3932            let parent = meta.causal_parent;
3933
3934            // Look up the edge from parent -> current
3935            let edge = if let Some(parent_id) = parent {
3936                Self::lookup_causal_edge(
3937                    &parent_id,
3938                    &current,
3939                    edges_btree.as_ref(),
3940                    legacy_btree.as_ref(),
3941                )?
3942            } else {
3943                None
3944            };
3945
3946            chain.push((current, meta, edge));
3947            match parent {
3948                Some(p) => current = p,
3949                None => break,
3950            }
3951        }
3952
3953        Ok(chain)
3954    }
3955
3956    /// Get all direct causal children of a blob, with edge metadata.
3957    ///
3958    /// The v2 causal edges index uses a composite `(parent, child)` key,
3959    /// supporting multiple children per parent (branching causal graphs).
3960    pub fn causal_children(&self, blob_id: &BlobId) -> Result<Vec<CausalEdge>> {
3961        // Try v2 edges table (supports multiple children per parent)
3962        if let Some(edges_btree) = self.open_system_btree(BLOB_CAUSAL_EDGES)? {
3963            let start_key = CausalEdgeKey::new(*blob_id, BlobId::MIN);
3964            let end_key = CausalEdgeKey::new(*blob_id, BlobId::MAX);
3965            let mut children = Vec::new();
3966            let range = edges_btree.range(&(start_key..=end_key))?;
3967            for entry in range {
3968                let entry = entry?;
3969                children.push(entry.value());
3970            }
3971            if !children.is_empty() {
3972                return Ok(children);
3973            }
3974        }
3975
3976        // Fall back to legacy table
3977        if let Some(legacy_btree) = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?
3978            && let Some(g) = legacy_btree.get(blob_id)?
3979        {
3980            return Ok(vec![CausalEdge::legacy(g.value())]);
3981        }
3982
3983        Ok(Vec::new())
3984    }
3985
3986    /// Find a causal path between two blobs via backward traversal.
3987    ///
3988    /// Walks from `to` toward `from` via `causal_parent` links.
3989    /// Returns the path including both endpoints with edge metadata,
3990    /// or `None` if no path exists within `max_depth` hops.
3991    /// The `Option<CausalEdge>` is the edge from parent to the node
3992    /// (`None` for the `from` endpoint which has no incoming edge).
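    ///
    /// A minimal sketch (redb-style setup assumed; the two endpoints and the depth limit are
    /// illustrative):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// if let (Some((from, _)), Some((to, _))) =
    ///     (read_txn.blob_by_sequence(1)?, read_txn.blob_by_sequence(9)?)
    /// {
    ///     match read_txn.causal_path(&from, &to, 32)? {
    ///         // The path includes both endpoints, so hops = len - 1.
    ///         Some(path) => println!("connected in {} hops", path.len() - 1),
    ///         None => println!("no causal path within 32 hops"),
    ///     }
    /// }
    /// ```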
3993    pub fn causal_path(
3994        &self,
3995        from: &BlobId,
3996        to: &BlobId,
3997        max_depth: usize,
3998    ) -> Result<Option<CausalPath>> {
3999        if from == to {
4000            return Ok(Some(vec![(*from, None)]));
4001        }
4002
4003        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
4004            return Ok(None);
4005        };
4006
4007        let edges_btree = self.open_system_btree(BLOB_CAUSAL_EDGES)?;
4008        let legacy_btree = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?;
4009
4010        // path stores (blob_id, edge from parent->this)
4011        let mut path: CausalPath = Vec::new();
4012        let mut current = *to;
4013        // We'll fill in edges as we walk backward; the `to` node has an edge from its parent
4014        for _ in 0..max_depth {
4015            let Some(g) = blob_btree.get(&current)? else {
4016                break;
4017            };
4018            let meta = g.value();
4019            let Some(parent) = meta.causal_parent else {
4020                break;
4021            };
4022
4023            // Edge from parent -> current
4024            let edge = Self::lookup_causal_edge(
4025                &parent,
4026                &current,
4027                edges_btree.as_ref(),
4028                legacy_btree.as_ref(),
4029            )?;
4030            path.push((current, edge));
4031
4032            if parent == *from {
4033                path.push((*from, None));
4034                path.reverse();
4035                return Ok(Some(path));
4036            }
4037            current = parent;
4038        }
4039
4040        Ok(None)
4041    }
4042
4043    /// Look up a causal edge by (parent, child) composite key, checking v2 table then legacy.
4044    fn lookup_causal_edge(
4045        parent: &BlobId,
4046        child: &BlobId,
4047        edges_btree: Option<&Btree<CausalEdgeKey, CausalEdge>>,
4048        legacy_btree: Option<&Btree<BlobId, BlobId>>,
4049    ) -> Result<Option<CausalEdge>> {
4050        if let Some(bt) = edges_btree {
4051            let key = CausalEdgeKey::new(*parent, *child);
4052            if let Some(g) = bt.get(&key)? {
4053                return Ok(Some(g.value()));
4054            }
4055        }
4056        if let Some(bt) = legacy_btree
4057            && let Some(g) = bt.get(parent)?
4058        {
4059            return Ok(Some(CausalEdge::legacy(g.value())));
4060        }
4061        Ok(None)
4062    }
4063
4064    /// Get all blobs with a given tag.
4065    ///
4066    /// Uses a prefix range scan on the tag index for O(log N + K) performance
4067    /// where K is the number of matching blobs.
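    ///
    /// A minimal sketch (redb-style setup assumed; `"lidar"` is an illustrative tag):
    ///
    /// ```ignore
    /// let db = Database::open("example.redb")?;
    /// let read_txn = db.begin_read()?;
    /// for blob_id in read_txn.blobs_by_tag("lidar")? {
    ///     let tags = read_txn.blob_tags(&blob_id)?;
    ///     println!("{:?}: {:?}", blob_id, tags);
    /// }
    /// ```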
4068    pub fn blobs_by_tag(&self, tag: &str) -> Result<Vec<BlobId>> {
4069        let Some(tag_btree) = self.open_system_btree(BLOB_TAG_INDEX)? else {
4070            return Ok(Vec::new());
4071        };
4072
4073        let start = TagKey::range_start(tag);
4074        let end = TagKey::range_end(tag);
4075        let range = tag_btree.range(&(start..=end))?;
4076
4077        let mut results = Vec::new();
4078        for entry in range {
4079            let entry = entry?;
4080            results.push(entry.key().blob_id);
4081        }
4082        Ok(results)
4083    }
4084
4085    /// Get all blobs in a namespace, with their metadata.
4086    ///
4087    /// Uses the namespace index for efficient prefix scanning.
4088    pub fn blobs_in_namespace(&self, namespace: &str) -> Result<Vec<(BlobId, BlobMeta)>> {
4089        let Some(ns_idx_btree) = self.open_system_btree(BLOB_NAMESPACE_INDEX)? else {
4090            return Ok(Vec::new());
4091        };
4092        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
4093            return Ok(Vec::new());
4094        };
4095
4096        let start = NamespaceKey::range_start(namespace);
4097        let end = NamespaceKey::range_end(namespace);
4098        let range = ns_idx_btree.range(&(start..=end))?;
4099
4100        let mut results = Vec::new();
4101        for entry in range {
4102            let entry = entry?;
4103            let blob_id = entry.key().blob_id;
4104            if let Some(meta_guard) = blob_btree.get(&blob_id)? {
4105                results.push((blob_id, meta_guard.value()));
4106            }
4107        }
4108        Ok(results)
4109    }
4110
4111    /// Query blobs within a time range, optionally filtered by namespace.
4112    ///
4113    /// When `namespace` is `Some`, only blobs belonging to that namespace are
4114    /// included. When `None`, behaves identically to `blobs_in_time_range`.
4115    pub fn blobs_in_time_range_ns(
4116        &self,
4117        start_ns: u64,
4118        end_ns: u64,
4119        namespace: Option<&str>,
4120    ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
4121        let results = self.blobs_in_time_range(start_ns, end_ns)?;
4122        let Some(ns) = namespace else {
4123            return Ok(results);
4124        };
4125
4126        let Some(ns_btree) = self.open_system_btree(BLOB_NAMESPACE)? else {
4127            return Ok(Vec::new());
4128        };
4129
4130        let mut filtered = Vec::new();
4131        for (tk, meta) in results {
4132            if let Some(ns_guard) = ns_btree.get(&tk.blob_id)?
4133                && ns_guard.value().namespace_str() == ns
4134            {
4135                filtered.push((tk, meta));
4136            }
4137        }
4138        Ok(filtered)
4139    }
4140
4141    /// Get tags for a blob.
4142    pub fn blob_tags(&self, blob_id: &BlobId) -> Result<Vec<String>> {
4143        let Some(tag_btree) = self.open_system_btree(BLOB_TAG_INDEX)? else {
4144            return Ok(Vec::new());
4145        };
4146
4147        // TagKey is ordered (tag, blob_id), so we must do a full scan to find
4148        // all tags for a given blob_id. Acceptable because the tag index is
4149        // typically small.
4150        let start = TagKey::new("", BlobId::MIN);
4151        let end = TagKey {
4152            tag: [0xFF; 32],
4153            tag_len: 32,
4154            blob_id: BlobId::MAX,
4155        };
4156        let range = tag_btree.range(&(start..=end))?;
4157        let mut tags = Vec::new();
4158        for entry in range {
4159            let entry = entry?;
4160            let key = entry.key();
4161            if key.blob_id == *blob_id {
4162                tags.push(key.tag_str().to_string());
4163            }
4164        }
4165        Ok(tags)
4166    }
4167
4168    /// Get namespace for a blob.
4169    pub fn blob_namespace(&self, blob_id: &BlobId) -> Result<Option<String>> {
4170        let Some(ns_btree) = self.open_system_btree(BLOB_NAMESPACE)? else {
4171            return Ok(None);
4172        };
4173
4174        match ns_btree.get(blob_id)? {
4175            Some(g) => Ok(Some(g.value().namespace_str().to_string())),
4176            None => Ok(None),
4177        }
4178    }
4179
4180    /// Read CDC changes committed after the given transaction ID.
4181    ///
4182    /// Returns all change records with `transaction_id > after_txn_id`,
4183    /// ordered by `(transaction_id, sequence)`. Returns an empty `Vec` if
4184    /// CDC has never been used or if there are no newer changes, and a
    /// `CdcCursorBehindRetention` error if changes after `after_txn_id` have
    /// already been pruned from the CDC retention window.
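    ///
    /// # Examples
    ///
    /// A minimal polling sketch, assuming `db` is an open `Database` and that a
    /// consumer cursor named "replicator" (an illustrative name) was stored via
    /// the write-side cursor API:
    ///
    /// ```ignore
    /// let txn = db.begin_read()?;
    /// let from = txn.cdc_cursor("replicator")?.unwrap_or(0);
    /// let changes = txn.read_cdc_since(from)?;
    /// println!("{} change records since transaction {from}", changes.len());
    /// ```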
4185    pub fn read_cdc_since(&self, after_txn_id: u64) -> Result<Vec<ChangeStream>> {
4186        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
4187            return Ok(Vec::new());
4188        };
4189
4190        // Detect if the cursor is behind the retention window.
4191        // If the oldest retained entry is newer than the requested position,
4192        // entries have been pruned and the consumer may have missed changes.
4193        if after_txn_id > 0 {
4194            let mut all_range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
4195            if let Some(first) = all_range.next() {
4196                let first = first?;
4197                let oldest_txn = first.key().transaction_id;
4198                if oldest_txn > after_txn_id.saturating_add(1) {
4199                    return Err(StorageError::CdcCursorBehindRetention {
4200                        cursor_txn_id: after_txn_id,
4201                        oldest_retained_txn_id: oldest_txn,
4202                    });
4203                }
4204            }
4205        }
4206
4207        let start = CdcKey::new(after_txn_id.saturating_add(1), 0);
4208        let end = CdcKey::new(u64::MAX, u32::MAX);
4209        let range = btree.range::<core::ops::RangeInclusive<CdcKey>, CdcKey>(&(start..=end))?;
4210
4211        let mut results = Vec::new();
4212        for entry in range {
4213            let entry = entry?;
4214            let record = CdcRecord::deserialize(entry.value_data())?;
4215            results.push(ChangeStream::from_key_record(entry.key(), record));
4216        }
4217        Ok(results)
4218    }
4219
4220    /// Read CDC changes within a transaction ID range (inclusive on both ends).
4221    ///
4222    /// Returns all change records where `start_txn <= transaction_id <= end_txn`,
4223    /// ordered by `(transaction_id, sequence)`. Returns an empty `Vec` when
    /// `start_txn > end_txn`, and a `CdcCursorBehindRetention` error when
    /// `start_txn` is older than the oldest retained CDC entry.
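    ///
    /// # Examples
    ///
    /// A sketch of replaying a bounded window of changes, assuming `db` is an
    /// open `Database` (the transaction IDs are illustrative):
    ///
    /// ```ignore
    /// let txn = db.begin_read()?;
    /// let window = txn.read_cdc_range(100, 200)?;
    /// println!("{} change records in transactions 100..=200", window.len());
    /// ```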
4224    pub fn read_cdc_range(&self, start_txn: u64, end_txn: u64) -> Result<Vec<ChangeStream>> {
4225        if start_txn > end_txn {
4226            return Ok(Vec::new());
4227        }
4228        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
4229            return Ok(Vec::new());
4230        };
4231
4232        // Detect if the requested range start is behind the retention window.
4233        if start_txn > 0 {
4234            let mut all_range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
4235            if let Some(first) = all_range.next() {
4236                let first = first?;
4237                let oldest_txn = first.key().transaction_id;
4238                if oldest_txn > start_txn {
4239                    return Err(StorageError::CdcCursorBehindRetention {
4240                        cursor_txn_id: start_txn,
4241                        oldest_retained_txn_id: oldest_txn,
4242                    });
4243                }
4244            }
4245        }
4246
4247        let start = CdcKey::new(start_txn, 0);
4248        let end = CdcKey::new(end_txn, u32::MAX);
4249        let range = btree.range::<core::ops::RangeInclusive<CdcKey>, CdcKey>(&(start..=end))?;
4250
4251        let mut results = Vec::new();
4252        for entry in range {
4253            let entry = entry?;
4254            let record = CdcRecord::deserialize(entry.value_data())?;
4255            results.push(ChangeStream::from_key_record(entry.key(), record));
4256        }
4257        Ok(results)
4258    }
4259
4260    /// Read the position of a named CDC cursor.
4261    ///
4262    /// Returns the transaction ID that the named consumer has processed up to,
4263    /// or `None` if the cursor has never been set.
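    ///
    /// # Examples
    ///
    /// A sketch of estimating consumer lag, assuming `db` is an open `Database`
    /// and "replicator" is an illustrative cursor name:
    ///
    /// ```ignore
    /// let txn = db.begin_read()?;
    /// if let (Some(cursor), Some(latest)) =
    ///     (txn.cdc_cursor("replicator")?, txn.latest_cdc_transaction_id()?)
    /// {
    ///     println!("replicator is {} transactions behind", latest.saturating_sub(cursor));
    /// }
    /// ```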
4264    pub fn cdc_cursor(&self, name: &str) -> Result<Option<u64>> {
4265        let Some(btree) = self.open_system_btree(CDC_CURSOR_TABLE)? else {
4266            return Ok(None);
4267        };
4268        match btree.get(&name)? {
4269            Some(guard) => Ok(Some(guard.value())),
4270            None => Ok(None),
4271        }
4272    }
4273
4274    /// Returns the transaction ID of the latest CDC log entry, or `None` if empty.
4275    pub fn latest_cdc_transaction_id(&self) -> Result<Option<u64>> {
4276        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
4277            return Ok(None);
4278        };
4279        let mut range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
4280        match range.next_back() {
4281            Some(entry) => {
4282                let entry = entry?;
4283                Ok(Some(entry.key().transaction_id))
4284            }
4285            None => Ok(None),
4286        }
4287    }
4288
4289    /// Close the transaction
4290    ///
4291    /// Transactions are automatically closed when they and all objects referencing them have been dropped,
4292    /// so this method does not normally need to be called.
4293    /// This method can be used to ensure that there are no outstanding objects remaining.
4294    ///
4295    /// Returns a `ReadTransactionStillInUse` error if a table or other object retrieved from this transaction still references it.
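    ///
    /// # Examples
    ///
    /// A sketch, assuming `db` is an open `Database` and `MY_TABLE` is a
    /// hypothetical `TableDefinition` that already exists in the database:
    ///
    /// ```ignore
    /// let txn = db.begin_read()?;
    /// let table = txn.open_table(MY_TABLE)?;
    /// drop(table); // release the last object referencing the transaction
    /// txn.close()?; // expected to succeed now; would fail while `table` was alive
    /// ```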
4296    pub fn close(self) -> Result<(), TransactionError> {
4297        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
4298            return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
4299        }
4300        // No-op, just drop ourself
4301        Ok(())
4302    }
4303}
4304
4305impl Drop for ReadTransaction {
4306    fn drop(&mut self) {
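        // Report the end of this read transaction to the lifecycle observer and,
        // when the `metrics` feature is enabled, count the closure.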
4307        if let Some(id) = self.transaction_id {
4308            self.observer.on_read_end(id);
4309            #[cfg(feature = "metrics")]
4310            self.db_metrics
4311                .read_txn_closed
4312                .fetch_add(1, portable_atomic::Ordering::Relaxed);
4313        }
4314    }
4315}
4316
4317impl Debug for ReadTransaction {
4318    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
4319        f.write_str("ReadTransaction")
4320    }
4321}
4322
4323// ---------------------------------------------------------------------------
4324// storage_traits::StorageWrite for WriteTransaction
4325// ---------------------------------------------------------------------------
4326
4327impl crate::storage_traits::StorageWrite for WriteTransaction {
4328    type Table<'txn, K: Key + 'static, V: Value + 'static>
4329        = Table<'txn, K, V>
4330    where
4331        Self: 'txn;
4332
4333    fn open_storage_table<K: Key + 'static, V: Value + 'static>(
4334        &self,
4335        definition: TableDefinition<K, V>,
4336    ) -> Result<Self::Table<'_, K, V>> {
4337        self.open_table(definition)
4338            .map_err(|e| e.into_storage_error_or_internal("open_storage_table"))
4339    }
4340}
4341
4342// ---------------------------------------------------------------------------
4343// storage_traits::StorageRead for ReadTransaction
4344// ---------------------------------------------------------------------------
4345
4346impl crate::storage_traits::StorageRead for ReadTransaction {
4347    type Table<'txn, K: Key + 'static, V: Value + 'static>
4348        = ReadOnlyTable<K, V>
4349    where
4350        Self: 'txn;
4351
4352    fn open_storage_table<K: Key + 'static, V: Value + 'static>(
4353        &self,
4354        definition: TableDefinition<K, V>,
4355    ) -> Result<Self::Table<'_, K, V>> {
4356        self.open_table(definition)
4357            .map_err(|e| e.into_storage_error_or_internal("open_storage_table"))
4358    }
4359}
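
// A minimal sketch (illustrative only, not part of the crate) of code written
// against the storage traits rather than the concrete transaction types. Only
// `StorageRead::open_storage_table` is relied on, and the table name is made up:
//
//     fn open_counts<S: crate::storage_traits::StorageRead>(
//         storage: &S,
//     ) -> crate::Result<S::Table<'_, &'static str, u64>> {
//         storage.open_storage_table(TableDefinition::new("counts"))
//     }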
4360
4361#[cfg(test)]
4362mod test {
4363    use crate::{Database, TableDefinition};
4364
4365    const X: TableDefinition<&str, &str> = TableDefinition::new("x");
4366
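    // Transaction IDs must keep increasing across a database close/reopen cycle.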
4367    #[test]
4368    fn transaction_id_persistence() {
4369        let tmpfile = crate::create_tempfile();
4370        let db = Database::create(tmpfile.path()).unwrap();
4371        let write_txn = db.begin_write().unwrap();
4372        {
4373            let mut table = write_txn.open_table(X).unwrap();
4374            table.insert("hello", "world").unwrap();
4375        }
4376        let first_txn_id = write_txn.transaction_id;
4377        write_txn.commit().unwrap();
4378        drop(db);
4379
4380        let db2 = Database::create(tmpfile.path()).unwrap();
4381        let write_txn = db2.begin_write().unwrap();
4382        assert!(write_txn.transaction_id > first_txn_id);
4383    }
4384}