1use crate::blob_store::reader::BlobReader;
2use crate::blob_store::types::{
3 BlobDedupConfig, BlobId, BlobMeta, BlobRef, BlobStats, CausalEdge, CausalEdgeKey, CausalPath,
4 ContentType, DedupStats, DedupVal, MAX_TAGS_PER_BLOB, NamespaceKey, NamespaceVal, Sha256Key,
5 StoreOptions, TagKey, TemporalKey,
6};
7use crate::blob_store::writer::BlobWriter;
8use crate::cdc::CdcConfig;
9use crate::cdc::types::{CdcEvent, CdcKey, CdcRecord, ChangeStream};
10use crate::compat::{HashMap, HashSet, Mutex};
11use crate::db::TransactionGuard;
12use crate::error::CommitError;
13use crate::multimap_table::ReadOnlyUntypedMultimapTable;
14use crate::sealed::Sealed;
15use crate::table::ReadOnlyUntypedTable;
16use crate::temporal::HybridLogicalClock;
17use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
18use crate::tree_store::{
19 Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
20 PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
21 TableTree, TableTreeMut, TableType, TransactionalMemory, hash64_with_seed, hash128_with_seed,
22};
23use crate::types::{Key, Value};
24use crate::{
25 AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
26 MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
27 Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
28 TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
29 UntypedTableHandle,
30};
31use alloc::boxed::Box;
32use alloc::collections::BTreeMap;
33use alloc::string::{String, ToString};
34use alloc::sync::Arc;
35use alloc::vec;
36use alloc::vec::Vec;
37use core::borrow::Borrow;
38use core::cmp::min;
39use core::fmt::{Debug, Display, Formatter};
40use core::marker::PhantomData;
41use core::mem::size_of;
42use core::ops::{RangeBounds, RangeFull};
43use core::panic;
44use core::sync::atomic::{AtomicBool, Ordering};
45#[cfg(feature = "logging")]
46use log::{debug, warn};
47use sha2::{Digest, Sha256};
48
/// Upper bound on the number of pages processed in a single compaction pass.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
50
51fn xxh3_hash64(data: &[u8]) -> u64 {
52 hash64_with_seed(data, 0)
53}
54
55fn xxh3_hash128(data: &[u8]) -> u128 {
56 hash128_with_seed(data, 0)
57}
// Singleton row holding the id to assign to the next persistent savepoint.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// Serialized persistent savepoints, keyed by savepoint id.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// Pages allocated per transaction (paginated lists keyed by (txn, chunk)).
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList,
> = SystemTableDefinition::new("data_pages_allocated");
// Data pages that became unreachable in a transaction, pending free.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("data_pages_unreachable");
// System pages that became unreachable in a transaction, pending free.
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("system_pages_unreachable");
// Blob store: primary metadata table plus its secondary indexes below.
const BLOB_TABLE: SystemTableDefinition<BlobId, BlobMeta> =
    SystemTableDefinition::new("blob_store");
// Index of blobs by temporal key.
const BLOB_TEMPORAL_INDEX: SystemTableDefinition<TemporalKey, ()> =
    SystemTableDefinition::new("blob_temporal_idx");
// Parent -> child causal links between blobs.
const BLOB_CAUSAL_CHILDREN: SystemTableDefinition<BlobId, BlobId> =
    SystemTableDefinition::new("blob_causal_children");
// Richer causal-edge records (v2 schema).
const BLOB_CAUSAL_EDGES: SystemTableDefinition<CausalEdgeKey, CausalEdge> =
    SystemTableDefinition::new("blob_causal_edges_v2");
// Index of blobs by tag.
const BLOB_TAG_INDEX: SystemTableDefinition<TagKey, ()> =
    SystemTableDefinition::new("blob_tag_idx");
// Blob -> namespace assignment, and the reverse lookup index.
const BLOB_NAMESPACE: SystemTableDefinition<BlobId, NamespaceVal> =
    SystemTableDefinition::new("blob_namespace");
const BLOB_NAMESPACE_INDEX: SystemTableDefinition<NamespaceKey, ()> =
    SystemTableDefinition::new("blob_namespace_idx");
// Content-hash (SHA-256) dedup index and its blob -> hash reverse map.
const BLOB_DEDUP_INDEX: SystemTableDefinition<Sha256Key, DedupVal> =
    SystemTableDefinition::new("blob_dedup_idx");
const BLOB_DEDUP_MAP: SystemTableDefinition<BlobId, Sha256Key> =
    SystemTableDefinition::new("blob_dedup_map");
// Change-data-capture event log and per-consumer cursors.
const CDC_LOG_TABLE: SystemTableDefinition<CdcKey, CdcRecord> =
    SystemTableDefinition::new("cdc_log");
const CDC_CURSOR_TABLE: SystemTableDefinition<&str, u64> =
    SystemTableDefinition::new("cdc_cursors");
// Per-commit snapshots retained for time-travel/history queries.
const HISTORY_TABLE: SystemTableDefinition<u64, HistorySnapshot> =
    SystemTableDefinition::new("transaction_history");
#[derive(Debug, Clone)]
/// Fixed-size serialized record stored in `HISTORY_TABLE`: an optional user-root
/// btree header, a commit timestamp, and four blob-store counters.
pub(crate) struct HistorySnapshot {
    // Raw little-endian encoding; field offsets are defined as consts in the impl.
    data: [u8; Self::SIZE],
}
122
impl HistorySnapshot {
    /// Total encoded size: 1 flag byte + serialized root header + 5 u64 fields.
    const SIZE: usize = 1 + BtreeHeader::serialized_size() + 5 * size_of::<u64>();

    // Byte offsets of each field within `data`.
    const USER_ROOT_FLAG: usize = 0;
    const USER_ROOT: usize = 1;
    const TIMESTAMP: usize = 1 + BtreeHeader::serialized_size();
    const BLOB_OFFSET: usize = Self::TIMESTAMP + size_of::<u64>();
    const BLOB_LENGTH: usize = Self::BLOB_OFFSET + size_of::<u64>();
    const BLOB_SEQUENCE: usize = Self::BLOB_LENGTH + size_of::<u64>();
    const BLOB_HLC: usize = Self::BLOB_SEQUENCE + size_of::<u64>();

    // One past the last byte of the serialized user root.
    const USER_ROOT_END: usize = Self::USER_ROOT + BtreeHeader::serialized_size();

    /// Encode a snapshot. `user_root == None` leaves the flag byte zero and the
    /// root region zero-filled.
    pub(crate) fn new(
        user_root: Option<BtreeHeader>,
        timestamp_ms: u64,
        blob_region_offset: u64,
        blob_region_length: u64,
        blob_next_sequence: u64,
        blob_hlc_state: u64,
    ) -> Self {
        let mut data = [0u8; Self::SIZE];
        if let Some(root) = user_root {
            data[Self::USER_ROOT_FLAG] = 1;
            data[Self::USER_ROOT..Self::USER_ROOT_END].copy_from_slice(&root.to_le_bytes());
        }
        data[Self::TIMESTAMP..Self::TIMESTAMP + 8].copy_from_slice(&timestamp_ms.to_le_bytes());
        data[Self::BLOB_OFFSET..Self::BLOB_OFFSET + 8]
            .copy_from_slice(&blob_region_offset.to_le_bytes());
        data[Self::BLOB_LENGTH..Self::BLOB_LENGTH + 8]
            .copy_from_slice(&blob_region_length.to_le_bytes());
        data[Self::BLOB_SEQUENCE..Self::BLOB_SEQUENCE + 8]
            .copy_from_slice(&blob_next_sequence.to_le_bytes());
        data[Self::BLOB_HLC..Self::BLOB_HLC + 8].copy_from_slice(&blob_hlc_state.to_le_bytes());
        Self { data }
    }

    /// Decode the user root, or `None` when the flag byte is unset.
    pub(crate) fn user_root(&self) -> Option<BtreeHeader> {
        if self.data[Self::USER_ROOT_FLAG] != 0 {
            self.data[Self::USER_ROOT..Self::USER_ROOT_END]
                .try_into()
                .ok()
                .map(BtreeHeader::from_le_bytes)
        } else {
            None
        }
    }

    /// Commit timestamp in milliseconds (0 if the slice conversion somehow fails,
    /// which cannot happen for a well-formed `data` array).
    pub(crate) fn timestamp_ms(&self) -> u64 {
        self.data[Self::TIMESTAMP..Self::TIMESTAMP + 8]
            .try_into()
            .map(u64::from_le_bytes)
            .unwrap_or(0)
    }
}
178
// Fixed-width `Value` encoding: the snapshot is stored as its raw byte array.
impl Value for HistorySnapshot {
    type SelfType<'a>
        = HistorySnapshot
    where
        Self: 'a;
    type AsBytes<'a>
        = [u8; HistorySnapshot::SIZE]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        Some(Self::SIZE)
    }

    // Panics if `data` is shorter than SIZE; the fixed-width contract guarantees
    // stored values are exactly SIZE bytes.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        let mut buf = [0u8; Self::SIZE];
        buf.copy_from_slice(&data[..Self::SIZE]);
        Self { data: buf }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::HistorySnapshot")
    }
}
213
/// Name of the system table that persists the page-allocator state.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
// Read-only and mutable btree views over the allocator-state table.
pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
// Mutable btree of system pages pending free, keyed by (transaction, pagination chunk).
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
220
#[derive(Debug)]
/// Zero-copy view over a serialized list of page numbers:
/// a u16 little-endian count followed by fixed-width `PageNumber` entries.
pub(crate) struct PageList<'a> {
    data: &'a [u8],
}
228
impl PageList<'_> {
    /// Serialized size of a list with `len` entries: 2-byte count + entries.
    fn required_bytes(len: usize) -> usize {
        2 + PageNumber::serialized_size() * len
    }

    /// Number of entries, read from the u16 length prefix.
    pub(crate) fn len(&self) -> usize {
        self.data[..size_of::<u16>()]
            .try_into()
            .map(|b| u16::from_le_bytes(b) as usize)
            .unwrap_or(0)
    }

    /// Entry at `index`. Not bounds-checked against `len()`; an out-of-range
    /// index panics on the slice access. A malformed entry decodes as the
    /// zero page rather than panicking.
    pub(crate) fn get(&self, index: usize) -> PageNumber {
        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
        self.data[start..(start + PageNumber::serialized_size())]
            .try_into()
            .map(PageNumber::from_le_bytes)
            .unwrap_or(PageNumber::new(0, 0, 0))
    }
}
249
// Variable-width `Value` encoding: a `PageList` is stored as its raw bytes and
// borrowed back without copying.
impl Value for PageList<'_> {
    type SelfType<'a>
        = PageList<'a>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        PageList { data }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::PageList")
    }
}
282
impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    fn initialize(data: &mut [u8]) {
        // NOTE(review): `required_bytes()` implies only a 2-byte length header, yet 8
        // bytes are zeroed here — presumably PageListMut reserves a wider header.
        // Confirm against PageListMut's layout; a reservation smaller than 8 bytes
        // would be left uninitialized (the guard below skips it silently).
        debug_assert!(data.len() >= 8);
        if data.len() >= 8 {
            data[..8].fill(0);
        }
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY: assumes PageListMut is a #[repr(transparent)] wrapper over [u8]
        // (TODO confirm), so the unsized pointer cast preserves the address and
        // length metadata and the resulting reference is valid for `data`'s lifetime.
        unsafe { &mut *(core::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
302
#[derive(Debug)]
/// Composite key for the allocated/freed-page system tables. `pagination_id`
/// presumably splits one transaction's large page lists across multiple rows —
/// confirm against the writers of those tables.
pub(crate) struct TransactionIdWithPagination {
    pub(crate) transaction_id: u64,
    pub(crate) pagination_id: u64,
}
308
309impl Value for TransactionIdWithPagination {
310 type SelfType<'a>
311 = TransactionIdWithPagination
312 where
313 Self: 'a;
314 type AsBytes<'a>
315 = [u8; 2 * size_of::<u64>()]
316 where
317 Self: 'a;
318
319 fn fixed_width() -> Option<usize> {
320 Some(2 * size_of::<u64>())
321 }
322
323 #[allow(clippy::big_endian_bytes)]
324 fn from_bytes<'a>(data: &'a [u8]) -> Self
325 where
326 Self: 'a,
327 {
328 let transaction_id = data[..size_of::<u64>()]
329 .try_into()
330 .map(u64::from_be_bytes)
331 .unwrap_or(0);
332 let pagination_id = data[size_of::<u64>()..]
333 .try_into()
334 .map(u64::from_be_bytes)
335 .unwrap_or(0);
336 Self {
337 transaction_id,
338 pagination_id,
339 }
340 }
341
342 #[allow(clippy::big_endian_bytes)]
343 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
344 where
345 Self: 'b,
346 {
347 let mut result = [0u8; 2 * size_of::<u64>()];
348 result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_be_bytes());
349 result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_be_bytes());
350 result
351 }
352
353 fn type_name() -> TypeName {
354 TypeName::internal("redb::TransactionIdWithPagination")
355 }
356}
357
358impl Key for TransactionIdWithPagination {
359 fn compare(data1: &[u8], data2: &[u8]) -> core::cmp::Ordering {
360 let len = (2 * size_of::<u64>()).min(data1.len()).min(data2.len());
362 data1[..len]
363 .cmp(&data2[..len])
364 .then_with(|| data1.len().cmp(&data2.len()))
365 }
366}
367
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
/// Keys of the allocator-state system table. Serialized as a 1-byte tag
/// (3 = Region, 4 = RegionTracker, 5 = TransactionId, anything else decodes
/// as Deprecated) followed by a little-endian u32 payload used only by Region.
pub(crate) enum AllocatorStateKey {
    Deprecated,
    Region(u32),
    RegionTracker,
    TransactionId,
}
375
376impl Value for AllocatorStateKey {
377 type SelfType<'a> = Self;
378 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
379
380 fn fixed_width() -> Option<usize> {
381 Some(1 + size_of::<u32>())
382 }
383
384 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
385 where
386 Self: 'a,
387 {
388 match data[0] {
389 3 => Self::Region(data[1..].try_into().map(u32::from_le_bytes).unwrap_or(0)),
390 4 => Self::RegionTracker,
391 5 => Self::TransactionId,
392 _ => Self::Deprecated,
395 }
396 }
397
398 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
399 where
400 Self: 'a,
401 Self: 'b,
402 {
403 let mut result = Self::AsBytes::default();
404 match value {
405 Self::Region(region) => {
406 result[0] = 3;
407 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
408 }
409 Self::RegionTracker => {
410 result[0] = 4;
411 }
412 Self::TransactionId => {
413 result[0] = 5;
414 }
415 AllocatorStateKey::Deprecated => {
416 result[0] = 0;
417 }
418 }
419
420 result
421 }
422
423 fn type_name() -> TypeName {
424 TypeName::internal("redb::AllocatorStateKey")
425 }
426}
427
impl Key for AllocatorStateKey {
    fn compare(data1: &[u8], data2: &[u8]) -> core::cmp::Ordering {
        // Compare decoded variants via the derived Ord rather than raw bytes:
        // the Region payload is little-endian, so byte order would be wrong.
        Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
    }
}
433
/// Typed, named handle to a system (internal) table; analogous to
/// `TableDefinition` but for tables managed by the database itself.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    // Zero-sized markers tying the key/value types to the handle.
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
439
impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Create a handle. Panics at compile time (const context) if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
450
impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Seals the handle trait so it cannot be implemented outside this crate.
impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

// Manual Clone/Copy: a derive would add unnecessary `K: Clone`/`V: Clone` bounds
// even though only PhantomData of K and V is stored.
impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
466
impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
    // Renders as `name<KeyType, ValueType>` for logs and error messages.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}
478
#[derive(Debug, Clone, Copy)]
/// Aggregated storage statistics for the database; see the getter methods for
/// the meaning of each field.
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) free_pages: u64,
    pub(crate) trailing_free_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}
493
impl DatabaseStats {
    /// Height of the btree structure.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Total number of pages currently allocated.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Total number of free pages.
    pub fn free_pages(&self) -> u64 {
        self.free_pages
    }

    /// Free pages at the end of the file (reclaimable by truncation).
    pub fn trailing_free_pages(&self) -> u64 {
        self.trailing_free_pages
    }

    /// Number of leaf pages.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch (interior) pages.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Bytes of user data stored in leaf pages.
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Bytes consumed by database metadata.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Bytes lost to fragmentation within allocated pages.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Page size in bytes.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
547
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
/// Durability level a caller may request for a write transaction.
/// NOTE(review): enforcement happens in the commit path, outside this chunk —
/// the comments below reflect the conventional meaning of the variant names.
pub enum Durability {
    /// Weakest level: commit is not forced to stable storage.
    None,
    /// Strongest level: commit is made durable before returning.
    Immediate,
}

/// Internal mirror of `Durability` used for equality checks inside the
/// transaction (derives `PartialEq`/`Eq`, which the public enum does not).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Immediate,
}
566
/// Open, mutable handle to one system table for the duration of a borrow of
/// the owning `SystemNamespace`; staging its root back happens in `Drop`.
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}
574
impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    /// Open a handle over a system table. Page tracking is disabled
    /// (`PageTrackerPolicy::Ignore`) for system-table btrees.
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        SystemTable {
            name: name.to_string(),
            namespace,
            tree: BtreeMut::new_uncompressed(table_root, guard.clone(), mem, freed_pages, ignore),
            transaction_guard: guard,
        }
    }

    /// Point lookup by key.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    /// Iterate over entries whose keys fall within `range`.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .range(&range)
            .map(|x| Range::new(x, self.transaction_guard.clone()))
    }

    /// Remove and yield the entries in `range` for which `predicate` is true.
    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        range: impl RangeBounds<KR> + 'a,
        predicate: F,
    ) -> Result<ExtractIf<'_, K, V, F>>
    where
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        self.tree
            .extract_from_if(&range, predicate)
            .map(ExtractIf::new)
    }

    /// Insert `value` under `key`, returning the previous value if any.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` if the key, the value, or the combined
    /// pair exceeds the storage limits.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<'_, V>>> {
        let value_len = V::as_bytes(value.borrow()).as_ref().len();
        if value_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_len + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_len + key_len));
        }
        self.tree.insert(key.borrow(), value.borrow())
    }

    /// Remove `key`, returning the previous value if it was present.
    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        self.tree.remove(key.borrow())
    }
}
654
impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
    /// Insert a placeholder of `value_length` bytes under `key` and return a
    /// guard through which the caller writes the value in place.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` if the key, the reserved length, or the
    /// combined pair exceeds the storage limits.
    pub fn insert_reserve<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
        value_length: usize,
    ) -> Result<AccessGuardMutInPlace<'_, V>> {
        if value_length > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_length + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_length + key_len));
        }
        self.tree.insert_reserve(key.borrow(), value_length)
    }
}
674
impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
    fn drop(&mut self) {
        // On close, stage this table's latest root (and stored-bytes length)
        // back into the namespace's table tree for the eventual commit.
        self.namespace.close_table(
            &self.name,
            &self.tree,
            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
        );
    }
}
684
/// Per-transaction view of the system-table tree. Unlike `TableNamespace`, it
/// does not track which tables are open (system tables are opened internally
/// and short-lived).
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}
690
impl<'db> SystemNamespace<'db> {
    /// Build the namespace over the system root. Page tracking is disabled for
    /// the system tree (`PageTrackerPolicy::Ignore`).
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        let freed_pages = Arc::new(Mutex::new(vec![]));
        Self {
            table_tree: TableTreeMut::new(
                root_page,
                guard.clone(),
                mem,
                freed_pages.clone(),
                ignore,
            ),
            freed_pages,
            transaction_guard: guard.clone(),
        }
    }

    /// Shared handle to the list of system pages freed during this transaction.
    fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
        self.freed_pages.clone()
    }

    /// Open (creating if needed) a system table and mark the transaction dirty.
    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
        &'s mut self,
        transaction: &'txn WriteTransaction,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<SystemTable<'db, 's, K, V>> {
        let (root, _) = self
            .table_tree
            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
            .map_err(|e| {
                e.into_storage_error_or_internal("Internal error. System table is corrupted")
            })?;
        transaction.dirty.store(true, Ordering::Release);

        Ok(SystemTable::new(
            definition.name(),
            root,
            self.freed_pages.clone(),
            self.transaction_guard.clone(),
            transaction.mem.clone(),
            self,
        ))
    }

    /// Stage `table`'s current root and length; called from `SystemTable::drop`.
    fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}
751
/// Per-transaction view of the user-table tree. Tracks which tables are open
/// (keyed by name, with the caller location for error reporting) so a table
/// cannot be opened twice concurrently.
struct TableNamespace<'db> {
    open_tables: HashMap<String, &'static panic::Location<'static>>,
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    table_tree: TableTreeMut<'db>,
}
758
impl TableNamespace<'_> {
    /// Build the namespace over the user-data root, with page allocation
    /// tracking enabled (needed while savepoints may exist).
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
        let freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTreeMut::new(
            root_page,
            guard,
            mem,
            freed_pages.clone(),
            allocated.clone(),
        );
        Self {
            open_tables: Default::default(),
            table_tree,
            freed_pages,
            allocated_pages: allocated,
        }
    }

    /// Mark the transaction dirty. When no savepoint exists, allocation
    /// tracking is switched off as an optimization; on error we conservatively
    /// assume a savepoint exists (`unwrap_or(true)`) and keep tracking.
    fn set_dirty(&mut self, transaction: &WriteTransaction) {
        transaction.dirty.store(true, Ordering::Release);
        if !transaction
            .transaction_tracker
            .any_savepoint_exists()
            .unwrap_or(true)
        {
            *self.allocated_pages.lock() = PageTrackerPolicy::Ignore;
        }
    }

    /// Reset the table tree to `root`, discarding staged updates. Refuses to
    /// run while any table is open, since open handles would go stale.
    fn set_root(&mut self, root: Option<BtreeHeader>) -> Result<(), StorageError> {
        if !self.open_tables.is_empty() {
            return Err(StorageError::Internal(
                "set_root called with open tables".into(),
            ));
        }
        self.table_tree.clear_pending_updates();
        self.table_tree.set_root(root);
        Ok(())
    }

    /// Shared open logic: reject double-opens, then create/fetch the table and
    /// record the caller's location for the duplicate-open error message.
    #[track_caller]
    fn inner_open<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        let root = self
            .table_tree
            .get_or_create_table::<K, V>(name, table_type)?;
        self.open_tables
            .insert(name.to_string(), panic::Location::caller());

        Ok(root)
    }

    /// Open (creating if needed) a multimap table and mark the transaction dirty.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening multimap table: {definition}");
        let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
        self.set_dirty(transaction);

        Ok(MultimapTable::new(
            definition.name(),
            root,
            length,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Open (creating if needed) a normal table and mark the transaction dirty.
    #[track_caller]
    pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
        &mut self,
        transaction: &'txn WriteTransaction,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'txn, K, V>, TableError> {
        #[cfg(feature = "logging")]
        debug!("Opening table: {definition}");
        let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
        self.set_dirty(transaction);

        Ok(Table::new(
            definition.name(),
            root,
            self.freed_pages.clone(),
            self.allocated_pages.clone(),
            transaction.mem.clone(),
            transaction,
        ))
    }

    /// Shared rename logic; refuses to rename a table that is currently open.
    #[track_caller]
    fn inner_rename(
        &mut self,
        name: &str,
        new_name: &str,
        table_type: TableType,
    ) -> Result<(), TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.rename_table(name, new_name, table_type)
    }

    /// Rename a normal table.
    #[track_caller]
    fn rename_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming table: {name} to {new_name}");
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Normal)
    }

    /// Rename a multimap table.
    #[track_caller]
    fn rename_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
        new_name: &str,
    ) -> Result<(), TableError> {
        #[cfg(feature = "logging")]
        debug!("Renaming multimap table: {name} to {new_name}");
        self.set_dirty(transaction);
        self.inner_rename(name, new_name, TableType::Multimap)
    }

    /// Shared delete logic; refuses to delete a table that is currently open.
    /// Returns whether the table existed.
    #[track_caller]
    fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
        if let Some(location) = self.open_tables.get(name) {
            return Err(TableError::TableAlreadyOpen(name.to_string(), location));
        }

        self.table_tree.delete_table(name, table_type)
    }

    /// Delete a normal table; returns whether it existed.
    #[track_caller]
    fn delete_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting table: {name}");
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Normal)
    }

    /// Delete a multimap table; returns whether it existed.
    #[track_caller]
    fn delete_multimap_table(
        &mut self,
        transaction: &WriteTransaction,
        name: &str,
    ) -> Result<bool, TableError> {
        #[cfg(feature = "logging")]
        debug!("Deleting multimap table: {name}");
        self.set_dirty(transaction);
        self.inner_delete(name, TableType::Multimap)
    }

    /// Release the open-table entry and stage the table's final root/length.
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        let _ = self.open_tables.remove(name);
        self.table_tree
            .stage_update_table_root(name, table.get_root(), length);
    }
}
960
/// An exclusive read-write transaction over the database.
pub struct WriteTransaction {
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // User tables and system tables live in separate table trees.
    tables: Mutex<TableNamespace<'static>>,
    system_tables: Mutex<SystemNamespace<'static>>,
    // True once commit/abort has run.
    completed: bool,
    // Set on the first mutating operation.
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    shrink_policy: ShrinkPolicy,
    quick_repair: bool,
    // Savepoints created/deleted by this transaction, reconciled at commit.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
    // Guards against more than one concurrently active blob writer.
    blob_writer_active: AtomicBool,
    blob_dedup_config: BlobDedupConfig,
    // In-memory CDC event buffer; `Some` only when CDC is enabled.
    pub(crate) cdc_log: Option<Mutex<Vec<CdcEvent>>>,
    cdc_config: CdcConfig,
    // Retention window for `HISTORY_TABLE` snapshots (units enforced elsewhere).
    history_retention: u64,
    observer: Arc<dyn crate::observer::DatabaseObserver>,
    #[cfg(feature = "metrics")]
    db_metrics: Arc<crate::observer::DbMetrics>,
}
992
993impl WriteTransaction {
994 #[allow(clippy::too_many_arguments)]
995 pub(crate) fn new(
996 guard: TransactionGuard,
997 transaction_tracker: Arc<TransactionTracker>,
998 mem: Arc<TransactionalMemory>,
999 blob_dedup_config: BlobDedupConfig,
1000 cdc_config: CdcConfig,
1001 history_retention: u64,
1002 observer: Arc<dyn crate::observer::DatabaseObserver>,
1003 #[cfg(feature = "metrics")] db_metrics: Arc<crate::observer::DbMetrics>,
1004 ) -> Result<Self> {
1005 let transaction_id = guard.id()?;
1006 let guard = Arc::new(guard);
1007
1008 let root_page = mem.get_data_root();
1009 let system_page = mem.get_system_root();
1010
1011 let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
1012 let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
1013
1014 Ok(Self {
1015 transaction_tracker,
1016 mem: mem.clone(),
1017 transaction_guard: guard.clone(),
1018 transaction_id,
1019 tables: Mutex::new(tables),
1020 system_tables: Mutex::new(system_tables),
1021 completed: false,
1022 dirty: AtomicBool::new(false),
1023 durability: InternalDurability::Immediate,
1024 two_phase_commit: false,
1025 quick_repair: false,
1026 shrink_policy: ShrinkPolicy::Default,
1027 created_persistent_savepoints: Mutex::new(Default::default()),
1028 deleted_persistent_savepoints: Mutex::new(vec![]),
1029 blob_writer_active: AtomicBool::new(false),
1030 blob_dedup_config,
1031 cdc_log: if cdc_config.enabled {
1032 Some(Mutex::new(Vec::new()))
1033 } else {
1034 None
1035 },
1036 cdc_config,
1037 history_retention,
1038 observer,
1039 #[cfg(feature = "metrics")]
1040 db_metrics,
1041 })
1042 }
1043
1044 pub(crate) fn record_cdc(&self, event: CdcEvent) {
1046 if let Some(ref log) = self.cdc_log {
1047 log.lock().push(event);
1048 }
1049 }
1050
    /// Override the shrink policy applied when this transaction commits.
    pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
        self.shrink_policy = shrink_policy;
    }
1054
1055 pub(crate) fn pending_free_pages(&self) -> Result<bool> {
1056 let mut system_tables = self.system_tables.lock();
1057 if system_tables
1058 .open_system_table(self, DATA_FREED_TABLE)?
1059 .tree
1060 .get_root()
1061 .is_some()
1062 {
1063 return Ok(true);
1064 }
1065 if system_tables
1066 .open_system_table(self, SYSTEM_FREED_TABLE)?
1067 .tree
1068 .get_root()
1069 .is_some()
1070 {
1071 return Ok(true);
1072 }
1073
1074 Ok(false)
1075 }
1076
    /// Debug-only audit of page accounting: walks both table trees, the two
    /// freed-pages system tables, and the in-memory freed lists, removing each
    /// visited page from the allocator's full set. Asserts every visited page
    /// was allocated; anything left over is printed as leaked.
    #[cfg(all(debug_assertions, feature = "std"))]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        self.mem.debug_check_allocator_consistency();

        // Pages reachable from the user-table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Pages reachable from the system-table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        {
            let tables = self.tables.lock();
            let pages = tables.freed_pages.lock();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        {
            let system_tables = self.system_tables.lock();
            let pages = system_tables.freed_pages.lock();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        // Whatever remains was allocated but is referenced nowhere above.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
1178
    /// Create a savepoint that survives process restarts, returning its id.
    ///
    /// Takes an ephemeral savepoint, persists it in `SAVEPOINT_TABLE`, bumps
    /// the next-id counter, and records the id so commit/abort can reconcile.
    ///
    /// # Errors
    /// `SavepointError::InvalidSavepoint` unless durability is `Immediate`.
    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
        if self.durability != InternalDurability::Immediate {
            return Err(SavepointError::InvalidSavepoint);
        }

        let mut savepoint = self.ephemeral_savepoint()?;

        let mut system_tables = self.system_tables.lock();

        // Reserve the next id first; drop the table before opening another,
        // since only one system table can be open at a time.
        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint.get_id().next()?)?;
        drop(next_table);

        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        savepoint_table.insert(
            savepoint.get_id(),
            SerializedSavepoint::from_savepoint(&savepoint),
        )?;

        // Mark persistent so the ephemeral cleanup path skips it.
        savepoint.set_persistent();

        self.created_persistent_savepoints
            .lock()
            .insert(savepoint.get_id());

        Ok(savepoint.get_id().0)
    }
1214
1215 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
1216 self.transaction_guard.clone()
1217 }
1218
1219 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
1220 let mut system_tables = self.system_tables.lock();
1221 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1222 let value = next_table.get(())?;
1223 if let Some(next_id) = value {
1224 Ok(Some(next_id.value()))
1225 } else {
1226 Ok(None)
1227 }
1228 }
1229
1230 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
1232 let mut system_tables = self.system_tables.lock();
1233 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1234 let value = table.get(SavepointId(id))?;
1235
1236 match value {
1237 Some(x) => Ok(x.value().to_savepoint(self.transaction_tracker.clone())?),
1238 None => Err(SavepointError::InvalidSavepoint),
1239 }
1240 }
1241
1242 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1249 if self.durability != InternalDurability::Immediate {
1250 return Err(SavepointError::InvalidSavepoint);
1251 }
1252 let mut system_tables = self.system_tables.lock();
1253 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1254 let savepoint = table.remove(SavepointId(id))?;
1255 if let Some(serialized) = savepoint {
1256 let savepoint = serialized
1257 .value()
1258 .to_savepoint(self.transaction_tracker.clone())?;
1259 self.deleted_persistent_savepoints
1260 .lock()
1261 .push((savepoint.get_id(), savepoint.get_transaction_id()));
1262 Ok(true)
1263 } else {
1264 Ok(false)
1265 }
1266 }
1267
1268 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1270 let mut system_tables = self.system_tables.lock();
1271 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1272 let mut savepoints = vec![];
1273 for savepoint in table.range::<SavepointId>(..)? {
1274 savepoints.push(savepoint?.0.value().0);
1275 }
1276 Ok(savepoints.into_iter())
1277 }
1278
1279 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1280 let id = self
1281 .transaction_tracker
1282 .register_read_transaction(&self.mem)?;
1283
1284 Ok(TransactionGuard::new_read(
1285 id,
1286 self.transaction_tracker.clone(),
1287 ))
1288 }
1289
1290 fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
1291 let transaction_id = self.allocate_read_transaction()?.leak()?;
1292 let id = self
1293 .transaction_tracker
1294 .allocate_savepoint(transaction_id)?;
1295 Ok((id, transaction_id))
1296 }
1297
1298 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1304 if self.dirty.load(Ordering::Acquire) {
1305 return Err(SavepointError::InvalidSavepoint);
1306 }
1307
1308 let (id, transaction_id) = self.allocate_savepoint()?;
1309 #[cfg(feature = "logging")]
1310 debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");
1311
1312 let root = self.mem.get_data_root();
1313 let savepoint = Savepoint::new_ephemeral(
1314 &self.mem,
1315 self.transaction_tracker.clone(),
1316 id,
1317 transaction_id,
1318 root,
1319 );
1320
1321 Ok(savepoint)
1322 }
1323
    /// Rolls the database state back to `savepoint`.
    ///
    /// The user root is reset to the savepoint's root, freed/allocated-page
    /// bookkeeping recorded by transactions newer than the savepoint is
    /// unwound, and all savepoints newer than `savepoint` (ephemeral and
    /// persistent) are invalidated. The statement order below is significant;
    /// do not reorder.
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // The savepoint must belong to this database instance.
        assert_eq!(
            core::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())?
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring across file-format versions is not supported.
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        {
            // Point the user table tree back at the savepoint's root.
            self.tables.lock().set_root(savepoint.get_user_root())?;
        }

        // Transactions strictly after the savepoint's are being undone.
        let txn_id = savepoint.get_transaction_id().next()?.raw_id();
        {
            // Drop all data-freed records from those newer transactions: the
            // pages they freed are live again after the rollback.
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let mut system_tables = self.system_tables.lock();
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
                // Drain the iterator; surface any storage error.
                entry?;
            }
        }

        {
            // Conversely, pages allocated by those newer transactions are now
            // garbage: queue them onto the in-memory freed list.
            let tables = self.tables.lock();
            let mut data_freed_pages = tables.freed_pages.lock();
            let mut system_tables = self.system_tables.lock();
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            for entry in data_allocated.range(lower..)? {
                let (_, value) = entry?;
                for i in 0..value.value().len() {
                    data_freed_pages.push(value.value().get(i));
                }
            }
        }

        // Invalidate every savepoint newer than the restored one, including
        // persistent ones recorded in the system tables.
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id())?;
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1409
1410 pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
1416 let created = !self.created_persistent_savepoints.lock().is_empty();
1417 let deleted = !self.deleted_persistent_savepoints.lock().is_empty();
1418 if (created || deleted) && !matches!(durability, Durability::Immediate) {
1419 return Err(SetDurabilityError::PersistentSavepointModified);
1420 }
1421
1422 self.durability = match durability {
1423 Durability::None => InternalDurability::None,
1424 Durability::Immediate => InternalDurability::Immediate,
1425 };
1426
1427 Ok(())
1428 }
1429
    /// Enables or disables two-phase commit for this transaction.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
1472
    /// Enables or disables quick-repair mode for this transaction.
    /// Note: commit_inner() forces two-phase commit on when this is enabled.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }
1486
1487 #[track_caller]
1491 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1492 &'txn self,
1493 definition: TableDefinition<K, V>,
1494 ) -> Result<Table<'txn, K, V>, TableError> {
1495 self.tables.lock().open_table(self, definition)
1496 }
1497
1498 #[cfg(feature = "std")]
1503 #[track_caller]
1504 pub fn open_ttl_table<K: Key + 'static, V: Value + 'static>(
1505 &self,
1506 definition: crate::ttl_table::TtlTableDefinition<K, V>,
1507 ) -> Result<crate::ttl_table::TtlTable<'_, K, V>, TableError> {
1508 let inner = self.open_table(definition.inner_def())?;
1509 Ok(crate::ttl_table::TtlTable::new(inner))
1510 }
1511
    /// Opens the IVF-PQ vector index described by `definition`.
    #[track_caller]
    pub fn open_ivfpq_index(
        &self,
        definition: &crate::ivfpq::config::IvfPqIndexDefinition,
    ) -> Result<crate::ivfpq::index::IvfPqIndex<'_, Self>, TableError> {
        crate::ivfpq::index::IvfPqIndex::open(
            self,
            definition,
            Arc::clone(&self.observer),
            // The metrics handle argument only exists when the feature is compiled in.
            #[cfg(feature = "metrics")]
            Arc::clone(&self.db_metrics),
        )
        .map_err(TableError::Storage)
    }
1529
1530 #[track_caller]
1534 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1535 &'txn self,
1536 definition: MultimapTableDefinition<K, V>,
1537 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1538 self.tables.lock().open_multimap_table(self, definition)
1539 }
1540
    /// Hands a table's state (root and entry count) back to the registry when
    /// its handle is closed.
    pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
        &self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        self.tables.lock().close_table(name, table, length);
    }
1549
1550 pub fn rename_table(
1552 &self,
1553 definition: impl TableHandle,
1554 new_name: impl TableHandle,
1555 ) -> Result<(), TableError> {
1556 let name = definition.name().to_string();
1557 drop(definition);
1559 self.tables
1560 .lock()
1561 .rename_table(self, &name, new_name.name())
1562 }
1563
1564 pub fn rename_multimap_table(
1566 &self,
1567 definition: impl MultimapTableHandle,
1568 new_name: impl MultimapTableHandle,
1569 ) -> Result<(), TableError> {
1570 let name = definition.name().to_string();
1571 drop(definition);
1573 self.tables
1574 .lock()
1575 .rename_multimap_table(self, &name, new_name.name())
1576 }
1577
1578 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1582 let name = definition.name().to_string();
1583 drop(definition);
1585 self.tables.lock().delete_table(self, &name)
1586 }
1587
1588 pub fn delete_multimap_table(
1592 &self,
1593 definition: impl MultimapTableHandle,
1594 ) -> Result<bool, TableError> {
1595 let name = definition.name().to_string();
1596 drop(definition);
1598 self.tables.lock().delete_multimap_table(self, &name)
1599 }
1600
1601 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1603 self.tables
1604 .lock()
1605 .table_tree
1606 .list_tables(TableType::Normal)
1607 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1608 }
1609
1610 pub fn list_multimap_tables(
1612 &self,
1613 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1614 self.tables
1615 .lock()
1616 .table_tree
1617 .list_tables(TableType::Multimap)
1618 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1619 }
1620
    /// Stores `data` as a new blob and returns its id.
    ///
    /// Bytes are appended to the blob region (lazily placed at end-of-file on
    /// first use), and metadata plus temporal/causal/tag/namespace/dedup index
    /// entries are written to the system tables. When dedup is enabled and the
    /// content matches an existing blob (same SHA-256, checksum, and length),
    /// no new bytes are written and the existing extent is referenced instead.
    ///
    /// # Errors
    /// Returns `BlobWriterActive` while a streaming `BlobWriter` is open.
    pub fn store_blob(
        &self,
        data: &[u8],
        content_type: ContentType,
        label: &str,
        opts: StoreOptions,
    ) -> Result<BlobId> {
        // Mutually exclusive with a streaming writer, which reserves region space.
        if self.blob_writer_active.load(Ordering::Acquire) {
            return Err(StorageError::BlobWriterActive);
        }
        let mut blob_state = self.mem.get_blob_state();

        // Lazily place the blob region at the current end of the file.
        if blob_state.region_offset == 0 {
            let file_len = self.mem.file_len()?;
            blob_state.region_offset = file_len;
        }

        let sequence = blob_state.next_sequence;
        blob_state.next_sequence = sequence + 1;

        // The id embeds a hash of the first <=4096 bytes, so ids correlate
        // with content.
        let prefix_len = data.len().min(4096);
        let content_prefix_hash = xxh3_hash64(&data[..prefix_len]);
        let blob_id = BlobId::new(sequence, content_prefix_hash);

        // Full-content checksum; verified again on read in get_blob().
        let checksum = xxh3_hash128(data);

        // Dedup only applies at or above the configured minimum size.
        let dedup_eligible =
            self.blob_dedup_config.enabled && data.len() >= self.blob_dedup_config.min_size;
        let sha_key = if dedup_eligible {
            let hash: [u8; 32] = Sha256::digest(data).into();
            Some(Sha256Key(hash))
        } else {
            None
        };

        let dedup_hit = if let Some(ref sha_key) = sha_key {
            let mut system_tables = self.system_tables.lock();
            let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
            dedup_table.get(sha_key)?.map(|g| g.value())
        } else {
            None
        };

        let mut dedup_saved = 0u64;
        // Checksum and length must also match, guarding against hash
        // collisions or stale index entries.
        let blob_ref = if let Some(existing) = dedup_hit
            && existing.checksum == checksum
            && existing.length == data.len() as u64
        {
            dedup_saved = existing.length;
            // Reference the existing extent; no bytes are written.
            BlobRef {
                offset: existing.offset,
                length: existing.length,
                checksum,
                ref_count: 1,
                content_type: content_type.as_byte(),
                compression: 0,
            }
        } else {
            // Append to the end of the blob region.
            let blob_offset = blob_state.region_length;
            let file_offset = blob_state.region_offset + blob_offset;
            self.mem.blob_write(file_offset, data)?;
            blob_state.region_length += data.len() as u64;

            BlobRef {
                offset: blob_offset,
                length: data.len() as u64,
                checksum,
                ref_count: 1,
                content_type: content_type.as_byte(),
                compression: 0,
            }
        };

        // Advance the hybrid logical clock for temporal ordering.
        let hlc = HybridLogicalClock::from_raw(blob_state.hlc_state).advance();
        blob_state.hlc_state = hlc.to_raw();

        #[allow(clippy::cast_possible_truncation)]
        let wall_clock_ns = {
            #[cfg(feature = "std")]
            {
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos() as u64
            }
            #[cfg(not(feature = "std"))]
            {
                // No wall clock without std; ordering falls back to the HLC.
                0u64
            }
        };

        let causal_parent = opts.causal_link.as_ref().map(|l| l.parent);
        let meta = BlobMeta::new(blob_ref, wall_clock_ns, hlc.to_raw(), causal_parent, label);

        {
            let mut system_tables = self.system_tables.lock();

            // Primary record: blob id -> metadata.
            let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
            blob_table.insert(&blob_id, &meta)?;
            drop(blob_table);

            // Temporal index: (wall clock, HLC, id) -> ().
            let temporal_key = TemporalKey::new(wall_clock_ns, hlc, blob_id);
            let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
            temporal_table.insert(&temporal_key, &())?;
            drop(temporal_table);

            // Optional causal edge: parent -> this blob.
            if let Some(link) = &opts.causal_link {
                let edge = CausalEdge::new(blob_id, link.relation, &link.context);
                let edge_key = CausalEdgeKey::new(link.parent, blob_id);
                let mut causal_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
                causal_table.insert(&edge_key, &edge)?;
                drop(causal_table);
            }

            Self::index_tags_and_namespace(
                &mut system_tables,
                self,
                blob_id,
                &opts.tags,
                opts.namespace.as_deref(),
            )?;

            if let Some(sha_key) = sha_key {
                // NOTE(review): if dedup_hit was Some but the checksum/length
                // guard above failed (data was appended), this branch still
                // bumps the OLD extent's refcount rather than recording the
                // new extent. Only reachable via a SHA-256 collision or a
                // stale index entry — confirm intended.
                if let Some(existing) = dedup_hit {
                    // Bump the shared extent's reference count.
                    let mut dedup_table =
                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                    let updated = DedupVal {
                        ref_count: existing.ref_count + 1,
                        ..existing
                    };
                    dedup_table.insert(&sha_key, &updated)?;
                    drop(dedup_table);
                } else {
                    // First occurrence of this content: create the dedup entry.
                    let mut dedup_table =
                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                    let entry = DedupVal {
                        offset: blob_ref.offset,
                        length: blob_ref.length,
                        checksum: blob_ref.checksum,
                        ref_count: 1,
                    };
                    dedup_table.insert(&sha_key, &entry)?;
                    drop(dedup_table);
                }

                // Reverse map so delete_blob() can find the dedup entry.
                let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
                dedup_map.insert(&blob_id, &sha_key)?;
                drop(dedup_map);
            }
        }

        self.mem.set_pending_blob_state(blob_state);
        self.dirty.store(true, Ordering::Release);

        // Report either a dedup hit or a real write to the observer/metrics.
        if dedup_saved > 0 {
            self.observer.on_blob_dedup(blob_id.sequence, dedup_saved);
            #[cfg(feature = "metrics")]
            self.db_metrics
                .blob_dedup_hits
                .fetch_add(1, portable_atomic::Ordering::Relaxed);
        } else {
            self.observer
                .on_blob_write(blob_id.sequence, blob_ref.length);
            #[cfg(feature = "metrics")]
            self.db_metrics
                .blob_writes
                .fetch_add(1, portable_atomic::Ordering::Relaxed);
        }

        Ok(blob_id)
    }
1827
1828 pub fn blob_writer(
1835 &self,
1836 content_type: ContentType,
1837 label: &str,
1838 opts: StoreOptions,
1839 ) -> Result<BlobWriter<'_>> {
1840 if self
1841 .blob_writer_active
1842 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1843 .is_err()
1844 {
1845 return Err(StorageError::BlobWriterActive);
1846 }
1847
1848 let mut blob_state = self.mem.get_blob_state();
1849
1850 if blob_state.region_offset == 0 {
1851 let file_len = self.mem.file_len()?;
1852 blob_state.region_offset = file_len;
1853 }
1854
1855 let sequence = blob_state.next_sequence;
1856 blob_state.next_sequence = sequence + 1;
1857
1858 let blob_region_start = blob_state.region_length;
1859 let blob_file_offset = blob_state.region_offset + blob_region_start;
1860
1861 self.mem.set_pending_blob_state(blob_state);
1864
1865 Ok(BlobWriter::new(
1866 self,
1867 sequence,
1868 content_type,
1869 label,
1870 opts,
1871 blob_file_offset,
1872 blob_region_start,
1873 self.blob_dedup_config.enabled,
1874 ))
1875 }
1876
    /// Writes raw bytes at an absolute file offset, delegating to the storage
    /// layer's blob write path.
    pub(crate) fn blob_write_raw(&self, file_offset: u64, data: &[u8]) -> Result {
        self.mem.blob_write(file_offset, data)
    }
1882
    /// Completes a streaming blob write started with `blob_writer`.
    ///
    /// Advances the HLC, extends the blob region past the streamed bytes, and
    /// writes the same metadata/index entries as `store_blob`. The caller
    /// supplies fully-populated `meta` and, when dedup applies, the content's
    /// `sha_key`.
    ///
    /// NOTE(review): `blob_writer_active` is not cleared here — presumably the
    /// `BlobWriter` itself resets it; confirm.
    pub(crate) fn finalize_blob_writer(
        &self,
        blob_id: BlobId,
        mut meta: BlobMeta,
        bytes_written: u64,
        opts: StoreOptions,
        sha_key: Option<Sha256Key>,
    ) -> Result {
        let mut blob_state = self.mem.get_blob_state();

        // Stamp the final HLC now that the write is complete.
        let hlc = HybridLogicalClock::from_raw(blob_state.hlc_state).advance();
        blob_state.hlc_state = hlc.to_raw();

        meta.hlc = hlc.to_raw();

        // The writer streamed into [offset, offset + bytes_written); commit
        // that extent as the new region end.
        blob_state.region_length = meta.blob_ref.offset + bytes_written;

        {
            let mut system_tables = self.system_tables.lock();

            // Primary record: blob id -> metadata.
            let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
            blob_table.insert(&blob_id, &meta)?;
            drop(blob_table);

            // Temporal index entry.
            let temporal_key = TemporalKey::new(meta.wall_clock_ns, hlc, blob_id);
            let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
            temporal_table.insert(&temporal_key, &())?;
            drop(temporal_table);

            // Optional causal edge: parent -> this blob.
            if let Some(link) = &opts.causal_link {
                let edge = CausalEdge::new(blob_id, link.relation, &link.context);
                let edge_key = CausalEdgeKey::new(link.parent, blob_id);
                let mut causal_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
                causal_table.insert(&edge_key, &edge)?;
                drop(causal_table);
            }

            Self::index_tags_and_namespace(
                &mut system_tables,
                self,
                blob_id,
                &opts.tags,
                opts.namespace.as_deref(),
            )?;

            if let Some(sha_key) = sha_key {
                // Unlike store_blob, the bytes are already on disk, so a dedup
                // hit here only bumps the refcount; it cannot avoid the write.
                let existing = {
                    let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                    dedup_table.get(&sha_key)?.map(|g| g.value())
                };

                if let Some(existing) = existing {
                    let mut dedup_table =
                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                    let updated = DedupVal {
                        ref_count: existing.ref_count + 1,
                        ..existing
                    };
                    dedup_table.insert(&sha_key, &updated)?;
                    drop(dedup_table);
                } else {
                    // First occurrence of this content: create the dedup entry.
                    let mut dedup_table =
                        system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                    let entry = DedupVal {
                        offset: meta.blob_ref.offset,
                        length: meta.blob_ref.length,
                        checksum: meta.blob_ref.checksum,
                        ref_count: 1,
                    };
                    dedup_table.insert(&sha_key, &entry)?;
                    drop(dedup_table);
                }

                // Reverse map so delete_blob() can find the dedup entry.
                let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
                dedup_map.insert(&blob_id, &sha_key)?;
                drop(dedup_map);
            }
        }

        self.mem.set_pending_blob_state(blob_state);
        self.dirty.store(true, Ordering::Release);

        Ok(())
    }
1976
    /// Exposes the flag that serializes streaming blob writers (claimed in
    /// `blob_writer`).
    pub(crate) fn blob_writer_active(&self) -> &AtomicBool {
        &self.blob_writer_active
    }
1981
1982 fn index_tags_and_namespace(
1985 system_tables: &mut SystemNamespace<'_>,
1986 txn: &WriteTransaction,
1987 blob_id: BlobId,
1988 tags: &[String],
1989 namespace: Option<&str>,
1990 ) -> Result {
1991 let tag_count = tags.len().min(MAX_TAGS_PER_BLOB);
1992 if tag_count > 0 {
1993 let mut tag_table = system_tables.open_system_table(txn, BLOB_TAG_INDEX)?;
1994 for tag in &tags[..tag_count] {
1995 let tag_key = TagKey::new(tag, blob_id);
1996 tag_table.insert(&tag_key, &())?;
1997 }
1998 drop(tag_table);
1999 }
2000
2001 if let Some(ns) = namespace {
2002 let ns_val = NamespaceVal::new(ns);
2003 let mut ns_table = system_tables.open_system_table(txn, BLOB_NAMESPACE)?;
2004 ns_table.insert(&blob_id, &ns_val)?;
2005 drop(ns_table);
2006
2007 let ns_key = NamespaceKey::new(ns, blob_id);
2008 let mut ns_idx = system_tables.open_system_table(txn, BLOB_NAMESPACE_INDEX)?;
2009 ns_idx.insert(&ns_key, &())?;
2010 drop(ns_idx);
2011 }
2012
2013 Ok(())
2014 }
2015
2016 pub fn blob_tags(&self, blob_id: &BlobId) -> Result<Vec<String>> {
2018 let mut system_tables = self.system_tables.lock();
2019 let tag_table = system_tables.open_system_table(self, BLOB_TAG_INDEX)?;
2020
2021 let mut tags = Vec::new();
2022 let range = tag_table.range::<TagKey>(..)?;
2026 for entry in range {
2027 let (key_guard, _) = entry?;
2028 let key = key_guard.value();
2029 if key.blob_id == *blob_id {
2030 tags.push(key.tag_str().to_string());
2031 }
2032 }
2033 Ok(tags)
2034 }
2035
2036 pub fn blob_namespace(&self, blob_id: &BlobId) -> Result<Option<String>> {
2038 let mut system_tables = self.system_tables.lock();
2039 let ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
2040 match ns_table.get(blob_id)? {
2041 Some(g) => Ok(Some(g.value().namespace_str().to_string())),
2042 None => Ok(None),
2043 }
2044 }
2045
2046 pub fn get_blob(&self, blob_id: &BlobId) -> Result<Option<(Vec<u8>, BlobMeta)>> {
2050 let meta = {
2051 let mut system_tables = self.system_tables.lock();
2052 let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2053 match blob_table.get(blob_id)? {
2054 Some(g) => g.value(),
2055 None => return Ok(None),
2056 }
2057 };
2058
2059 let blob_state = self.mem.get_blob_state();
2060 let file_offset = blob_state.region_offset + meta.blob_ref.offset;
2061 #[allow(clippy::cast_possible_truncation)]
2062 let data = self
2063 .mem
2064 .blob_read(file_offset, meta.blob_ref.length as usize)?;
2065
2066 let actual = xxh3_hash128(&data);
2067 if actual != meta.blob_ref.checksum {
2068 return Err(StorageError::BlobChecksumMismatch {
2069 sequence: blob_id.sequence,
2070 expected: meta.blob_ref.checksum,
2071 actual,
2072 });
2073 }
2074
2075 Ok(Some((data, meta)))
2076 }
2077
2078 pub fn get_blob_meta(&self, blob_id: &BlobId) -> Result<Option<BlobMeta>> {
2080 let mut system_tables = self.system_tables.lock();
2081 let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2082
2083 let guard = blob_table.get(blob_id)?;
2084 Ok(guard.map(|g| g.value()))
2085 }
2086
    /// Reads `length` bytes of a blob starting at `offset` (relative to the
    /// blob's start). Returns `None` when the blob does not exist.
    ///
    /// Unlike `get_blob`, no checksum verification is performed — the stored
    /// checksum covers the whole blob, not arbitrary sub-ranges.
    ///
    /// # Errors
    /// `BlobRangeOutOfBounds` when `offset + length` exceeds the blob length.
    pub fn read_blob_range(
        &self,
        blob_id: &BlobId,
        offset: u64,
        length: u64,
    ) -> Result<Option<Vec<u8>>> {
        let meta = {
            // Scope the lock to the metadata lookup.
            let mut system_tables = self.system_tables.lock();
            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
            match blob_table.get(blob_id)? {
                Some(g) => g.value(),
                None => return Ok(None),
            }
        };

        // NOTE(review): a zero-length read succeeds even when `offset` is past
        // the end of the blob, since the bounds check below is skipped — confirm
        // this is intended.
        if length == 0 {
            return Ok(Some(Vec::new()));
        }

        // saturating_add prevents overflow from an adversarial offset+length.
        let end = offset.saturating_add(length);
        if end > meta.blob_ref.length {
            return Err(StorageError::BlobRangeOutOfBounds {
                blob_length: meta.blob_ref.length,
                requested_offset: offset,
                requested_length: length,
            });
        }

        let blob_state = self.mem.get_blob_state();
        let file_offset = blob_state.region_offset + meta.blob_ref.offset + offset;
        #[allow(clippy::cast_possible_truncation)]
        let data = self.mem.blob_read(file_offset, length as usize)?;

        Ok(Some(data))
    }
2127
2128 pub fn blob_reader(&self, blob_id: &BlobId) -> Result<Option<BlobReader>> {
2136 let meta = {
2137 let mut system_tables = self.system_tables.lock();
2138 let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2139 match blob_table.get(blob_id)? {
2140 Some(g) => g.value(),
2141 None => return Ok(None),
2142 }
2143 };
2144
2145 let blob_state = self.mem.get_blob_state();
2146 let file_offset = blob_state.region_offset + meta.blob_ref.offset;
2147
2148 Ok(Some(BlobReader::new(
2149 Arc::clone(&self.mem),
2150 file_offset,
2151 meta.blob_ref.length,
2152 )))
2153 }
2154
    /// Deletes a blob and all of its index entries; returns `false` when the
    /// blob does not exist.
    ///
    /// Removes, in order: the temporal index entry; causal edges (the incoming
    /// edge from the parent, the legacy parent->child record, then all
    /// outgoing edges); tag index entries; the namespace mapping plus its
    /// reverse index; dedup bookkeeping (refcount decrement or entry removal);
    /// and finally the primary blob record. The blob's bytes are not reclaimed
    /// here — see `compact_blobs_pass`.
    pub fn delete_blob(&self, blob_id: &BlobId) -> Result<bool> {
        let mut system_tables = self.system_tables.lock();

        let meta = {
            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
            match blob_table.get(blob_id)? {
                Some(g) => g.value(),
                None => return Ok(false),
            }
        };

        // The temporal entry is keyed by the timestamps recorded in meta.
        let temporal_key = TemporalKey::new(
            meta.wall_clock_ns,
            HybridLogicalClock::from_raw(meta.hlc),
            *blob_id,
        );

        let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
        temporal_table.remove(&temporal_key)?;
        drop(temporal_table);

        if let Some(parent) = meta.causal_parent {
            // Incoming edge from the parent.
            let mut edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
            edges_table.remove(&CausalEdgeKey::new(parent, *blob_id))?;
            drop(edges_table);

            // Legacy single-child table: only remove when it still points at
            // this blob (another child may have overwritten it).
            let mut legacy_table = system_tables.open_system_table(self, BLOB_CAUSAL_CHILDREN)?;
            let should_remove_legacy = legacy_table
                .get(&parent)?
                .is_some_and(|g| g.value() == *blob_id);
            if should_remove_legacy {
                legacy_table.remove(&parent)?;
            }
            drop(legacy_table);
        }

        {
            // Outgoing edges: collect the keys first (cannot remove while
            // iterating), then delete them.
            let edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
            let start = CausalEdgeKey::new(*blob_id, BlobId::MIN);
            let end = CausalEdgeKey::new(*blob_id, BlobId::MAX);
            let mut outgoing = Vec::new();
            if let Ok(range) = edges_table.range(start..=end) {
                for entry in range {
                    let (key_guard, _) = entry?;
                    outgoing.push(key_guard.value());
                }
            }
            drop(edges_table);

            if !outgoing.is_empty() {
                let mut edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
                for key in &outgoing {
                    edges_table.remove(key)?;
                }
                drop(edges_table);
            }
        }

        {
            // The tag index is keyed by tag first, so a full scan is required
            // to find this blob's entries.
            let mut tag_table = system_tables.open_system_table(self, BLOB_TAG_INDEX)?;
            let mut to_remove = Vec::new();
            let range = tag_table.range::<TagKey>(..)?;
            for entry in range {
                let (key_guard, _) = entry?;
                let key = key_guard.value();
                if key.blob_id == *blob_id {
                    to_remove.push(key);
                }
            }
            for key in &to_remove {
                tag_table.remove(key)?;
            }
            drop(tag_table);
        }

        {
            // Namespace mapping and its reverse index.
            let ns_key = {
                let ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
                match ns_table.get(blob_id)? {
                    Some(ns_guard) => {
                        let ns_str = ns_guard.value().namespace_str().to_string();
                        Some(NamespaceKey::new(&ns_str, *blob_id))
                    }
                    None => None,
                }
            };

            if let Some(ns_key) = ns_key {
                let mut ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
                ns_table.remove(blob_id)?;
                drop(ns_table);

                let mut ns_idx = system_tables.open_system_table(self, BLOB_NAMESPACE_INDEX)?;
                ns_idx.remove(&ns_key)?;
                drop(ns_idx);
            }
        }

        // Dedup bookkeeping: decrement the shared extent's refcount, dropping
        // the index entry when this was the last reference.
        let sha_key = {
            let dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
            let result = dedup_map.get(blob_id)?.map(|g| g.value());
            drop(dedup_map);
            result
        };

        if let Some(sha_key) = sha_key {
            let dedup_val = {
                let dedup_idx = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                let result = dedup_idx.get(&sha_key)?.map(|g| g.value());
                drop(dedup_idx);
                result
            };

            if let Some(val) = dedup_val {
                let mut dedup_idx = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                if val.ref_count > 1 {
                    let updated = DedupVal {
                        offset: val.offset,
                        length: val.length,
                        checksum: val.checksum,
                        ref_count: val.ref_count - 1,
                    };
                    dedup_idx.insert(&sha_key, &updated)?;
                } else {
                    dedup_idx.remove(&sha_key)?;
                }
                drop(dedup_idx);
            }

            let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
            dedup_map.remove(blob_id)?;
            drop(dedup_map);
        }

        // Finally drop the primary record.
        let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
        blob_table.remove(blob_id)?;
        drop(blob_table);

        self.dirty.store(true, Ordering::Release);
        Ok(true)
    }
2309
2310 pub fn blob_stats(&self) -> Result<BlobStats> {
2315 let blob_state = self.mem.get_blob_state();
2316 let region_bytes = blob_state.region_length;
2317
2318 if region_bytes == 0 {
2319 return Ok(BlobStats {
2320 blob_count: 0,
2321 live_bytes: 0,
2322 region_bytes: 0,
2323 dead_bytes: 0,
2324 fragmentation_ratio: 0.0,
2325 });
2326 }
2327
2328 let mut system_tables = self.system_tables.lock();
2329 let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
2330
2331 let mut blob_count: u64 = 0;
2332 let mut unique_offsets = crate::compat::HashSet::new();
2333 let mut live_bytes: u64 = 0;
2334
2335 let range = blob_table.range::<BlobId>(..)?;
2336 for entry in range {
2337 let (_, value_guard) = entry?;
2338 let meta = value_guard.value();
2339 blob_count += 1;
2340 if unique_offsets.insert(meta.blob_ref.offset) {
2343 live_bytes += meta.blob_ref.length;
2344 }
2345 }
2346 drop(blob_table);
2347
2348 let dead_bytes = region_bytes.saturating_sub(live_bytes);
2349 #[allow(clippy::cast_precision_loss)]
2350 let fragmentation_ratio = if region_bytes > 0 {
2351 dead_bytes as f64 / region_bytes as f64
2352 } else {
2353 0.0
2354 };
2355
2356 Ok(BlobStats {
2357 blob_count,
2358 live_bytes,
2359 region_bytes,
2360 dead_bytes,
2361 fragmentation_ratio,
2362 })
2363 }
2364
    /// Performs one compaction pass over the blob region, copying live blob
    /// bytes into a contiguous run and rewriting offsets in the blob and dedup
    /// tables.
    ///
    /// With `write_from_zero == true`, live data is rewritten starting at
    /// region offset 0 and the region shrinks to exactly the live size;
    /// otherwise live data is appended after the current region end (a
    /// copy-forward pass that temporarily grows the region).
    ///
    /// Returns `(blobs_relocated, total_live_size)`, where `blobs_relocated`
    /// counts unique physical extents copied.
    pub(crate) fn compact_blobs_pass(&self, write_from_zero: bool) -> Result<(u64, u64)> {
        let mut blob_state = self.mem.get_blob_state();
        let region_offset = blob_state.region_offset;
        let old_region_length = blob_state.region_length;

        if old_region_length == 0 {
            return Ok((0, 0));
        }

        // Snapshot all live blob records while holding the lock.
        let mut live_blobs: Vec<(BlobId, BlobMeta)> = Vec::new();
        {
            let mut system_tables = self.system_tables.lock();
            let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
            let range = blob_table.range::<BlobId>(..)?;
            for entry in range {
                let (key_guard, value_guard) = entry?;
                live_blobs.push((key_guard.value(), value_guard.value()));
            }
            drop(blob_table);
        }

        // Every blob was deleted: simply truncate the region.
        if live_blobs.is_empty() {
            blob_state.region_length = 0;
            self.mem.set_pending_blob_state(blob_state);
            self.dirty.store(true, Ordering::Release);
            return Ok((0, 0));
        }

        // Deduped blobs share extents; copy each physical extent only once.
        let mut unique_physical: Vec<(u64, u64)> = Vec::new();
        {
            let mut seen = crate::compat::HashSet::new();
            for (_, meta) in &live_blobs {
                if seen.insert(meta.blob_ref.offset) {
                    unique_physical.push((meta.blob_ref.offset, meta.blob_ref.length));
                }
            }
        }
        // Copy in ascending offset order; in a write_from_zero pass each packed
        // destination stays at or before its source, so copies never clobber
        // data that has not been read yet.
        unique_physical.sort_by_key(|&(offset, _)| offset);

        let write_base = if write_from_zero {
            0
        } else {
            old_region_length
        };
        // Maps old physical offset -> new physical offset for the rewrite below.
        let mut offset_map = crate::compat::HashMap::new();
        let mut write_cursor: u64 = 0;

        for &(old_offset, length) in &unique_physical {
            let src_file_offset = region_offset + old_offset;
            let new_offset = write_base + write_cursor;
            let dst_file_offset = region_offset + new_offset;

            #[allow(clippy::cast_possible_truncation)]
            let data = self.mem.blob_read(src_file_offset, length as usize)?;
            self.mem.blob_write(dst_file_offset, &data)?;

            offset_map.insert(old_offset, new_offset);
            write_cursor += length;
        }

        let total_live_size = write_cursor;
        let blobs_relocated = unique_physical.len() as u64;

        {
            let mut system_tables = self.system_tables.lock();
            // Rewrite blob metadata to point at the relocated extents.
            let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
            for (blob_id, meta) in &live_blobs {
                if let Some(&new_offset) = offset_map.get(&meta.blob_ref.offset)
                    && new_offset != meta.blob_ref.offset
                {
                    let mut updated_meta = meta.clone();
                    updated_meta.blob_ref.offset = new_offset;
                    blob_table.insert(blob_id, &updated_meta)?;
                }
            }
            drop(blob_table);

            // Dedup index entries also record physical offsets; update them too.
            let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
            let mut dedup_entries: Vec<(Sha256Key, DedupVal)> = Vec::new();
            let range = dedup_table.range::<Sha256Key>(..)?;
            for entry in range {
                let (key_guard, value_guard) = entry?;
                dedup_entries.push((key_guard.value(), value_guard.value()));
            }
            drop(dedup_table);

            if !dedup_entries.is_empty() {
                let mut dedup_table_mut =
                    system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
                for (sha_key, val) in &dedup_entries {
                    if let Some(&new_offset) = offset_map.get(&val.offset)
                        && new_offset != val.offset
                    {
                        let updated = DedupVal {
                            offset: new_offset,
                            length: val.length,
                            checksum: val.checksum,
                            ref_count: val.ref_count,
                        };
                        dedup_table_mut.insert(sha_key, &updated)?;
                    }
                }
                drop(dedup_table_mut);
            }
        }

        // Commit the new region size: shrunk to live data, or grown by the
        // appended copy, depending on the mode.
        if write_from_zero {
            blob_state.region_length = total_live_size;
        } else {
            blob_state.region_length = old_region_length + total_live_size;
        }
        self.mem.set_pending_blob_state(blob_state);
        self.dirty.store(true, Ordering::Release);

        Ok((blobs_relocated, total_live_size))
    }
2498
    /// Commits the transaction, making its changes visible per the configured
    /// durability mode.
    ///
    /// `completed` is set before `commit_inner` runs so that `Drop` never attempts
    /// an automatic abort, even if the commit below fails partway through.
    pub fn commit(mut self) -> Result<(), CommitError> {
        self.completed = true;
        self.commit_inner()
    }
2508
    /// Shared commit path: frees eligible pages, flushes table roots, persists the
    /// allocated/freed page bookkeeping and the CDC log, then performs either a
    /// durable or non-durable commit depending on `self.durability`.
    fn commit_inner(&mut self) -> Result<(), CommitError> {
        #[cfg(feature = "std")]
        let commit_start = std::time::Instant::now();

        // Quick-repair requires the stricter two-phase commit protocol.
        if self.quick_repair {
            self.two_phase_commit = true;
        }

        if matches!(self.durability, InternalDurability::Immediate) {
            // Pages freed by transactions older than the oldest live reader can be
            // reused; anything newer must stay intact for ongoing reads.
            let free_until_transaction = self
                .transaction_tracker
                .oldest_live_read_transaction()?
                .map(|x| x.next())
                .transpose()?
                .unwrap_or(self.transaction_id);
            if let Err(err) = self.process_freed_pages(free_until_transaction) {
                // On failure, discard pending root updates so Drop/abort sees a
                // clean tree rather than half-applied roots.
                self.tables.lock().table_tree.clear_root_updates_and_close();
                return Err(err.into());
            }
        }

        let (user_root, allocated_pages, data_freed) =
            self.tables.lock().table_tree.flush_and_close()?;

        let dirty_page_count = allocated_pages.len() as u64;
        self.store_data_freed_pages(data_freed)?;
        self.store_allocated_pages(allocated_pages.into_iter().collect())?;
        self.flush_cdc_log()?;

        #[cfg(feature = "logging")]
        debug!(
            "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
            self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
        );
        match self.durability {
            InternalDurability::None => self.non_durable_commit(user_root)?,
            InternalDurability::Immediate => self.durable_commit(user_root)?,
        }

        // Savepoints deleted during this transaction can only be reclaimed once
        // the commit itself has succeeded.
        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().iter() {
            self.transaction_tracker
                .deallocate_savepoint(*savepoint, *transaction)?;
        }

        debug_assert!(
            self.system_tables
                .lock()
                .system_freed_pages()
                .lock()
                .is_empty()
        );
        debug_assert!(self.tables.lock().freed_pages.lock().is_empty());

        #[cfg(feature = "logging")]
        debug!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        // Notify the observer (and metrics, when enabled) about the completed commit.
        let info = crate::observer::CommitInfo {
            transaction_id: self.transaction_id.raw_id(),
            dirty_page_count,
            two_phase: self.two_phase_commit,
            #[cfg(feature = "std")]
            commit_duration: commit_start.elapsed(),
        };
        self.observer.on_write_commit(&info);
        #[cfg(feature = "metrics")]
        self.db_metrics
            .write_txn_committed
            .fetch_add(1, portable_atomic::Ordering::Relaxed);

        Ok(())
    }
2591
    /// Records pages freed by this transaction's data tree into DATA_FREED_TABLE,
    /// in chunks keyed by (transaction id, pagination counter). The pages are
    /// actually released later by `process_freed_pages`, once no reader or
    /// savepoint can still observe them.
    fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
        let mut system_tables = self.system_tables.lock();
        let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
        let mut pagination_counter = 0;
        while !freed_pages.is_empty() {
            // Bounded chunks keep each serialized PageList within value-size limits.
            let chunk_size = 400;
            let buffer_size = PageList::required_bytes(chunk_size);
            let key = TransactionIdWithPagination {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: pagination_counter,
            };
            let mut access_guard = freed_table.insert_reserve(&key, buffer_size)?;

            let len = freed_pages.len();
            access_guard.as_mut().clear();
            // Drain from the tail so removal is O(chunk) instead of shifting the vec.
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                debug_assert!(
                    self.mem.is_allocated(page),
                    "Page is not allocated: {page:?}"
                );
                // Freed *data* pages must already be committed (contrast with
                // store_allocated_pages, which asserts the opposite).
                debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
                access_guard.as_mut().push_back(page);
            }

            pagination_counter += 1;
        }

        Ok(())
    }
2622
    /// Records pages newly allocated by this transaction into DATA_ALLOCATED_TABLE
    /// (chunked like `store_data_freed_pages`), then removes tracking entries with
    /// transaction ids below the oldest savepoint.
    fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
        let mut system_tables = self.system_tables.lock();
        let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
        let mut pagination_counter = 0;
        while !data_allocated_pages.is_empty() {
            // Bounded chunks keep each serialized PageList within value-size limits.
            let chunk_size = 400;
            let buffer_size = PageList::required_bytes(chunk_size);
            let key = TransactionIdWithPagination {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: pagination_counter,
            };
            let mut access_guard = allocated_table.insert_reserve(&key, buffer_size)?;

            let len = data_allocated_pages.len();
            access_guard.as_mut().clear();
            // Drain from the tail so removal is O(chunk) instead of shifting the vec.
            for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
                debug_assert!(
                    self.mem.is_allocated(page),
                    "Page is not allocated: {page:?}"
                );
                // Newly allocated pages must still be uncommitted at this point.
                debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
                access_guard.as_mut().push_back(page);
            }

            pagination_counter += 1;
        }

        // Prune entries older than the oldest savepoint; with no savepoints
        // (u64::MAX), every prior entry is removed.
        let oldest = self
            .transaction_tracker
            .oldest_savepoint()?
            .map_or(u64::MAX, |(_, x)| x.raw_id());
        let key = TransactionIdWithPagination {
            transaction_id: oldest,
            pagination_id: 0,
        };
        for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
            entry?;
        }

        Ok(())
    }
2668
    /// Persists this transaction's buffered CDC events into the change-log table,
    /// then prunes old log entries per the retention policy — never pruning past
    /// the furthest-behind consumer cursor.
    fn flush_cdc_log(&self) -> Result {
        // Take the buffered events, leaving the buffer empty; bail out early when
        // CDC is disabled or nothing was recorded.
        let events = match self.cdc_log {
            Some(ref log) => {
                let mut guard = log.lock();
                if guard.is_empty() {
                    return Ok(());
                }
                core::mem::take(&mut *guard)
            }
            None => return Ok(()),
        };

        let txn_id = self.transaction_id.raw_id();
        let mut system_tables = self.system_tables.lock();
        let mut cdc_table = system_tables.open_system_table(self, CDC_LOG_TABLE)?;

        // Keys are (transaction id, per-transaction sequence number).
        // NOTE(review): seq saturates at u32::MAX, so more than 2^32 events in one
        // transaction would collide on a single key — presumably unreachable in
        // practice, but confirm.
        for (seq, event) in events.iter().enumerate() {
            let key = CdcKey::new(txn_id, u32::try_from(seq).unwrap_or(u32::MAX));
            let record = CdcRecord::from_event(event)?;
            cdc_table.insert(&key, &record)?;
        }

        if self.cdc_config.retention_max_txns > 0 && txn_id > self.cdc_config.retention_max_txns {
            let retention_cutoff = txn_id - self.cdc_config.retention_max_txns;

            drop(cdc_table);

            // Find the smallest (furthest-behind) consumer cursor so pruning never
            // discards changes a consumer has not reached yet.
            let oldest_cursor = {
                let cursor_table = system_tables.open_system_table(self, CDC_CURSOR_TABLE)?;
                let mut oldest: Option<u64> = None;
                for entry in cursor_table.range::<&str>(..)? {
                    let (_, val_guard) = entry?;
                    let pos = val_guard.value();
                    oldest = Some(oldest.map_or(pos, |o: u64| o.min(pos)));
                }
                drop(cursor_table);
                oldest
            };

            let effective_cutoff = match oldest_cursor {
                Some(cursor_pos) => retention_cutoff.min(cursor_pos),
                None => retention_cutoff,
            };

            // Delete every log entry at or below the cutoff transaction.
            if effective_cutoff > 0 {
                let mut cdc_table = system_tables.open_system_table(self, CDC_LOG_TABLE)?;
                let end_key = CdcKey::new(effective_cutoff, u32::MAX);
                for entry in cdc_table.extract_from_if(..=end_key, |_, _| true)? {
                    entry?;
                }
            }
        }

        Ok(())
    }
2735
2736 pub fn advance_cdc_cursor(&self, name: &str, up_to_txn: u64) -> Result {
2742 let mut system_tables = self.system_tables.lock();
2743 let mut cursor_table = system_tables.open_system_table(self, CDC_CURSOR_TABLE)?;
2744 cursor_table.insert(name, &up_to_txn)?;
2745 Ok(())
2746 }
2747
2748 pub(crate) fn list_history_snapshot_ids(&self) -> Result<Vec<u64>> {
2749 let mut system_tables = self.system_tables.lock();
2750 let history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
2751 let mut ids = Vec::new();
2752 for entry in history_table.range::<u64>(..)? {
2753 let (key_guard, _) = entry?;
2754 ids.push(key_guard.value());
2755 }
2756 Ok(ids)
2757 }
2758
2759 pub(crate) fn purge_all_history_snapshots(&self) -> Result {
2760 let mut system_tables = self.system_tables.lock();
2761 let mut history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
2762 let keys: Vec<u64> = history_table
2763 .range::<u64>(..)?
2764 .map(|entry| entry.map(|(k, _)| k.value()))
2765 .collect::<Result<Vec<_>>>()?;
2766 for key in &keys {
2767 history_table.remove(key)?;
2768 }
2769 Ok(())
2770 }
2771
    /// Aborts the transaction, rolling back all uncommitted changes.
    ///
    /// `completed` is set before `abort_inner` runs so that `Drop` does not
    /// attempt a second abort if the rollback below fails.
    pub fn abort(mut self) -> Result {
        self.completed = true;
        self.abort_inner()
    }
2780
    /// Rolls back the transaction: discards pending table-root updates, deletes
    /// any persistent savepoints created within this transaction, and rolls back
    /// all uncommitted writes in the storage layer.
    fn abort_inner(&mut self) -> Result {
        #[cfg(feature = "logging")]
        debug!("Aborting transaction id={:?}", self.transaction_id);
        self.tables.lock().table_tree.clear_root_updates_and_close();
        // Persistent savepoints created by this transaction would dangle after the
        // rollback, so remove them first.
        for savepoint in self.created_persistent_savepoints.lock().iter() {
            match self.delete_persistent_savepoint(savepoint.0) {
                Ok(_) => {}
                Err(err) => match err {
                    // An invalid savepoint here indicates internal state corruption,
                    // not a user error.
                    SavepointError::InvalidSavepoint => {
                        return Err(StorageError::Internal(
                            "invalid savepoint encountered during transaction abort".to_string(),
                        ));
                    }
                    SavepointError::Storage(storage_err) => {
                        return Err(storage_err);
                    }
                },
            }
        }
        self.mem.rollback_uncommitted_writes()?;
        #[cfg(feature = "logging")]
        debug!("Finished abort of transaction id={:?}", self.transaction_id);

        self.observer.on_write_abort(self.transaction_id.raw_id());
        #[cfg(feature = "metrics")]
        self.db_metrics
            .write_txn_aborted
            .fetch_add(1, portable_atomic::Ordering::Relaxed);

        Ok(())
    }
2812
    /// Performs a fully durable commit: optionally records a history snapshot,
    /// persists the allocator state when quick-repair is enabled, finalizes the
    /// system tree, and hands both roots to the storage layer for a (possibly
    /// two-phase) commit.
    pub(crate) fn durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
        let mut system_tables = self.system_tables.lock();

        if self.history_retention > 0 && !self.quick_repair {
            // Wall-clock timestamp for the snapshot; 0 when no clock is available.
            #[cfg(feature = "std")]
            #[allow(clippy::cast_possible_truncation)]
            let timestamp_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or(std::time::Duration::ZERO)
                .as_millis() as u64;
            #[cfg(not(feature = "std"))]
            let timestamp_ms = 0u64;

            let blob_state = self.mem.get_blob_state();
            let snapshot = HistorySnapshot::new(
                user_root,
                timestamp_ms,
                blob_state.region_offset,
                blob_state.region_length,
                blob_state.next_sequence,
                blob_state.hlc_state,
            );
            let mut history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
            history_table.insert(&self.transaction_id.raw_id(), &snapshot)?;
            // The hold keeps this transaction's pages from being freed while the
            // snapshot still references them.
            self.transaction_tracker
                .register_history_hold(self.transaction_id)?;

            // Enforce the retention limit by dropping the oldest snapshots (keys
            // iterate in ascending transaction-id order), releasing their holds.
            let mut all_keys = Vec::new();
            for entry in history_table.range::<u64>(..)? {
                let (key_guard, _) = entry?;
                all_keys.push(key_guard.value());
            }
            let retention = usize::try_from(self.history_retention).unwrap_or(usize::MAX);
            if all_keys.len() > retention {
                let to_remove = all_keys.len() - retention;
                for key in &all_keys[..to_remove] {
                    history_table.remove(key)?;
                    self.transaction_tracker
                        .deallocate_history_hold(TransactionId::new(*key))?;
                }
            }
        }

        let system_freed_pages = system_tables.system_freed_pages();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // The allocator-state table is rebuilt from scratch on each quick-repair
        // commit, so any stale copy is removed first.
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_internal("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |system_tree_ref, tree: &mut AllocatorStateTreeMut| {
                    let mut pagination_counter = 0;

                    // Saving the allocator state can itself allocate pages, so
                    // retry until a reserved copy is consistent with reality.
                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        self.store_system_freed_pages(
                            system_tree_ref,
                            system_freed_pages.clone(),
                            None,
                            &mut pagination_counter,
                        )?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Not enough room: clear the partially written state
                        // (highest keys first) and try again.
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;

        self.mem.commit(
            user_root,
            system_root,
            self.transaction_id,
            self.two_phase_commit,
            self.shrink_policy,
        )?;

        // Everything up to this durable commit is now persisted.
        self.transaction_tracker
            .clear_pending_non_durable_commits()?;

        // These pages were still referenced by the previous durable state; with
        // the new state committed they can finally be released.
        for page in system_freed_pages.lock().drain(..) {
            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
        }

        Ok(())
    }
2930
    /// Commits without durability: freed pages that were never persisted are freed
    /// right after the commit, while the rest are stored and kept until the next
    /// durable commit.
    pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
        // Only free pages no longer visible to any non-durable reader or savepoint.
        let mut free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_nondurable_transaction()?
            .map(|x| x.next())
            .transpose()?
            .unwrap_or(self.transaction_id);
        if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint()? {
            free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
        }
        self.process_freed_pages_nondurable(free_until_transaction)?;

        let mut post_commit_frees = vec![];

        let system_root = {
            let mut system_tables = self.system_tables.lock();
            let system_freed_pages = system_tables.system_freed_pages();
            system_tables.table_tree.flush_table_root_updates()?;
            // Unpersisted pages can be freed as soon as this commit completes; the
            // remainder must be stored and survive until a durable commit.
            for page in system_freed_pages
                .lock()
                .extract_if(.., |p| self.mem.unpersisted(*p))
            {
                post_commit_frees.push(page);
            }
            self.store_system_freed_pages(
                &mut system_tables.table_tree,
                system_freed_pages,
                Some(&mut post_commit_frees),
                &mut 0,
            )?;

            system_tables
                .table_tree
                .flush_table_root_updates()?
                .finalize_dirty_checksums()?
        };

        self.mem
            .non_durable_commit(user_root, system_root, self.transaction_id)?;
        // Track this commit so its pages can be processed at the next durable commit.
        self.transaction_tracker.register_non_durable_commit(
            self.transaction_id,
            self.mem.get_last_durable_transaction_id()?,
        )?;

        // Safe to free only after the commit above has published the new roots.
        for page in post_commit_frees {
            self.mem.free(page, &mut PageTrackerPolicy::Ignore);
        }

        Ok(())
    }
2986
    /// Relocates the highest-numbered index pages of both trees toward the start
    /// of the file so it can later be shrunk. Returns whether any progress was made.
    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
        let mut progress = false;

        // Collect the highest index pages from both trees; iterating the map in
        // reverse visits the highest page numbers first.
        let mut highest_pages = BTreeMap::new();
        let mut tables = self.tables.lock();
        let table_tree = &mut tables.table_tree;
        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
        let mut system_tables = self.system_tables.lock();
        let system_table_tree = &mut system_tables.table_tree;
        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;

        let mut relocation_map = HashMap::new();
        for path in highest_pages.into_values().rev() {
            if relocation_map.contains_key(&path.page_number()) {
                continue;
            }
            let old_page = self.mem.get_page(path.page_number())?;
            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
            let new_page_number = new_page.get_page_number();
            // Only the first byte is copied here; relocate_tables() below rewrites
            // the full page contents. NOTE(review): presumably byte 0 carries page
            // metadata needed before the real copy — confirm.
            new_page.memory_mut()?[0] = old_page.memory()[0];
            drop(new_page);
            if new_page_number < path.page_number() {
                relocation_map.insert(path.page_number(), new_page_number);
                // Parents must be relocated too so they can point at the moved child.
                for parent in path.parents() {
                    if relocation_map.contains_key(parent) {
                        continue;
                    }
                    let old_parent = self.mem.get_page(*parent)?;
                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
                    let new_page_number = new_page.get_page_number();
                    new_page.memory_mut()?[0] = old_parent.memory()[0];
                    drop(new_page);
                    relocation_map.insert(*parent, new_page_number);
                }
            } else {
                // No lower slot exists for this page; later candidates are even
                // higher, so give the page back and stop.
                self.mem
                    .free(new_page_number, &mut PageTrackerPolicy::Ignore);
                break;
            }
        }

        if !relocation_map.is_empty() {
            progress = true;
        }

        table_tree.relocate_tables(&relocation_map)?;
        system_table_tree.relocate_tables(&relocation_map)?;

        Ok(progress)
    }
3046
3047 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
3050 assert_eq!(PageNumber::serialized_size(), 8);
3052
3053 let mut system_tables = self.system_tables.lock();
3055 {
3056 let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
3057 let key = TransactionIdWithPagination {
3058 transaction_id: free_until.raw_id(),
3059 pagination_id: 0,
3060 };
3061 for entry in data_freed.extract_from_if(..key, |_, _| true)? {
3062 let (_, page_list) = entry?;
3063 for i in 0..page_list.value().len() {
3064 self.mem
3065 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
3066 }
3067 }
3068 }
3069
3070 {
3072 let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
3073 let key = TransactionIdWithPagination {
3074 transaction_id: free_until.raw_id(),
3075 pagination_id: 0,
3076 };
3077 for entry in system_freed.extract_from_if(..key, |_, _| true)? {
3078 let (_, page_list) = entry?;
3079 for i in 0..page_list.value().len() {
3080 self.mem
3081 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
3082 }
3083 }
3084 }
3085
3086 Ok(())
3087 }
3088
    /// For each unprocessed non-durable commit recorded in `definition`'s
    /// freed-page table (older than `free_until`): frees the pages that were never
    /// persisted and rewrites each stored list to keep only pages that still need
    /// a durable free. Returns the transaction ids that were processed.
    fn process_freed_pages_nondurable_helper(
        &mut self,
        free_until: TransactionId,
        definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
    ) -> Result<Vec<TransactionId>> {
        let mut processed = vec![];
        let mut system_tables = self.system_tables.lock();

        let last_key = TransactionIdWithPagination {
            transaction_id: free_until.raw_id(),
            pagination_id: 0,
        };
        // Start scanning at the oldest commit not yet processed; when everything
        // has been processed this equals free_until, yielding an empty range.
        let oldest_unprocessed = self
            .transaction_tracker
            .oldest_unprocessed_non_durable_commit()?
            .map_or(free_until.raw_id(), |x| x.raw_id());
        let first_key = TransactionIdWithPagination {
            transaction_id: oldest_unprocessed,
            pagination_id: 0,
        };
        let mut data_freed = system_tables.open_system_table(self, definition)?;

        // Pass 1: collect candidates — the table cannot be mutated while a range
        // iterator borrows it.
        let mut candidate_transactions = vec![];
        for entry in data_freed.range(first_key..last_key)? {
            let (key, _) = entry?;
            let transaction_id = TransactionId::new(key.value().transaction_id);
            if self
                .transaction_tracker
                .is_unprocessed_non_durable_commit(transaction_id)?
            {
                candidate_transactions.push(transaction_id);
            }
        }
        // Pass 2: walk each transaction's pagination chain, freeing unpersisted
        // pages and rewriting/removing the stored lists.
        for transaction_id in candidate_transactions {
            let mut key = TransactionIdWithPagination {
                transaction_id: transaction_id.raw_id(),
                pagination_id: 0,
            };
            loop {
                let Some(entry) = data_freed.get(&key)? else {
                    break;
                };
                let pages = entry.value();
                let mut new_pages = vec![];
                for i in 0..pages.len() {
                    let page = pages.get(i);
                    // free_if_unpersisted returns false for persisted pages, which
                    // must stay queued until a durable commit.
                    if !self
                        .mem
                        .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
                    {
                        new_pages.push(page);
                    }
                }
                if new_pages.len() != pages.len() {
                    drop(entry);
                    if new_pages.is_empty() {
                        data_freed.remove(&key)?;
                    } else {
                        let required = PageList::required_bytes(new_pages.len());
                        let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
                        for page in new_pages {
                            page_list_mut.as_mut().push_back(page);
                        }
                    }
                }
                key.pagination_id += 1;
            }
            processed.push(transaction_id);
        }

        Ok(processed)
    }
3161
3162 fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
3169 assert_eq!(PageNumber::serialized_size(), 8);
3171
3172 let mut processed =
3174 self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
3175
3176 processed
3178 .extend(self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?);
3179
3180 for transaction_id in processed {
3181 self.transaction_tracker
3182 .mark_unprocessed_non_durable_commit(transaction_id)?;
3183 }
3184
3185 Ok(())
3186 }
3187
3188 fn store_system_freed_pages(
3189 &self,
3190 system_tree: &mut TableTreeMut,
3191 system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
3192 mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
3193 pagination_counter: &mut u64,
3194 ) -> Result {
3195 assert_eq!(PageNumber::serialized_size(), 8); system_tree.open_table_and_flush_table_root(
3198 SYSTEM_FREED_TABLE.name(),
3199 |system_freed_tree: &mut SystemFreedTree| {
3200 while !system_freed_pages.lock().is_empty() {
3201 let chunk_size = 200;
3202 let buffer_size = PageList::required_bytes(chunk_size);
3203 let key = TransactionIdWithPagination {
3204 transaction_id: self.transaction_id.raw_id(),
3205 pagination_id: *pagination_counter,
3206 };
3207 let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;
3208
3209 let mut freed_pages = system_freed_pages.lock();
3210 let len = freed_pages.len();
3211 access_guard.as_mut().clear();
3212 for page in freed_pages.drain(len - min(len, chunk_size)..) {
3213 if let Some(ref mut unpersisted_pages) = unpersisted_pages
3214 && self.mem.unpersisted(page)
3215 {
3216 unpersisted_pages.push(page);
3217 } else {
3218 access_guard.as_mut().push_back(page);
3219 }
3220 }
3221 drop(access_guard);
3222
3223 *pagination_counter += 1;
3224 }
3225 Ok(())
3226 },
3227 )?;
3228
3229 Ok(())
3230 }
3231
3232 pub fn stats(&self) -> Result<DatabaseStats> {
3234 let tables = self.tables.lock();
3235 let table_tree = &tables.table_tree;
3236 let data_tree_stats = table_tree.stats()?;
3237
3238 let system_tables = self.system_tables.lock();
3239 let system_table_tree = &system_tables.table_tree;
3240 let system_tree_stats = system_table_tree.stats()?;
3241
3242 let total_metadata_bytes = data_tree_stats.metadata_bytes()
3243 + system_tree_stats.metadata_bytes
3244 + system_tree_stats.stored_leaf_bytes;
3245 let total_fragmented = data_tree_stats.fragmented_bytes()
3246 + system_tree_stats.fragmented_bytes
3247 + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
3248
3249 Ok(DatabaseStats {
3250 tree_height: data_tree_stats.tree_height(),
3251 allocated_pages: self.mem.count_allocated_pages()?,
3252 free_pages: self.mem.count_free_pages()?,
3253 trailing_free_pages: self.mem.trailing_free_pages()?,
3254 leaf_pages: data_tree_stats.leaf_pages(),
3255 branch_pages: data_tree_stats.branch_pages(),
3256 stored_leaf_bytes: data_tree_stats.stored_bytes(),
3257 metadata_bytes: total_metadata_bytes,
3258 fragmented_bytes: total_fragmented,
3259 page_size: self.mem.get_page_size(),
3260 })
3261 }
3262
    /// Debug helper: prints the master and system trees to stderr. Note that this
    /// flushes pending table-root updates as a side effect.
    #[allow(dead_code)]
    #[cfg(feature = "std")]
    pub(crate) fn print_debug(&self) -> Result {
        let mut tables = self.tables.lock();
        if let Some(page) = tables
            .table_tree
            .flush_table_root_updates()?
            .finalize_dirty_checksums()?
        {
            eprintln!("Master tree:");
            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new_uncompressed(
                Some(page),
                PageHint::None,
                self.transaction_guard.clone(),
                self.mem.clone(),
            )?;
            master_tree.print_debug(true)?;
        }

        // Same procedure for the system tree.
        let mut system_tables = self.system_tables.lock();
        if let Some(page) = system_tables
            .table_tree
            .flush_table_root_updates()?
            .finalize_dirty_checksums()?
        {
            eprintln!("System tree:");
            let master_tree: Btree<&str, InternalTableDefinition> = Btree::new_uncompressed(
                Some(page),
                PageHint::None,
                self.transaction_guard.clone(),
                self.mem.clone(),
            )?;
            master_tree.print_debug(true)?;
        }

        Ok(())
    }
3302}
3303
/// Automatically aborts a write transaction that was neither committed nor aborted.
impl Drop for WriteTransaction {
    fn drop(&mut self) {
        let is_panicking = {
            #[cfg(feature = "std")]
            {
                std::thread::panicking()
            }
            #[cfg(not(feature = "std"))]
            {
                // Without std there is no way to detect an unwinding panic.
                false
            }
        };
        // Skip the automatic abort while panicking (aborting could panic again)
        // and after a storage failure (the state cannot be rolled back reliably).
        if !self.completed && !is_panicking && !self.mem.storage_failure() {
            #[allow(unused_variables)]
            if let Err(error) = self.abort_inner() {
                #[cfg(feature = "logging")]
                warn!("Failure automatically aborting transaction: {error}");
            }
        } else if !self.completed && self.mem.storage_failure() {
            // Still discard pending root updates so resources are not leaked.
            self.tables.lock().table_tree.clear_root_updates_and_close();
        }
    }
}
3327
/// A read-only transaction pinned to a single committed snapshot of the database.
pub struct ReadTransaction {
    mem: Arc<TransactionalMemory>,
    // Table tree rooted at the snapshot's data root; also holds the transaction guard.
    tree: TableTree,
    // Raw transaction id, when one was assigned to this read transaction.
    transaction_id: Option<u64>,
    observer: Arc<dyn crate::observer::DatabaseObserver>,
    #[cfg(feature = "metrics")]
    db_metrics: Arc<crate::observer::DbMetrics>,
}
3339
3340impl ReadTransaction {
3341 pub(crate) fn new(
3342 mem: Arc<TransactionalMemory>,
3343 guard: TransactionGuard,
3344 observer: Arc<dyn crate::observer::DatabaseObserver>,
3345 #[cfg(feature = "metrics")] db_metrics: Arc<crate::observer::DbMetrics>,
3346 ) -> Result<Self, TransactionError> {
3347 let root_page = mem.get_data_root();
3348 let txn_id = guard.id().ok().map(|id| id.raw_id());
3349 let guard = Arc::new(guard);
3350 Ok(Self {
3351 mem: mem.clone(),
3352 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
3353 .map_err(TransactionError::Storage)?,
3354 transaction_id: txn_id,
3355 observer,
3356 #[cfg(feature = "metrics")]
3357 db_metrics,
3358 })
3359 }
3360
3361 pub(crate) fn new_historical(
3363 mem: Arc<TransactionalMemory>,
3364 guard: TransactionGuard,
3365 user_root: Option<BtreeHeader>,
3366 observer: Arc<dyn crate::observer::DatabaseObserver>,
3367 #[cfg(feature = "metrics")] db_metrics: Arc<crate::observer::DbMetrics>,
3368 ) -> Result<Self, TransactionError> {
3369 let txn_id = guard.id().ok().map(|id| id.raw_id());
3370 let guard = Arc::new(guard);
3371 Ok(Self {
3372 mem: mem.clone(),
3373 tree: TableTree::new(user_root, PageHint::Clean, guard, mem)
3374 .map_err(TransactionError::Storage)?,
3375 transaction_id: txn_id,
3376 observer,
3377 #[cfg(feature = "metrics")]
3378 db_metrics,
3379 })
3380 }
3381
    /// Exposes the snapshot's table tree for internal use (e.g. composite queries).
    #[cfg(feature = "std")]
    pub(crate) fn table_tree(&self) -> &TableTree {
        &self.tree
    }
3386
3387 pub fn open_table<K: Key + 'static, V: Value + 'static>(
3389 &self,
3390 definition: TableDefinition<K, V>,
3391 ) -> Result<ReadOnlyTable<K, V>, TableError> {
3392 let header = self
3393 .tree
3394 .get_table::<K, V>(definition.name(), TableType::Normal)?
3395 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
3396
3397 match header {
3398 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
3399 definition.name().to_string(),
3400 table_root,
3401 PageHint::Clean,
3402 self.tree.transaction_guard().clone(),
3403 self.mem.clone(),
3404 )?),
3405 InternalTableDefinition::Multimap { .. } => {
3406 Err(TableError::Storage(StorageError::Internal(
3407 "unexpected multimap table type when opening normal table".to_string(),
3408 )))
3409 }
3410 }
3411 }
3412
3413 #[cfg(feature = "std")]
3417 pub fn open_ttl_table<K: Key + 'static, V: Value + 'static>(
3418 &self,
3419 definition: crate::ttl_table::TtlTableDefinition<K, V>,
3420 ) -> Result<crate::ttl_table::ReadOnlyTtlTable<K, V>, TableError> {
3421 let inner = self.open_table(definition.inner_def())?;
3422 Ok(crate::ttl_table::ReadOnlyTtlTable::new(inner))
3423 }
3424
    /// Opens a read-only IVF-PQ vector index described by `definition`.
    pub fn open_ivfpq_index(
        &self,
        definition: &crate::ivfpq::config::IvfPqIndexDefinition,
    ) -> Result<crate::ivfpq::index::ReadOnlyIvfPqIndex, TableError> {
        crate::ivfpq::index::ReadOnlyIvfPqIndex::open(
            self,
            definition,
            #[cfg(feature = "metrics")]
            Arc::clone(&self.db_metrics),
        )
        .map_err(TableError::Storage)
    }
3440
3441 pub fn open_untyped_table(
3443 &self,
3444 handle: impl TableHandle,
3445 ) -> Result<ReadOnlyUntypedTable, TableError> {
3446 let header = self
3447 .tree
3448 .get_table_untyped(handle.name(), TableType::Normal)?
3449 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
3450
3451 match header {
3452 InternalTableDefinition::Normal {
3453 table_root,
3454 fixed_key_size,
3455 fixed_value_size,
3456 ..
3457 } => Ok(ReadOnlyUntypedTable::new(
3458 table_root,
3459 fixed_key_size,
3460 fixed_value_size,
3461 self.mem.clone(),
3462 )),
3463 InternalTableDefinition::Multimap { .. } => {
3464 Err(TableError::Storage(StorageError::Internal(
3465 "unexpected multimap table type when opening untyped normal table".to_string(),
3466 )))
3467 }
3468 }
3469 }
3470
3471 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
3473 &self,
3474 definition: MultimapTableDefinition<K, V>,
3475 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
3476 let header = self
3477 .tree
3478 .get_table::<K, V>(definition.name(), TableType::Multimap)?
3479 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
3480
3481 match header {
3482 InternalTableDefinition::Normal { .. } => {
3483 Err(TableError::Storage(StorageError::Internal(
3484 "unexpected normal table type when opening multimap table".to_string(),
3485 )))
3486 }
3487 InternalTableDefinition::Multimap {
3488 table_root,
3489 table_length,
3490 ..
3491 } => Ok(ReadOnlyMultimapTable::new(
3492 table_root,
3493 table_length,
3494 PageHint::Clean,
3495 self.tree.transaction_guard().clone(),
3496 self.mem.clone(),
3497 )?),
3498 }
3499 }
3500
3501 pub fn open_untyped_multimap_table(
3503 &self,
3504 handle: impl MultimapTableHandle,
3505 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
3506 let header = self
3507 .tree
3508 .get_table_untyped(handle.name(), TableType::Multimap)?
3509 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
3510
3511 match header {
3512 InternalTableDefinition::Normal { .. } => {
3513 Err(TableError::Storage(StorageError::Internal(
3514 "unexpected normal table type when opening untyped multimap table".to_string(),
3515 )))
3516 }
3517 InternalTableDefinition::Multimap {
3518 table_root,
3519 table_length,
3520 fixed_key_size,
3521 fixed_value_size,
3522 ..
3523 } => Ok(ReadOnlyUntypedMultimapTable::new(
3524 table_root,
3525 table_length,
3526 fixed_key_size,
3527 fixed_value_size,
3528 self.mem.clone(),
3529 )),
3530 }
3531 }
3532
3533 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
3535 self.tree
3536 .list_tables(TableType::Normal)
3537 .map(|x| x.into_iter().map(UntypedTableHandle::new))
3538 }
3539
3540 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
3542 self.tree
3543 .list_tables(TableType::Multimap)
3544 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
3545 }
3546
    /// Opens a raw read-only btree over a system table, resolved against the
    /// committed system root (not the user data root). Returns `None` when the
    /// table has never been created.
    fn open_system_btree<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<Option<Btree<K, V>>> {
        let system_root = self.mem.get_system_root();
        let system_tree = TableTree::new(
            system_root,
            PageHint::Clean,
            self.tree.transaction_guard().clone(),
            self.mem.clone(),
        )?;

        let header = system_tree.get_table::<K, V>(definition.name(), TableType::Normal);
        match header {
            Ok(Some(InternalTableDefinition::Normal { table_root, .. })) => {
                let btree = Btree::new_uncompressed(
                    table_root,
                    PageHint::Clean,
                    self.tree.transaction_guard().clone(),
                    self.mem.clone(),
                )?;
                Ok(Some(btree))
            }
            // System tables are always normal tables; a multimap here means corruption.
            Ok(Some(InternalTableDefinition::Multimap { .. })) => Err(StorageError::Internal(
                "unexpected multimap table type in system table lookup".to_string(),
            )),
            Ok(None) => Ok(None),
            Err(e) => {
                Err(e.into_storage_error_or_internal("Internal error: blob system table corrupted"))
            }
        }
    }
3582
3583 pub(crate) fn get_history_snapshot_ro(
3585 &self,
3586 transaction_id: u64,
3587 ) -> Result<Option<HistorySnapshot>> {
3588 let Some(btree) = self.open_system_btree(HISTORY_TABLE)? else {
3589 return Ok(None);
3590 };
3591 Ok(btree.get(&transaction_id)?.map(|guard| guard.value()))
3592 }
3593
3594 pub(crate) fn list_history_snapshot_ids_ro(&self) -> Result<Vec<u64>> {
3596 let Some(btree) = self.open_system_btree(HISTORY_TABLE)? else {
3597 return Ok(Vec::new());
3598 };
3599 let mut ids = Vec::new();
3600 for entry in btree.range::<core::ops::RangeFull, u64>(&(..))? {
3601 let guard = entry?;
3602 ids.push(guard.key());
3603 }
3604 Ok(ids)
3605 }
3606
    /// Reads a blob's full payload and metadata by id.
    ///
    /// Returns `Ok(None)` when the blob table is absent or the id is unknown.
    /// The payload is read from the committed blob region and verified
    /// against the stored checksum; a mismatch is reported as
    /// `StorageError::BlobChecksumMismatch` instead of returning bad data.
    pub fn get_blob(&self, blob_id: &BlobId) -> Result<Option<(Vec<u8>, BlobMeta)>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };

        let Some(guard) = btree.get(blob_id)? else {
            return Ok(None);
        };
        let meta = guard.value();

        // Blob offsets are stored relative to the committed blob region.
        let blob_state = self.mem.get_committed_blob_state();
        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
        #[allow(clippy::cast_possible_truncation)]
        let data = self
            .mem
            .blob_read(file_offset, meta.blob_ref.length as usize)?;

        // End-to-end integrity check of the bytes just read from disk.
        let actual = xxh3_hash128(&data);
        if actual != meta.blob_ref.checksum {
            return Err(StorageError::BlobChecksumMismatch {
                sequence: blob_id.sequence,
                expected: meta.blob_ref.checksum,
                actual,
            });
        }

        Ok(Some((data, meta)))
    }
3638
3639 pub fn get_blob_meta(&self, blob_id: &BlobId) -> Result<Option<BlobMeta>> {
3641 let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3642 return Ok(None);
3643 };
3644
3645 Ok(btree.get(blob_id)?.map(|g| g.value()))
3646 }
3647
3648 pub fn blob_by_sequence(&self, seq: u64) -> Result<Option<(BlobId, BlobMeta)>> {
3657 let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3658 return Ok(None);
3659 };
3660
3661 let start = BlobId::new(seq, 0);
3662 let end = BlobId::new(seq, u64::MAX);
3663 let mut range = btree.range::<core::ops::RangeInclusive<BlobId>, BlobId>(&(start..=end))?;
3664 match range.next() {
3665 Some(entry) => {
3666 let entry = entry?;
3667 Ok(Some((entry.key(), entry.value())))
3668 }
3669 None => Ok(None),
3670 }
3671 }
3672
    /// Entry point for composite (multi-index) queries over this read
    /// transaction. The returned builder borrows `self` for its lifetime.
    pub fn composite_query(&self) -> crate::composite::CompositeQuery<'_> {
        crate::composite::CompositeQuery::new(self)
    }
3681
    /// Reads `length` bytes of a blob starting at byte `offset` within the
    /// blob.
    ///
    /// Returns `Ok(None)` for a missing table or unknown blob id, and
    /// `BlobRangeOutOfBounds` when `offset + length` exceeds the blob's
    /// length. Unlike [`Self::get_blob`], no checksum verification is
    /// performed on a partial read.
    ///
    /// NOTE(review): a zero-length request returns `Some(vec![])` even when
    /// `offset` lies past the end of the blob (the bounds check is skipped)
    /// — confirm this is intended.
    pub fn read_blob_range(
        &self,
        blob_id: &BlobId,
        offset: u64,
        length: u64,
    ) -> Result<Option<Vec<u8>>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };

        let Some(guard) = btree.get(blob_id)? else {
            return Ok(None);
        };
        let meta = guard.value();

        if length == 0 {
            return Ok(Some(Vec::new()));
        }

        // saturating_add: an overflowing offset+length saturates to u64::MAX
        // and is then rejected by the bounds check below.
        let end = offset.saturating_add(length);
        if end > meta.blob_ref.length {
            return Err(StorageError::BlobRangeOutOfBounds {
                blob_length: meta.blob_ref.length,
                requested_offset: offset,
                requested_length: length,
            });
        }

        // Translate blob-relative offset into an absolute file offset.
        let blob_state = self.mem.get_committed_blob_state();
        let file_offset = blob_state.region_offset + meta.blob_ref.offset + offset;
        #[allow(clippy::cast_possible_truncation)]
        let data = self.mem.blob_read(file_offset, length as usize)?;

        Ok(Some(data))
    }
3722
3723 pub fn blob_reader(&self, blob_id: &BlobId) -> Result<Option<BlobReader>> {
3731 let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3732 return Ok(None);
3733 };
3734
3735 let Some(guard) = btree.get(blob_id)? else {
3736 return Ok(None);
3737 };
3738 let meta = guard.value();
3739
3740 let blob_state = self.mem.get_committed_blob_state();
3741 let file_offset = blob_state.region_offset + meta.blob_ref.offset;
3742
3743 Ok(Some(BlobReader::new(
3744 Arc::clone(&self.mem),
3745 file_offset,
3746 meta.blob_ref.length,
3747 )))
3748 }
3749
3750 pub fn dedup_stats(&self) -> Result<DedupStats> {
3756 let Some(btree) = self.open_system_btree(BLOB_DEDUP_INDEX)? else {
3757 return Ok(DedupStats {
3758 total_dedup_entries: 0,
3759 total_ref_count: 0,
3760 bytes_saved: 0,
3761 });
3762 };
3763
3764 let mut total_dedup_entries: u64 = 0;
3765 let mut total_ref_count: u64 = 0;
3766 let mut bytes_saved: u64 = 0;
3767
3768 let range = btree.range::<RangeFull, Sha256Key>(&(..))?;
3769 for entry in range {
3770 let entry = entry?;
3771 let val = entry.value();
3772 total_dedup_entries += 1;
3773 total_ref_count += u64::from(val.ref_count);
3774 if val.ref_count > 1 {
3776 bytes_saved += val.length * u64::from(val.ref_count - 1);
3777 }
3778 }
3779
3780 Ok(DedupStats {
3781 total_dedup_entries,
3782 total_ref_count,
3783 bytes_saved,
3784 })
3785 }
3786
3787 pub fn blob_stats(&self) -> Result<BlobStats> {
3792 let blob_state = self.mem.get_committed_blob_state();
3793 let region_bytes = blob_state.region_length;
3794
3795 if region_bytes == 0 {
3796 return Ok(BlobStats {
3797 blob_count: 0,
3798 live_bytes: 0,
3799 region_bytes: 0,
3800 dead_bytes: 0,
3801 fragmentation_ratio: 0.0,
3802 });
3803 }
3804
3805 let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
3806 return Ok(BlobStats {
3807 blob_count: 0,
3808 live_bytes: 0,
3809 region_bytes,
3810 dead_bytes: region_bytes,
3811 fragmentation_ratio: 1.0,
3812 });
3813 };
3814
3815 let mut blob_count: u64 = 0;
3816 let mut unique_offsets = crate::compat::HashSet::new();
3817 let mut live_bytes: u64 = 0;
3818
3819 let range = btree.range::<RangeFull, BlobId>(&(..))?;
3820 for entry in range {
3821 let entry = entry?;
3822 let meta = entry.value();
3823 blob_count += 1;
3824 if unique_offsets.insert(meta.blob_ref.offset) {
3825 live_bytes += meta.blob_ref.length;
3826 }
3827 }
3828
3829 let dead_bytes = region_bytes.saturating_sub(live_bytes);
3830 #[allow(clippy::cast_precision_loss)]
3831 let fragmentation_ratio = if region_bytes > 0 {
3832 dead_bytes as f64 / region_bytes as f64
3833 } else {
3834 0.0
3835 };
3836
3837 Ok(BlobStats {
3838 blob_count,
3839 live_bytes,
3840 region_bytes,
3841 dead_bytes,
3842 fragmentation_ratio,
3843 })
3844 }
3845
3846 pub fn blobs_in_time_range(
3852 &self,
3853 start_ns: u64,
3854 end_ns: u64,
3855 ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
3856 if start_ns > end_ns {
3857 return Ok(Vec::new());
3858 }
3859 let Some(temporal_btree) = self.open_system_btree(BLOB_TEMPORAL_INDEX)? else {
3860 return Ok(Vec::new());
3861 };
3862 let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
3863 return Ok(Vec::new());
3864 };
3865
3866 let start = TemporalKey::range_start(start_ns);
3867 let end = TemporalKey::range_end(end_ns);
3868 let range = temporal_btree.range(&(start..=end))?;
3869
3870 let mut results = Vec::new();
3871 for entry in range {
3872 let entry = entry?;
3873 let temporal_key = entry.key();
3874 if let Some(meta_guard) = blob_btree.get(&temporal_key.blob_id)? {
3875 results.push((temporal_key, meta_guard.value()));
3876 }
3877 }
3878 Ok(results)
3879 }
3880
3881 pub fn blobs_near(
3886 &self,
3887 reference: &BlobId,
3888 window_ns: u64,
3889 ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
3890 let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
3891 return Ok(Vec::new());
3892 };
3893
3894 let Some(guard) = blob_btree.get(reference)? else {
3895 return Ok(Vec::new());
3896 };
3897 let ref_meta = guard.value();
3898
3899 let half_window = window_ns / 2;
3900 let start_ns = ref_meta.wall_clock_ns.saturating_sub(half_window);
3901 let end_ns = ref_meta.wall_clock_ns.saturating_add(half_window);
3902
3903 self.blobs_in_time_range(start_ns, end_ns)
3904 }
3905
3906 pub fn causal_chain(
3913 &self,
3914 blob_id: &BlobId,
3915 max_hops: usize,
3916 ) -> Result<Vec<(BlobId, BlobMeta, Option<CausalEdge>)>> {
3917 let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
3918 return Ok(Vec::new());
3919 };
3920
3921 let edges_btree = self.open_system_btree(BLOB_CAUSAL_EDGES)?;
3922 let legacy_btree = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?;
3923
3924 let mut chain = Vec::new();
3925 let mut current = *blob_id;
3926
3927 for _ in 0..=max_hops {
3928 let Some(g) = blob_btree.get(¤t)? else {
3929 break;
3930 };
3931 let meta = g.value();
3932 let parent = meta.causal_parent;
3933
3934 let edge = if let Some(parent_id) = parent {
3936 Self::lookup_causal_edge(
3937 &parent_id,
3938 ¤t,
3939 edges_btree.as_ref(),
3940 legacy_btree.as_ref(),
3941 )?
3942 } else {
3943 None
3944 };
3945
3946 chain.push((current, meta, edge));
3947 match parent {
3948 Some(p) => current = p,
3949 None => break,
3950 }
3951 }
3952
3953 Ok(chain)
3954 }
3955
3956 pub fn causal_children(&self, blob_id: &BlobId) -> Result<Vec<CausalEdge>> {
3961 if let Some(edges_btree) = self.open_system_btree(BLOB_CAUSAL_EDGES)? {
3963 let start_key = CausalEdgeKey::new(*blob_id, BlobId::MIN);
3964 let end_key = CausalEdgeKey::new(*blob_id, BlobId::MAX);
3965 let mut children = Vec::new();
3966 let range = edges_btree.range(&(start_key..=end_key))?;
3967 for entry in range {
3968 let entry = entry?;
3969 children.push(entry.value());
3970 }
3971 if !children.is_empty() {
3972 return Ok(children);
3973 }
3974 }
3975
3976 if let Some(legacy_btree) = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?
3978 && let Some(g) = legacy_btree.get(blob_id)?
3979 {
3980 return Ok(vec![CausalEdge::legacy(g.value())]);
3981 }
3982
3983 Ok(Vec::new())
3984 }
3985
3986 pub fn causal_path(
3994 &self,
3995 from: &BlobId,
3996 to: &BlobId,
3997 max_depth: usize,
3998 ) -> Result<Option<CausalPath>> {
3999 if from == to {
4000 return Ok(Some(vec![(*from, None)]));
4001 }
4002
4003 let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
4004 return Ok(None);
4005 };
4006
4007 let edges_btree = self.open_system_btree(BLOB_CAUSAL_EDGES)?;
4008 let legacy_btree = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?;
4009
4010 let mut path: CausalPath = Vec::new();
4012 let mut current = *to;
4013 for _ in 0..max_depth {
4015 let Some(g) = blob_btree.get(¤t)? else {
4016 break;
4017 };
4018 let meta = g.value();
4019 let Some(parent) = meta.causal_parent else {
4020 break;
4021 };
4022
4023 let edge = Self::lookup_causal_edge(
4025 &parent,
4026 ¤t,
4027 edges_btree.as_ref(),
4028 legacy_btree.as_ref(),
4029 )?;
4030 path.push((current, edge));
4031
4032 if parent == *from {
4033 path.push((*from, None));
4034 path.reverse();
4035 return Ok(Some(path));
4036 }
4037 current = parent;
4038 }
4039
4040 Ok(None)
4041 }
4042
4043 fn lookup_causal_edge(
4045 parent: &BlobId,
4046 child: &BlobId,
4047 edges_btree: Option<&Btree<CausalEdgeKey, CausalEdge>>,
4048 legacy_btree: Option<&Btree<BlobId, BlobId>>,
4049 ) -> Result<Option<CausalEdge>> {
4050 if let Some(bt) = edges_btree {
4051 let key = CausalEdgeKey::new(*parent, *child);
4052 if let Some(g) = bt.get(&key)? {
4053 return Ok(Some(g.value()));
4054 }
4055 }
4056 if let Some(bt) = legacy_btree
4057 && let Some(g) = bt.get(parent)?
4058 {
4059 return Ok(Some(CausalEdge::legacy(g.value())));
4060 }
4061 Ok(None)
4062 }
4063
4064 pub fn blobs_by_tag(&self, tag: &str) -> Result<Vec<BlobId>> {
4069 let Some(tag_btree) = self.open_system_btree(BLOB_TAG_INDEX)? else {
4070 return Ok(Vec::new());
4071 };
4072
4073 let start = TagKey::range_start(tag);
4074 let end = TagKey::range_end(tag);
4075 let range = tag_btree.range(&(start..=end))?;
4076
4077 let mut results = Vec::new();
4078 for entry in range {
4079 let entry = entry?;
4080 results.push(entry.key().blob_id);
4081 }
4082 Ok(results)
4083 }
4084
4085 pub fn blobs_in_namespace(&self, namespace: &str) -> Result<Vec<(BlobId, BlobMeta)>> {
4089 let Some(ns_idx_btree) = self.open_system_btree(BLOB_NAMESPACE_INDEX)? else {
4090 return Ok(Vec::new());
4091 };
4092 let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
4093 return Ok(Vec::new());
4094 };
4095
4096 let start = NamespaceKey::range_start(namespace);
4097 let end = NamespaceKey::range_end(namespace);
4098 let range = ns_idx_btree.range(&(start..=end))?;
4099
4100 let mut results = Vec::new();
4101 for entry in range {
4102 let entry = entry?;
4103 let blob_id = entry.key().blob_id;
4104 if let Some(meta_guard) = blob_btree.get(&blob_id)? {
4105 results.push((blob_id, meta_guard.value()));
4106 }
4107 }
4108 Ok(results)
4109 }
4110
4111 pub fn blobs_in_time_range_ns(
4116 &self,
4117 start_ns: u64,
4118 end_ns: u64,
4119 namespace: Option<&str>,
4120 ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
4121 let results = self.blobs_in_time_range(start_ns, end_ns)?;
4122 let Some(ns) = namespace else {
4123 return Ok(results);
4124 };
4125
4126 let Some(ns_btree) = self.open_system_btree(BLOB_NAMESPACE)? else {
4127 return Ok(Vec::new());
4128 };
4129
4130 let mut filtered = Vec::new();
4131 for (tk, meta) in results {
4132 if let Some(ns_guard) = ns_btree.get(&tk.blob_id)?
4133 && ns_guard.value().namespace_str() == ns
4134 {
4135 filtered.push((tk, meta));
4136 }
4137 }
4138 Ok(filtered)
4139 }
4140
    /// Returns all tags attached to `blob_id`.
    ///
    /// NOTE(review): the tag index is keyed `(tag, blob_id)`, so this scans
    /// the ENTIRE tag index and filters by blob id — O(total tags in the
    /// store) per call. A reverse (blob_id -> tags) index would make this a
    /// point lookup; worth considering if this appears on a hot path.
    pub fn blob_tags(&self, blob_id: &BlobId) -> Result<Vec<String>> {
        let Some(tag_btree) = self.open_system_btree(BLOB_TAG_INDEX)? else {
            return Ok(Vec::new());
        };

        // Full-index bounds: from the empty tag up to the maximal 32-byte
        // tag (all 0xFF), covering every possible TagKey.
        let start = TagKey::new("", BlobId::MIN);
        let end = TagKey {
            tag: [0xFF; 32],
            tag_len: 32,
            blob_id: BlobId::MAX,
        };
        let range = tag_btree.range(&(start..=end))?;
        let mut tags = Vec::new();
        for entry in range {
            let entry = entry?;
            let key = entry.key();
            if key.blob_id == *blob_id {
                tags.push(key.tag_str().to_string());
            }
        }
        Ok(tags)
    }
4167
4168 pub fn blob_namespace(&self, blob_id: &BlobId) -> Result<Option<String>> {
4170 let Some(ns_btree) = self.open_system_btree(BLOB_NAMESPACE)? else {
4171 return Ok(None);
4172 };
4173
4174 match ns_btree.get(blob_id)? {
4175 Some(g) => Ok(Some(g.value().namespace_str().to_string())),
4176 None => Ok(None),
4177 }
4178 }
4179
    /// Returns all CDC entries with `transaction_id > after_txn_id`, in key
    /// order.
    ///
    /// Fails with `CdcCursorBehindRetention` when the entries immediately
    /// after the cursor have already been pruned (the oldest retained
    /// transaction is beyond `after_txn_id + 1`), since the caller would
    /// otherwise silently miss changes. A cursor of 0 means "from the
    /// beginning" and skips the retention check.
    pub fn read_cdc_since(&self, after_txn_id: u64) -> Result<Vec<ChangeStream>> {
        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
            return Ok(Vec::new());
        };

        if after_txn_id > 0 {
            // Peek at the oldest retained entry to detect a pruned gap.
            let mut all_range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
            if let Some(first) = all_range.next() {
                let first = first?;
                let oldest_txn = first.key().transaction_id;
                // The next txn the caller expects is `after_txn_id + 1`;
                // anything older than that must still be retained.
                if oldest_txn > after_txn_id.saturating_add(1) {
                    return Err(StorageError::CdcCursorBehindRetention {
                        cursor_txn_id: after_txn_id,
                        oldest_retained_txn_id: oldest_txn,
                    });
                }
            }
        }

        // Scan from the first entry strictly after the cursor to the end.
        let start = CdcKey::new(after_txn_id.saturating_add(1), 0);
        let end = CdcKey::new(u64::MAX, u32::MAX);
        let range = btree.range::<core::ops::RangeInclusive<CdcKey>, CdcKey>(&(start..=end))?;

        let mut results = Vec::new();
        for entry in range {
            let entry = entry?;
            let record = CdcRecord::deserialize(entry.value_data())?;
            results.push(ChangeStream::from_key_record(entry.key(), record));
        }
        Ok(results)
    }
4219
    /// Returns the CDC entries with `start_txn <= transaction_id <= end_txn`,
    /// in key order. An inverted range yields an empty vec.
    ///
    /// Fails with `CdcCursorBehindRetention` when `start_txn` is older than
    /// the oldest retained entry, i.e. part of the requested range has been
    /// pruned. `start_txn == 0` skips the retention check.
    pub fn read_cdc_range(&self, start_txn: u64, end_txn: u64) -> Result<Vec<ChangeStream>> {
        if start_txn > end_txn {
            return Ok(Vec::new());
        }
        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
            return Ok(Vec::new());
        };

        if start_txn > 0 {
            // Peek at the oldest retained entry to detect a pruned gap.
            let mut all_range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
            if let Some(first) = all_range.next() {
                let first = first?;
                let oldest_txn = first.key().transaction_id;
                if oldest_txn > start_txn {
                    return Err(StorageError::CdcCursorBehindRetention {
                        cursor_txn_id: start_txn,
                        oldest_retained_txn_id: oldest_txn,
                    });
                }
            }
        }

        let start = CdcKey::new(start_txn, 0);
        let end = CdcKey::new(end_txn, u32::MAX);
        let range = btree.range::<core::ops::RangeInclusive<CdcKey>, CdcKey>(&(start..=end))?;

        let mut results = Vec::new();
        for entry in range {
            let entry = entry?;
            let record = CdcRecord::deserialize(entry.value_data())?;
            results.push(ChangeStream::from_key_record(entry.key(), record));
        }
        Ok(results)
    }
4259
4260 pub fn cdc_cursor(&self, name: &str) -> Result<Option<u64>> {
4265 let Some(btree) = self.open_system_btree(CDC_CURSOR_TABLE)? else {
4266 return Ok(None);
4267 };
4268 match btree.get(&name)? {
4269 Some(guard) => Ok(Some(guard.value())),
4270 None => Ok(None),
4271 }
4272 }
4273
4274 pub fn latest_cdc_transaction_id(&self) -> Result<Option<u64>> {
4276 let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
4277 return Ok(None);
4278 };
4279 let mut range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
4280 match range.next_back() {
4281 Some(entry) => {
4282 let entry = entry?;
4283 Ok(Some(entry.key().transaction_id))
4284 }
4285 None => Ok(None),
4286 }
4287 }
4288
    /// Explicitly closes this read transaction.
    ///
    /// Fails with `ReadTransactionStillInUse` — returning `self` inside the
    /// error so the caller can retry — when other clones of the transaction
    /// guard are still alive, i.e. tables or iterators opened from this
    /// transaction have not been dropped yet. On success the transaction is
    /// consumed and the remaining cleanup runs in `Drop`.
    pub fn close(self) -> Result<(), TransactionError> {
        // strong_count > 1 means something besides us still holds the guard.
        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
            return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
        }
        Ok(())
    }
4303}
4304
impl Drop for ReadTransaction {
    fn drop(&mut self) {
        // Notify the observer (and bump the metrics counter when the
        // `metrics` feature is on) that this read transaction ended.
        // NOTE(review): `transaction_id` is evidently an `Option` here —
        // the field declaration is outside this view; presumably `None`
        // means the id was already released elsewhere. Confirm.
        if let Some(id) = self.transaction_id {
            self.observer.on_read_end(id);
            #[cfg(feature = "metrics")]
            self.db_metrics
                .read_txn_closed
                .fetch_add(1, portable_atomic::Ordering::Relaxed);
        }
    }
}
4316
4317impl Debug for ReadTransaction {
4318 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
4319 f.write_str("ReadTransaction")
4320 }
4321}
4322
// Bridges `WriteTransaction` into the generic storage abstraction so callers
// can be written against `StorageWrite` regardless of transaction type.
impl crate::storage_traits::StorageWrite for WriteTransaction {
    type Table<'txn, K: Key + 'static, V: Value + 'static>
        = Table<'txn, K, V>
    where
        Self: 'txn;

    // Delegates to `open_table`, flattening its `TableError` into the
    // trait's `StorageError`-based `Result`.
    fn open_storage_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<Self::Table<'_, K, V>> {
        self.open_table(definition)
            .map_err(|e| e.into_storage_error_or_internal("open_storage_table"))
    }
}
4341
// Read-side counterpart of the `StorageWrite` bridge: exposes this
// transaction's tables through the generic `StorageRead` abstraction.
impl crate::storage_traits::StorageRead for ReadTransaction {
    type Table<'txn, K: Key + 'static, V: Value + 'static>
        = ReadOnlyTable<K, V>
    where
        Self: 'txn;

    // Delegates to `open_table`, flattening its `TableError` into the
    // trait's `StorageError`-based `Result`.
    fn open_storage_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<Self::Table<'_, K, V>> {
        self.open_table(definition)
            .map_err(|e| e.into_storage_error_or_internal("open_storage_table"))
    }
}
4360
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    // Transaction ids must survive a close/reopen cycle: the first write
    // transaction of the reopened database has to receive a strictly larger
    // id than any transaction committed before the close.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        // Capture the id before `commit` consumes the transaction.
        let first_txn_id = write_txn.transaction_id;
        write_txn.commit().unwrap();
        drop(db);

        // Reopen the same file and check the id counter advanced.
        let db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        assert!(write_txn.transaction_id > first_txn_id);
    }
}