use crate::blob_store::reader::BlobReader;
use crate::blob_store::types::{
BlobDedupConfig, BlobId, BlobMeta, BlobRef, BlobStats, CausalEdge, CausalEdgeKey, CausalPath,
ContentType, DedupStats, DedupVal, MAX_TAGS_PER_BLOB, NamespaceKey, NamespaceVal, Sha256Key,
StoreOptions, TagKey, TemporalKey,
};
use crate::blob_store::writer::BlobWriter;
use crate::cdc::CdcConfig;
use crate::cdc::types::{CdcEvent, CdcKey, CdcRecord, ChangeStream};
use crate::compat::{HashMap, HashSet, Mutex};
use crate::db::TransactionGuard;
use crate::error::CommitError;
use crate::multimap_table::ReadOnlyUntypedMultimapTable;
use crate::sealed::Sealed;
use crate::table::ReadOnlyUntypedTable;
use crate::temporal::HybridLogicalClock;
use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
TableTree, TableTreeMut, TableType, TransactionalMemory, hash64_with_seed, hash128_with_seed,
};
use crate::types::{Key, Value};
use crate::{
AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
UntypedTableHandle,
};
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::cmp::min;
use core::fmt::{Debug, Display, Formatter};
use core::marker::PhantomData;
use core::mem::size_of;
use core::ops::{RangeBounds, RangeFull};
use core::panic;
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "logging")]
use log::{debug, warn};
use sha2::{Digest, Sha256};
/// Presumably an upper bound on the number of pages processed by a single compaction pass.
/// NOTE(review): not referenced in this part of the file — confirm at its use site.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
/// Hashes `data` to 64 bits using `hash64_with_seed` with a fixed seed of zero, so the
/// result depends only on the input bytes.
fn xxh3_hash64(data: &[u8]) -> u64 {
    let seed = 0;
    hash64_with_seed(data, seed)
}
/// Hashes `data` to 128 bits using `hash128_with_seed` with a fixed seed of zero, so the
/// result depends only on the input bytes.
fn xxh3_hash128(data: &[u8]) -> u128 {
    let seed = 0;
    hash128_with_seed(data, seed)
}
// System table definitions. The string passed to `SystemTableDefinition::new` is the lookup key
// used by `open_system_table` (via `get_or_create_table(definition.name(), ..)`), so changing
// one makes the engine look for (and create) a different, empty table.
/// Single-row table (unit key) holding the id to assign to the next persistent savepoint.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
SystemTableDefinition::new("next_savepoint_id");
/// Persistent savepoints, keyed by savepoint id.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
SystemTableDefinition::new("persistent_savepoints");
/// Lists of data pages allocated, keyed by (transaction id, pagination id).
/// `restore_savepoint` queues every page recorded here after the savepoint for freeing.
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
TransactionIdWithPagination,
PageList,
> = SystemTableDefinition::new("data_pages_allocated");
/// Lists of data pages that became unreachable and are pending free, keyed by
/// (transaction id, pagination id). `restore_savepoint` removes entries newer than the savepoint.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
SystemTableDefinition::new("data_pages_unreachable");
/// Lists of system-tree pages that became unreachable and are pending free, keyed by
/// (transaction id, pagination id).
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
SystemTableDefinition::new("system_pages_unreachable");
/// Blob metadata keyed by blob id.
const BLOB_TABLE: SystemTableDefinition<BlobId, BlobMeta> =
SystemTableDefinition::new("blob_store");
/// Temporal index over blobs (key-only; values are unit).
const BLOB_TEMPORAL_INDEX: SystemTableDefinition<TemporalKey, ()> =
SystemTableDefinition::new("blob_temporal_idx");
/// Blob id -> blob id; presumably parent -> child causal links (confirm at use site).
const BLOB_CAUSAL_CHILDREN: SystemTableDefinition<BlobId, BlobId> =
SystemTableDefinition::new("blob_causal_children");
/// Causal edges keyed by `CausalEdgeKey`. NOTE(review): the `_v2` suffix suggests an older
/// layout exists under another name — confirm before changing.
const BLOB_CAUSAL_EDGES: SystemTableDefinition<CausalEdgeKey, CausalEdge> =
SystemTableDefinition::new("blob_causal_edges_v2");
/// Tag index over blobs (key-only; values are unit).
const BLOB_TAG_INDEX: SystemTableDefinition<TagKey, ()> =
SystemTableDefinition::new("blob_tag_idx");
/// Namespace assignment per blob id.
const BLOB_NAMESPACE: SystemTableDefinition<BlobId, NamespaceVal> =
SystemTableDefinition::new("blob_namespace");
/// Namespace index over blobs (key-only; values are unit).
const BLOB_NAMESPACE_INDEX: SystemTableDefinition<NamespaceKey, ()> =
SystemTableDefinition::new("blob_namespace_idx");
/// Deduplication index keyed by SHA-256 content key.
const BLOB_DEDUP_INDEX: SystemTableDefinition<Sha256Key, DedupVal> =
SystemTableDefinition::new("blob_dedup_idx");
/// Blob id -> SHA-256 content key (reverse of the dedup index, presumably).
const BLOB_DEDUP_MAP: SystemTableDefinition<BlobId, Sha256Key> =
SystemTableDefinition::new("blob_dedup_map");
/// Change-data-capture log records keyed by `CdcKey`.
const CDC_LOG_TABLE: SystemTableDefinition<CdcKey, CdcRecord> =
SystemTableDefinition::new("cdc_log");
/// Named CDC cursors (string key -> u64); presumably consumer name -> position — confirm.
const CDC_CURSOR_TABLE: SystemTableDefinition<&str, u64> =
SystemTableDefinition::new("cdc_cursors");
/// History snapshots keyed by a u64. NOTE(review): key meaning (transaction id vs timestamp)
/// is not visible here — confirm.
const HISTORY_TABLE: SystemTableDefinition<u64, HistorySnapshot> =
SystemTableDefinition::new("transaction_history");
/// Fixed-width record stored in the transaction history table.
///
/// Byte layout (all integers little-endian):
/// - `[0]`: 1 if a user root is present, 0 otherwise
/// - next `BtreeHeader::serialized_size()` bytes: the serialized user root (zeroed if absent)
/// - then five `u64` fields, in order: timestamp in milliseconds, blob region offset,
///   blob region length, next blob sequence number, blob HLC state
#[derive(Debug, Clone)]
pub(crate) struct HistorySnapshot {
    data: [u8; Self::SIZE],
}

impl HistorySnapshot {
    /// Width of each trailing `u64` field.
    const U64_LEN: usize = size_of::<u64>();
    /// Total encoded size: presence flag + user root + five `u64` fields.
    const SIZE: usize = 1 + BtreeHeader::serialized_size() + 5 * Self::U64_LEN;
    const USER_ROOT_FLAG: usize = 0;
    const USER_ROOT: usize = 1;
    const USER_ROOT_END: usize = Self::USER_ROOT + BtreeHeader::serialized_size();
    // The u64 fields follow the user root back to back.
    const TIMESTAMP: usize = Self::USER_ROOT_END;
    const BLOB_OFFSET: usize = Self::TIMESTAMP + Self::U64_LEN;
    const BLOB_LENGTH: usize = Self::BLOB_OFFSET + Self::U64_LEN;
    const BLOB_SEQUENCE: usize = Self::BLOB_LENGTH + Self::U64_LEN;
    const BLOB_HLC: usize = Self::BLOB_SEQUENCE + Self::U64_LEN;

    /// Encodes a snapshot. When `user_root` is `None`, the flag byte and root bytes stay zero.
    pub(crate) fn new(
        user_root: Option<BtreeHeader>,
        timestamp_ms: u64,
        blob_region_offset: u64,
        blob_region_length: u64,
        blob_next_sequence: u64,
        blob_hlc_state: u64,
    ) -> Self {
        let mut data = [0u8; Self::SIZE];
        if let Some(root) = user_root {
            data[Self::USER_ROOT_FLAG] = 1;
            data[Self::USER_ROOT..Self::USER_ROOT_END].copy_from_slice(&root.to_le_bytes());
        }
        Self::put_u64(&mut data, Self::TIMESTAMP, timestamp_ms);
        Self::put_u64(&mut data, Self::BLOB_OFFSET, blob_region_offset);
        Self::put_u64(&mut data, Self::BLOB_LENGTH, blob_region_length);
        Self::put_u64(&mut data, Self::BLOB_SEQUENCE, blob_next_sequence);
        Self::put_u64(&mut data, Self::BLOB_HLC, blob_hlc_state);
        Self { data }
    }

    /// Returns the stored user root, or `None` if the presence flag is zero.
    pub(crate) fn user_root(&self) -> Option<BtreeHeader> {
        if self.data[Self::USER_ROOT_FLAG] != 0 {
            self.data[Self::USER_ROOT..Self::USER_ROOT_END]
                .try_into()
                .ok()
                .map(BtreeHeader::from_le_bytes)
        } else {
            None
        }
    }

    /// Returns the stored timestamp in milliseconds.
    pub(crate) fn timestamp_ms(&self) -> u64 {
        self.read_u64(Self::TIMESTAMP)
    }

    /// Writes `value` little-endian into `data` at `offset`.
    fn put_u64(data: &mut [u8], offset: usize, value: u64) {
        data[offset..offset + Self::U64_LEN].copy_from_slice(&value.to_le_bytes());
    }

    /// Reads the little-endian `u64` at `offset`. The `unwrap_or(0)` fallback is unreachable
    /// for in-bounds offsets because the slice is always exactly `U64_LEN` bytes.
    fn read_u64(&self, offset: usize) -> u64 {
        self.data[offset..offset + Self::U64_LEN]
            .try_into()
            .map(u64::from_le_bytes)
            .unwrap_or(0)
    }
}
impl Value for HistorySnapshot {
type SelfType<'a>
= HistorySnapshot
where
Self: 'a;
type AsBytes<'a>
= [u8; HistorySnapshot::SIZE]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(Self::SIZE)
}
fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
let mut buf = [0u8; Self::SIZE];
buf.copy_from_slice(&data[..Self::SIZE]);
Self { data: buf }
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
where
Self: 'b,
{
value.data
}
fn type_name() -> TypeName {
TypeName::internal("redb::HistorySnapshot")
}
}
/// Table name used for the allocator-state tree.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
/// Allocator-state B-tree: [`AllocatorStateKey`] keys with raw byte values.
pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
/// Mutable form of [`AllocatorStateTree`].
pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
/// Mutable B-tree with the key/value types of the system freed-pages table
/// (`TransactionIdWithPagination` -> `PageList`).
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
/// Borrowed view over a serialized list of page numbers.
///
/// Encoding: a little-endian `u16` element count, followed by that many serialized
/// [`PageNumber`]s packed back to back.
#[derive(Debug)]
pub(crate) struct PageList<'a> {
    data: &'a [u8],
}

impl PageList<'_> {
    /// Bytes needed to encode `len` page numbers (count prefix plus entries).
    fn required_bytes(len: usize) -> usize {
        size_of::<u16>() + len * PageNumber::serialized_size()
    }

    /// Number of page numbers in the list, read from the `u16` prefix.
    pub(crate) fn len(&self) -> usize {
        let prefix = &self.data[..size_of::<u16>()];
        match <[u8; 2]>::try_from(prefix) {
            Ok(raw) => usize::from(u16::from_le_bytes(raw)),
            Err(_) => 0,
        }
    }

    /// Decodes entry `index`. Panics if the entry lies beyond the end of the buffer.
    pub(crate) fn get(&self, index: usize) -> PageNumber {
        let entry_len = PageNumber::serialized_size();
        let start = size_of::<u16>() + entry_len * index;
        let entry = &self.data[start..start + entry_len];
        entry
            .try_into()
            .map_or(PageNumber::new(0, 0, 0), PageNumber::from_le_bytes)
    }
}
/// Variable-width `Value` encoding for [`PageList`]: the stored bytes are wrapped as-is,
/// without copying.
impl Value for PageList<'_> {
type SelfType<'a>
= PageList<'a>
where
Self: 'a;
type AsBytes<'a>
= &'a [u8]
where
Self: 'a;
// Length depends on the element count, so there is no fixed width.
fn fixed_width() -> Option<usize> {
None
}
fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
PageList { data }
}
// Returns the wrapped slice itself, at the data's own lifetime `'b` (which outlives `'a`).
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
where
Self: 'b,
{
value.data
}
fn type_name() -> TypeName {
TypeName::internal("redb::PageList")
}
}
/// In-place mutable access to a [`PageList`] stored inside a reserved value slot.
impl MutInPlaceValue for PageList<'_> {
    type BaseRefType = PageListMut;

    /// Prepares a freshly reserved buffer as an empty page list by zeroing its header.
    ///
    /// The header region is the first 8 bytes, which include the `u16` element count read by
    /// `PageList::len`. A buffer shorter than 8 bytes is a caller bug and trips the debug
    /// assertion. Previously, release builds then zeroed nothing and left the count as garbage.
    /// Now whatever part of the header fits is still zeroed, so the count reads as zero
    /// whenever the buffer can hold it.
    fn initialize(data: &mut [u8]) {
        debug_assert!(data.len() >= 8);
        let header_len = data.len().min(8);
        data[..header_len].fill(0);
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
        // SAFETY: assumes `PageListMut` is a `#[repr(transparent)]` (or otherwise
        // layout-compatible) unsized wrapper around `[u8]` — TODO confirm at its definition in
        // `tree_store`. The pointer cast keeps the slice metadata (length), and the returned
        // reference inherits `data`'s exclusive borrow and lifetime.
        unsafe { &mut *(core::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
    }
}
/// Key of the page-tracking system tables: a transaction id plus a pagination id
/// (presumably so one transaction's page list can span several rows — confirm).
///
/// Both ids are encoded big-endian, so the bytewise comparison in `Key::compare` orders keys
/// by `(transaction_id, pagination_id)`.
#[derive(Debug)]
pub(crate) struct TransactionIdWithPagination {
    pub(crate) transaction_id: u64,
    pub(crate) pagination_id: u64,
}

impl Value for TransactionIdWithPagination {
    type SelfType<'a>
        = TransactionIdWithPagination
    where
        Self: 'a;
    type AsBytes<'a>
        = [u8; 2 * size_of::<u64>()]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        Some(2 * size_of::<u64>())
    }

    // Big-endian is deliberate: it makes byte order match numeric order.
    #[allow(clippy::big_endian_bytes)]
    fn from_bytes<'a>(data: &'a [u8]) -> Self
    where
        Self: 'a,
    {
        let (txn_part, page_part) = data.split_at(size_of::<u64>());
        // A part of the wrong length decodes as 0 rather than panicking.
        let decode = |part: &[u8]| part.try_into().map(u64::from_be_bytes).unwrap_or(0);
        Self {
            transaction_id: decode(txn_part),
            pagination_id: decode(page_part),
        }
    }

    #[allow(clippy::big_endian_bytes)]
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
    where
        Self: 'b,
    {
        let mut encoded = [0u8; 2 * size_of::<u64>()];
        let (txn_part, page_part) = encoded.split_at_mut(size_of::<u64>());
        txn_part.copy_from_slice(&value.transaction_id.to_be_bytes());
        page_part.copy_from_slice(&value.pagination_id.to_be_bytes());
        encoded
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::TransactionIdWithPagination")
    }
}

impl Key for TransactionIdWithPagination {
    /// Compares at most the first 16 bytes lexicographically; ties break on total length.
    fn compare(data1: &[u8], data2: &[u8]) -> core::cmp::Ordering {
        let width = data1.len().min(data2.len()).min(2 * size_of::<u64>());
        match data1[..width].cmp(&data2[..width]) {
            core::cmp::Ordering::Equal => data1.len().cmp(&data2.len()),
            unequal => unequal,
        }
    }
}
/// Keys of the allocator-state tree.
///
/// Encoded as a one-byte tag followed by a little-endian `u32` payload, which only `Region`
/// uses. Tags 3, 4 and 5 decode to `Region`, `RegionTracker` and `TransactionId`. Any other tag
/// decodes to `Deprecated`, which is itself written with tag 0.
///
/// Variant order matters: `Key::compare` uses the derived `Ord`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub(crate) enum AllocatorStateKey {
    Deprecated,
    Region(u32),
    RegionTracker,
    TransactionId,
}

impl AllocatorStateKey {
    const DEPRECATED_TAG: u8 = 0;
    const REGION_TAG: u8 = 3;
    const REGION_TRACKER_TAG: u8 = 4;
    const TRANSACTION_ID_TAG: u8 = 5;
}

impl Value for AllocatorStateKey {
    type SelfType<'a> = Self;
    type AsBytes<'a> = [u8; 1 + size_of::<u32>()];

    fn fixed_width() -> Option<usize> {
        Some(1 + size_of::<u32>())
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        match data[0] {
            Self::REGION_TAG => {
                let payload = &data[1..];
                Self::Region(payload.try_into().map(u32::from_le_bytes).unwrap_or(0))
            }
            Self::REGION_TRACKER_TAG => Self::RegionTracker,
            Self::TRANSACTION_ID_TAG => Self::TransactionId,
            _ => Self::Deprecated,
        }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        // Non-region variants carry a zero payload, matching an all-zero buffer.
        let (tag, payload) = match *value {
            Self::Deprecated => (Self::DEPRECATED_TAG, 0u32),
            Self::Region(index) => (Self::REGION_TAG, index),
            Self::RegionTracker => (Self::REGION_TRACKER_TAG, 0),
            Self::TransactionId => (Self::TRANSACTION_ID_TAG, 0),
        };
        let mut encoded = [0u8; 1 + size_of::<u32>()];
        encoded[0] = tag;
        encoded[1..].copy_from_slice(&payload.to_le_bytes());
        encoded
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::AllocatorStateKey")
    }
}

impl Key for AllocatorStateKey {
    fn compare(data1: &[u8], data2: &[u8]) -> core::cmp::Ordering {
        let lhs = Self::from_bytes(data1);
        let rhs = Self::from_bytes(data2);
        lhs.cmp(&rhs)
    }
}
/// Typed name of an internal (system) table with key type `K` and value type `V`.
///
/// The name is the lookup key used when the table is opened, so it must stay stable.
pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
    /// Creates a definition for the table called `name`.
    ///
    /// # Panics
    /// If `name` is empty. Inside a `const` initializer this becomes a compile-time error.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            _key_type: PhantomData,
            _value_type: PhantomData,
            name,
        }
    }
}

impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        // The definition is `Copy`, so cloning is a plain bitwise copy.
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}

/// Formats as `name<KeyTypeName, ValueTypeName>`.
impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        let key_type = K::type_name();
        let value_type = V::type_name();
        write!(f, "{}<{}, {}>", self.name, key_type.name(), value_type.name())
    }
}
/// Snapshot of database storage statistics; every accessor returns a stored field unchanged.
#[derive(Clone, Copy, Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) free_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Recorded tree height.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of free pages.
    pub fn free_pages(&self) -> u64 {
        self.free_pages
    }

    /// Number of leaf pages.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch pages.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Bytes stored in leaf pages (the `stored_leaf_bytes` field).
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Bytes attributed to metadata.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Bytes counted as fragmented.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Page size in bytes.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
/// Durability requested for a write transaction; set via `WriteTransaction::set_durability`.
///
/// Creating or deleting persistent savepoints requires `Immediate`:
/// `persistent_savepoint` / `delete_persistent_savepoint` return `InvalidSavepoint` otherwise.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
/// NOTE(review): the exact persistence guarantee is defined by the commit path (not shown).
None,
/// The level new write transactions start with.
Immediate,
}
/// Crate-internal mirror of [`Durability`]. `set_durability` maps each public variant to the
/// variant of the same name here.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
None,
Immediate,
}
/// Open handle on one internal (system) table inside a `SystemNamespace`.
///
/// When the handle is dropped, its current root and length are staged back into the owning
/// namespace.
pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
    name: String,
    namespace: &'s mut SystemNamespace<'db>,
    tree: BtreeMut<'s, K, V>,
    transaction_guard: Arc<TransactionGuard>,
}

impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
    /// Wraps the B-tree rooted at `table_root`. System tables never track page allocations:
    /// the tree is built with an `Ignore` tracker policy.
    fn new(
        name: &str,
        table_root: Option<BtreeHeader>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        namespace: &'s mut SystemNamespace<'db>,
    ) -> SystemTable<'db, 's, K, V> {
        let allocation_policy = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        let tree = BtreeMut::new_uncompressed(
            table_root,
            Arc::clone(&guard),
            mem,
            freed_pages,
            allocation_policy,
        );
        SystemTable {
            name: String::from(name),
            namespace,
            tree,
            transaction_guard: guard,
        }
    }

    /// Looks up `key`.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        let key: &K::SelfType<'a> = key.borrow();
        self.tree.get(key)
    }

    /// Iterates over the entries whose keys fall in `range`.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let inner = self.tree.range(&range)?;
        Ok(Range::new(inner, Arc::clone(&self.transaction_guard)))
    }

    /// Returns an `ExtractIf` iterator over `range` filtered by `predicate`.
    /// (`restore_savepoint` drains one with an always-true predicate to delete entries.)
    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        range: impl RangeBounds<KR> + 'a,
        predicate: F,
    ) -> Result<ExtractIf<'_, K, V, F>>
    where
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let drain = self.tree.extract_from_if(&range, predicate)?;
        Ok(ExtractIf::new(drain))
    }

    /// Checks encoded sizes against the storage limits, reporting the first violation in
    /// the order: value, key, combined pair.
    fn check_pair_lengths(key_len: usize, value_len: usize) -> Result<()> {
        if value_len > MAX_VALUE_LENGTH {
            Err(StorageError::ValueTooLarge(value_len))
        } else if key_len > MAX_VALUE_LENGTH {
            // Keys share the per-item limit used for values.
            Err(StorageError::ValueTooLarge(key_len))
        } else if value_len + key_len > MAX_PAIR_LENGTH {
            Err(StorageError::ValueTooLarge(value_len + key_len))
        } else {
            Ok(())
        }
    }

    /// Inserts `key` -> `value`, returning the previous value if there was one.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` if the encoded value, key, or pair exceeds its limit.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<'_, V>>> {
        let key: &K::SelfType<'k> = key.borrow();
        let value: &V::SelfType<'v> = value.borrow();
        let value_len = V::as_bytes(value).as_ref().len();
        let key_len = K::as_bytes(key).as_ref().len();
        Self::check_pair_lengths(key_len, value_len)?;
        self.tree.insert(key, value)
    }

    /// Removes `key`, returning the removed value if there was one.
    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<'_, V>>>
    where
        K: 'a,
    {
        let key: &K::SelfType<'a> = key.borrow();
        self.tree.remove(key)
    }
}
impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
    /// Reserves `value_length` bytes for `key` and returns a guard for writing the value in
    /// place.
    ///
    /// # Errors
    /// `StorageError::ValueTooLarge` if the reserved length, the encoded key, or their sum
    /// exceeds its limit (checked in that order).
    pub fn insert_reserve<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
        value_length: usize,
    ) -> Result<AccessGuardMutInPlace<'_, V>> {
        let key: &K::SelfType<'a> = key.borrow();
        let key_len = K::as_bytes(key).as_ref().len();
        let oversized = if value_length > MAX_VALUE_LENGTH {
            Some(value_length)
        } else if key_len > MAX_VALUE_LENGTH {
            Some(key_len)
        } else if value_length + key_len > MAX_PAIR_LENGTH {
            Some(value_length + key_len)
        } else {
            None
        };
        if let Some(len) = oversized {
            return Err(StorageError::ValueTooLarge(len));
        }
        self.tree.insert_reserve(key, value_length)
    }
}
/// Stages the table's final root and entry count in the owning namespace when the handle
/// goes away. An empty tree (no root) is recorded with length 0.
impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
    fn drop(&mut self) {
        let length = self.tree.get_root().map_or(0, |header| header.length);
        self.namespace.close_table(&self.name, &self.tree, length);
    }
}
/// Namespace of internal (system) tables for one write transaction.
struct SystemNamespace<'db> {
    table_tree: TableTreeMut<'db>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    transaction_guard: Arc<TransactionGuard>,
}

impl<'db> SystemNamespace<'db> {
    /// Builds the namespace over `root_page`. The system table tree never tracks allocations
    /// (`Ignore` policy) and collects freed pages in a namespace-wide list.
    fn new(
        root_page: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Self {
        let freed_pages: Arc<Mutex<Vec<PageNumber>>> = Arc::new(Mutex::new(Vec::new()));
        let allocation_policy = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
        let table_tree = TableTreeMut::new(
            root_page,
            Arc::clone(&guard),
            mem,
            Arc::clone(&freed_pages),
            allocation_policy,
        );
        Self {
            table_tree,
            freed_pages,
            transaction_guard: guard,
        }
    }

    /// Shared handle to the list of system pages freed in this transaction.
    fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
        Arc::clone(&self.freed_pages)
    }

    /// Opens (creating if needed) the system table described by `definition`.
    ///
    /// This always marks `transaction` dirty, even when the caller only reads.
    ///
    /// # Errors
    /// Table-tree failures are converted with `into_storage_error_or_corrupted`.
    fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
        &'s mut self,
        transaction: &'txn WriteTransaction,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<SystemTable<'db, 's, K, V>> {
        let table_name = definition.name();
        let lookup = self
            .table_tree
            .get_or_create_table::<K, V>(table_name, TableType::Normal);
        let (root, _length) = lookup.map_err(|e| {
            e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
        })?;
        transaction.dirty.store(true, Ordering::Release);
        let freed_pages = Arc::clone(&self.freed_pages);
        let guard = Arc::clone(&self.transaction_guard);
        let mem = Arc::clone(&transaction.mem);
        Ok(SystemTable::new(table_name, root, freed_pages, guard, mem, self))
    }

    /// Stages `table`'s current root and `length` under `name` in the system table tree.
    fn close_table<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
        length: u64,
    ) {
        let root = table.get_root();
        self.table_tree.stage_update_table_root(name, root, length);
    }
}
/// Namespace of user tables for one write transaction.
///
/// Records which tables are currently open, and the call site that opened each one. A second
/// open, rename or delete of an open table is rejected with `TableError::TableAlreadyOpen`.
struct TableNamespace<'db> {
// Table name -> source location of the call that opened it.
open_tables: HashMap<String, &'static panic::Location<'static>>,
// Allocation-tracking policy shared with the table tree and every opened table. It starts as
// tracking and `set_dirty` switches it to `Ignore` when no savepoint exists.
allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
// Pages freed by user-table operations in this transaction; shared with every opened table.
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
// Tree that creates, renames and deletes user tables and records their roots.
table_tree: TableTreeMut<'db>,
}
impl TableNamespace<'_> {
fn new(
root_page: Option<BtreeHeader>,
guard: Arc<TransactionGuard>,
mem: Arc<TransactionalMemory>,
) -> Self {
let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
let freed_pages = Arc::new(Mutex::new(vec![]));
let table_tree = TableTreeMut::new(
root_page,
guard,
mem,
freed_pages.clone(),
allocated.clone(),
);
Self {
open_tables: Default::default(),
table_tree,
freed_pages,
allocated_pages: allocated,
}
}
/// Marks `transaction` dirty and, if no savepoint exists, stops tracking page allocations.
fn set_dirty(&mut self, transaction: &WriteTransaction) {
transaction.dirty.store(true, Ordering::Release);
// `ephemeral_savepoint` refuses to run once the transaction is dirty, so if no savepoint
// exists now none can be created later in this transaction. If the tracker returns an
// error, assume a savepoint exists and keep tracking.
if !transaction
.transaction_tracker
.any_savepoint_exists()
.unwrap_or(true)
{
*self.allocated_pages.lock() = PageTrackerPolicy::Ignore;
}
}
/// Replaces the user table tree root. Callers must ensure no table is open.
fn set_root(&mut self, root: Option<BtreeHeader>) {
debug_assert!(self.open_tables.is_empty());
self.table_tree.set_root(root);
}
// `#[track_caller]` makes `Location::caller()` below report the user's call site, provided
// every function on the path from the public API is also `#[track_caller]`.
#[track_caller]
fn inner_open<K: Key + 'static, V: Value + 'static>(
&mut self,
name: &str,
table_type: TableType,
) -> Result<(Option<BtreeHeader>, u64), TableError> {
if let Some(location) = self.open_tables.get(name) {
return Err(TableError::TableAlreadyOpen(name.to_string(), location));
}
let root = self
.table_tree
.get_or_create_table::<K, V>(name, table_type)?;
self.open_tables
.insert(name.to_string(), panic::Location::caller());
Ok(root)
}
/// Opens (creating if needed) a multimap table and marks the transaction dirty.
#[track_caller]
pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
&mut self,
transaction: &'txn WriteTransaction,
definition: MultimapTableDefinition<K, V>,
) -> Result<MultimapTable<'txn, K, V>, TableError> {
#[cfg(feature = "logging")]
debug!("Opening multimap table: {definition}");
let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
self.set_dirty(transaction);
Ok(MultimapTable::new(
definition.name(),
root,
length,
self.freed_pages.clone(),
self.allocated_pages.clone(),
transaction.mem.clone(),
transaction,
))
}
/// Opens (creating if needed) a regular table and marks the transaction dirty.
#[track_caller]
pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
&mut self,
transaction: &'txn WriteTransaction,
definition: TableDefinition<K, V>,
) -> Result<Table<'txn, K, V>, TableError> {
#[cfg(feature = "logging")]
debug!("Opening table: {definition}");
let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
self.set_dirty(transaction);
Ok(Table::new(
definition.name(),
root,
self.freed_pages.clone(),
self.allocated_pages.clone(),
transaction.mem.clone(),
transaction,
))
}
/// Renames a table after checking that it is not currently open.
#[track_caller]
fn inner_rename(
&mut self,
name: &str,
new_name: &str,
table_type: TableType,
) -> Result<(), TableError> {
if let Some(location) = self.open_tables.get(name) {
return Err(TableError::TableAlreadyOpen(name.to_string(), location));
}
self.table_tree.rename_table(name, new_name, table_type)
}
// Note: the transaction is marked dirty before the open-table check, so a failed rename
// still leaves it dirty.
#[track_caller]
fn rename_table(
&mut self,
transaction: &WriteTransaction,
name: &str,
new_name: &str,
) -> Result<(), TableError> {
#[cfg(feature = "logging")]
debug!("Renaming table: {name} to {new_name}");
self.set_dirty(transaction);
self.inner_rename(name, new_name, TableType::Normal)
}
// Note: as with `rename_table`, the transaction is marked dirty even if the rename fails.
#[track_caller]
fn rename_multimap_table(
&mut self,
transaction: &WriteTransaction,
name: &str,
new_name: &str,
) -> Result<(), TableError> {
#[cfg(feature = "logging")]
debug!("Renaming multimap table: {name} to {new_name}");
self.set_dirty(transaction);
self.inner_rename(name, new_name, TableType::Multimap)
}
/// Deletes a table after checking that it is not currently open. The returned bool comes
/// from `TableTreeMut::delete_table`.
#[track_caller]
fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
if let Some(location) = self.open_tables.get(name) {
return Err(TableError::TableAlreadyOpen(name.to_string(), location));
}
self.table_tree.delete_table(name, table_type)
}
// Note: the transaction is marked dirty even if the delete fails.
#[track_caller]
fn delete_table(
&mut self,
transaction: &WriteTransaction,
name: &str,
) -> Result<bool, TableError> {
#[cfg(feature = "logging")]
debug!("Deleting table: {name}");
self.set_dirty(transaction);
self.inner_delete(name, TableType::Normal)
}
// Note: the transaction is marked dirty even if the delete fails.
#[track_caller]
fn delete_multimap_table(
&mut self,
transaction: &WriteTransaction,
name: &str,
) -> Result<bool, TableError> {
#[cfg(feature = "logging")]
debug!("Deleting multimap table: {name}");
self.set_dirty(transaction);
self.inner_delete(name, TableType::Multimap)
}
/// Forgets `name`'s open-table entry and stages its current root and `length` in the
/// table tree.
pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
&mut self,
name: &str,
table: &BtreeMut<K, V>,
length: u64,
) {
let _ = self.open_tables.remove(name);
self.table_tree
.stage_update_table_root(name, table.get_root(), length);
}
}
/// A read-write transaction.
///
/// User tables and system tables each sit behind their own `Mutex`. Code that needs both
/// (e.g. `restore_savepoint`) locks `tables` before `system_tables`.
pub struct WriteTransaction {
transaction_tracker: Arc<TransactionTracker>,
mem: Arc<TransactionalMemory>,
transaction_guard: Arc<TransactionGuard>,
transaction_id: TransactionId,
tables: Mutex<TableNamespace<'static>>,
system_tables: Mutex<SystemNamespace<'static>>,
// Initialized to false in `new`; updated outside this part of the file.
completed: bool,
// Set when any table or system table is opened, renamed or deleted, and by
// `restore_savepoint`. Once set, `ephemeral_savepoint` is refused.
dirty: AtomicBool,
// Starts as `Immediate`. Persistent savepoint creation and deletion require `Immediate`.
durability: InternalDurability,
// The following three are set by their setters. NOTE(review): presumably consumed by the
// commit path, which is not shown here.
two_phase_commit: bool,
shrink_policy: ShrinkPolicy,
quick_repair: bool,
// Persistent savepoints created / deleted by this transaction. While either is non-empty,
// `set_durability` only accepts `Immediate`.
created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
// Initialized to false in `new`; usage not visible here.
blob_writer_active: AtomicBool,
blob_dedup_config: BlobDedupConfig,
// `Some` iff CDC was enabled in `cdc_config` at construction. `record_cdc` appends to it.
pub(crate) cdc_log: Option<Mutex<Vec<CdcEvent>>>,
cdc_config: CdcConfig,
// Passed in at construction; usage not visible here.
history_retention: u64,
}
impl WriteTransaction {
/// Builds a write transaction over the current data and system roots of `mem`.
///
/// The transaction starts clean, with `Immediate` durability and the default shrink policy.
/// A CDC event log is allocated only when `cdc_config.enabled` is set.
///
/// # Errors
/// Propagates the failure if `guard` cannot report its transaction id.
pub(crate) fn new(
    guard: TransactionGuard,
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    blob_dedup_config: BlobDedupConfig,
    cdc_config: CdcConfig,
    history_retention: u64,
) -> Result<Self> {
    let transaction_id = guard.id()?;
    let shared_guard = Arc::new(guard);
    // Read both roots up front; each namespace takes one.
    let data_root = mem.get_data_root();
    let system_root = mem.get_system_root();
    let tables = TableNamespace::new(data_root, Arc::clone(&shared_guard), Arc::clone(&mem));
    let system_tables =
        SystemNamespace::new(system_root, Arc::clone(&shared_guard), Arc::clone(&mem));
    let cdc_log = cdc_config.enabled.then(|| Mutex::new(Vec::new()));
    Ok(Self {
        transaction_tracker,
        mem,
        transaction_guard: shared_guard,
        transaction_id,
        tables: Mutex::new(tables),
        system_tables: Mutex::new(system_tables),
        completed: false,
        dirty: AtomicBool::new(false),
        durability: InternalDurability::Immediate,
        two_phase_commit: false,
        quick_repair: false,
        shrink_policy: ShrinkPolicy::Default,
        created_persistent_savepoints: Mutex::new(Default::default()),
        deleted_persistent_savepoints: Mutex::new(Vec::new()),
        blob_writer_active: AtomicBool::new(false),
        blob_dedup_config,
        cdc_log,
        cdc_config,
        history_retention,
    })
}
pub(crate) fn record_cdc(&self, event: CdcEvent) {
if let Some(ref log) = self.cdc_log {
log.lock().push(event);
}
}
/// Overwrites the shrink policy stored on this transaction.
pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
    self.shrink_policy = shrink_policy;
}
/// Reports whether the data-freed or system-freed table is non-empty (has a root).
///
/// Opening the system tables marks this transaction dirty.
pub(crate) fn pending_free_pages(&self) -> Result<bool> {
    let mut system_tables = self.system_tables.lock();
    for definition in [DATA_FREED_TABLE, SYSTEM_FREED_TABLE] {
        let table = system_tables.open_system_table(self, definition)?;
        if table.tree.get_root().is_some() {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Debug aid: runs `mem.debug_check_allocator_consistency()` and prints every allocated page,
/// grouped by owner:
/// - the user table tree
/// - the system table tree
/// - the data-freed and system-freed tables
/// - the in-memory freed lists
///
/// Any allocated page no owner accounts for is printed under "Leaked pages".
///
/// Opening the freed tables marks this transaction dirty.
///
/// # Panics
/// Panics if a listed page is not in the remaining allocated set, i.e. it was claimed twice or
/// never allocated (the `assert!(all_allocated.remove(..))` calls). Also panics if any page
/// visit or table read fails.
#[cfg(all(debug_assertions, feature = "std"))]
pub fn print_allocated_page_debug(&self) {
let mut all_allocated: HashSet<PageNumber> =
HashSet::from_iter(self.mem.all_allocated_pages());
self.mem.debug_check_allocator_consistency();
let mut table_pages = vec![];
self.tables
.lock()
.table_tree
.visit_all_pages(|path| {
table_pages.push(path.page_number());
Ok(())
})
.unwrap();
println!("Tables");
for p in table_pages {
assert!(all_allocated.remove(&p));
println!("{p:?}");
}
let mut system_table_pages = vec![];
self.system_tables
.lock()
.table_tree
.visit_all_pages(|path| {
system_table_pages.push(path.page_number());
Ok(())
})
.unwrap();
println!("System tables");
for p in system_table_pages {
assert!(all_allocated.remove(&p));
println!("{p:?}");
}
// Pages recorded in the persisted pending-free tables.
{
println!("Pending free (in data freed table)");
let mut system_tables = self.system_tables.lock();
let data_freed = system_tables
.open_system_table(self, DATA_FREED_TABLE)
.unwrap();
for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
let (_, entry) = entry.unwrap();
let value = entry.value();
for i in 0..value.len() {
let p = value.get(i);
assert!(all_allocated.remove(&p));
println!("{p:?}");
}
}
}
{
println!("Pending free (in system freed table)");
let mut system_tables = self.system_tables.lock();
let system_freed = system_tables
.open_system_table(self, SYSTEM_FREED_TABLE)
.unwrap();
for entry in system_freed
.range::<TransactionIdWithPagination>(..)
.unwrap()
{
let (_, entry) = entry.unwrap();
let value = entry.value();
for i in 0..value.len() {
let p = value.get(i);
assert!(all_allocated.remove(&p));
println!("{p:?}");
}
}
}
// Pages freed in this transaction but not yet written to a table.
{
let tables = self.tables.lock();
let pages = tables.freed_pages.lock();
if !pages.is_empty() {
println!("Pages in in-memory data freed_pages");
for p in pages.iter() {
println!("{p:?}");
assert!(all_allocated.remove(p));
}
}
}
{
let system_tables = self.system_tables.lock();
let pages = system_tables.freed_pages.lock();
if !pages.is_empty() {
println!("Pages in in-memory system freed_pages");
for p in pages.iter() {
println!("{p:?}");
assert!(all_allocated.remove(p));
}
}
}
// Whatever remains is allocated but owned by nothing above.
if !all_allocated.is_empty() {
println!("Leaked pages");
for p in all_allocated {
println!("{p:?}");
}
}
}
/// Creates a savepoint that is stored in the savepoint table, and returns its numeric id.
///
/// The id after this savepoint's id is written as the next persistent savepoint id.
///
/// # Errors
/// `SavepointError::InvalidSavepoint` if durability is not `Immediate` or the transaction is
/// already dirty (see `ephemeral_savepoint`). Storage errors are propagated.
pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
    if self.durability != InternalDurability::Immediate {
        return Err(SavepointError::InvalidSavepoint);
    }
    let mut savepoint = self.ephemeral_savepoint()?;
    let mut system_tables = self.system_tables.lock();
    {
        // Scoped so this handle is closed before the savepoint table is opened.
        let mut next_ids = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_ids.insert((), savepoint.get_id().next()?)?;
    }
    let mut persisted = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    persisted.insert(
        savepoint.get_id(),
        SerializedSavepoint::from_savepoint(&savepoint),
    )?;
    savepoint.set_persistent();
    self.created_persistent_savepoints
        .lock()
        .insert(savepoint.get_id());
    Ok(savepoint.get_id().0)
}
pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
self.transaction_guard.clone()
}
/// Reads the stored next persistent savepoint id, or `None` if none has been recorded yet.
///
/// Opening the system table marks this transaction dirty.
pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
    let mut system_tables = self.system_tables.lock();
    let table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
    let stored = table.get(())?;
    Ok(stored.map(|guard| guard.value()))
}
pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
let mut system_tables = self.system_tables.lock();
let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
let value = table.get(SavepointId(id))?;
value
.map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
.ok_or(SavepointError::InvalidSavepoint)
}
pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
if self.durability != InternalDurability::Immediate {
return Err(SavepointError::InvalidSavepoint);
}
let mut system_tables = self.system_tables.lock();
let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
let savepoint = table.remove(SavepointId(id))?;
if let Some(serialized) = savepoint {
let savepoint = serialized
.value()
.to_savepoint(self.transaction_tracker.clone());
self.deleted_persistent_savepoints
.lock()
.push((savepoint.get_id(), savepoint.get_transaction_id()));
Ok(true)
} else {
Ok(false)
}
}
/// Returns the ids of all stored persistent savepoints, in the table's key order.
///
/// The ids are collected eagerly, so the system-table lock is released before returning.
pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
    let mut system_tables = self.system_tables.lock();
    let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    let ids = table
        .range::<SavepointId>(..)?
        .map(|entry| entry.map(|(key, _)| key.value().0))
        .collect::<Result<Vec<u64>>>()?;
    Ok(ids.into_iter())
}
fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
let id = self
.transaction_tracker
.register_read_transaction(&self.mem)?;
Ok(TransactionGuard::new_read(
id,
self.transaction_tracker.clone(),
))
}
/// Pins a read transaction, leaking its guard so it stays registered, and allocates a
/// savepoint id for it.
///
/// Returns the savepoint id and the pinned transaction id.
fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
    let transaction_id = self.allocate_read_transaction()?.leak()?;
    let savepoint_id = self
        .transaction_tracker
        .allocate_savepoint(transaction_id)?;
    Ok((savepoint_id, transaction_id))
}
pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
if self.dirty.load(Ordering::Acquire) {
return Err(SavepointError::InvalidSavepoint);
}
let (id, transaction_id) = self.allocate_savepoint()?;
#[cfg(feature = "logging")]
debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");
let root = self.mem.get_data_root();
let savepoint = Savepoint::new_ephemeral(
&self.mem,
self.transaction_tracker.clone(),
id,
transaction_id,
root,
);
Ok(savepoint)
}
/// Rolls this transaction's user data back to `savepoint`.
///
/// Steps:
/// 1. Reset the user table tree to the savepoint's root.
/// 1a. Drop data-freed entries newer than the savepoint.
/// 2. Queue every data page allocated after the savepoint for freeing.
/// 3. Invalidate newer savepoints, deleting newer persistent ones.
///
/// # Errors
/// `SavepointError::InvalidSavepoint` if the tracker no longer considers `savepoint` valid.
/// Storage errors from the system tables are propagated.
/// NOTE(review): step 3 calls `delete_persistent_savepoint`, which fails unless durability is
/// `Immediate`. That failure happens after steps 1-2 have already been applied.
///
/// # Panics
/// Panics if `savepoint` was not created against this transaction tracker (address mismatch),
/// or if its version differs from `mem`'s.
pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
assert_eq!(
core::ptr::from_ref(self.transaction_tracker.as_ref()),
savepoint.db_address()
);
if !self
.transaction_tracker
.is_valid_savepoint(savepoint.get_id())?
{
return Err(SavepointError::InvalidSavepoint);
}
#[cfg(feature = "logging")]
debug!(
"Beginning savepoint restore (id={:?}) in transaction id={:?}",
savepoint.get_id(),
self.transaction_id
);
assert_eq!(self.mem.get_version(), savepoint.get_version());
self.dirty.store(true, Ordering::Release);
// 1) Restore the user table tree root.
{
self.tables.lock().set_root(savepoint.get_user_root());
}
// First transaction id after the savepoint; everything keyed at or above it is rolled back.
let txn_id = savepoint.get_transaction_id().next()?.raw_id();
// 1a) Purge data-freed entries from transactions after the savepoint. SYSTEM_FREED_TABLE
// is not touched here.
{
let lower = TransactionIdWithPagination {
transaction_id: txn_id,
pagination_id: 0,
};
let mut system_tables = self.system_tables.lock();
let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
entry?;
}
}
// 2) Queue every data page allocated after the savepoint into the in-memory freed list.
// Lock order: tables, then its freed_pages, then system_tables.
{
let tables = self.tables.lock();
let mut data_freed_pages = tables.freed_pages.lock();
let mut system_tables = self.system_tables.lock();
let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
let lower = TransactionIdWithPagination {
transaction_id: txn_id,
pagination_id: 0,
};
for entry in data_allocated.range(lower..)? {
let (_, value) = entry?;
for i in 0..value.value().len() {
data_freed_pages.push(value.value().get(i));
}
}
}
// 3) Invalidate newer savepoints so they cannot be restored afterwards.
self.transaction_tracker
.invalidate_savepoints_after(savepoint.get_id())?;
for persistent_savepoint in self.list_persistent_savepoints()? {
if persistent_savepoint > savepoint.get_id().0 {
self.delete_persistent_savepoint(persistent_savepoint)?;
}
}
Ok(())
}
/// Sets the durability level for this transaction.
///
/// # Errors
/// `SetDurabilityError::PersistentSavepointModified` if this transaction has created or
/// deleted a persistent savepoint and `durability` is not `Immediate`.
pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
    let requested = match durability {
        Durability::None => InternalDurability::None,
        Durability::Immediate => InternalDurability::Immediate,
    };
    let savepoints_touched = !self.created_persistent_savepoints.lock().is_empty()
        || !self.deleted_persistent_savepoints.lock().is_empty();
    if savepoints_touched && requested != InternalDurability::Immediate {
        return Err(SetDurabilityError::PersistentSavepointModified);
    }
    self.durability = requested;
    Ok(())
}
/// Turns two-phase commit on or off for this transaction.
///
/// The flag is passed to the storage layer's durable commit. It is forced on at commit
/// time when quick repair is enabled.
pub fn set_two_phase_commit(&mut self, enabled: bool) {
    let two_phase = enabled;
    self.two_phase_commit = two_phase;
}
/// Turns quick-repair mode on or off for this transaction.
///
/// At commit time, enabling it also forces two-phase commit. In a durable commit it makes
/// the allocator state be saved and skips the history snapshot.
pub fn set_quick_repair(&mut self, enabled: bool) {
    let quick_repair = enabled;
    self.quick_repair = quick_repair;
}
/// Opens the table described by `definition` for reading and writing in this transaction.
///
/// The work is delegated to the transaction's table namespace while its lock is held.
#[track_caller]
pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
    &'txn self,
    definition: TableDefinition<K, V>,
) -> Result<Table<'txn, K, V>, TableError> {
    let mut namespace = self.tables.lock();
    namespace.open_table(self, definition)
}
/// Opens a TTL table by opening its inner table definition and wrapping the result.
///
/// Errors from opening the inner table are returned unchanged.
#[cfg(feature = "std")]
#[track_caller]
pub fn open_ttl_table<K: Key + 'static, V: Value + 'static>(
    &self,
    definition: crate::ttl_table::TtlTableDefinition<K, V>,
) -> Result<crate::ttl_table::TtlTable<'_, K, V>, TableError> {
    self.open_table(definition.inner_def())
        .map(crate::ttl_table::TtlTable::new)
}
/// Opens the IVF-PQ index described by `definition` within this transaction.
///
/// Errors from opening the index are wrapped in [`TableError::Storage`].
#[track_caller]
pub fn open_ivfpq_index(
    &self,
    definition: &crate::ivfpq::config::IvfPqIndexDefinition,
) -> Result<crate::ivfpq::index::IvfPqIndex<'_, Self>, TableError> {
    use crate::ivfpq::index::IvfPqIndex;
    let opened = IvfPqIndex::open(self, definition);
    opened.map_err(TableError::Storage)
}
/// Opens the fractal index described by `definition` within this transaction.
///
/// Like the other `open_*` methods on this type, it is marked `#[track_caller]`, so a
/// `#[track_caller]`-aware panic raised while opening is attributed to the caller's location.
///
/// # Errors
/// Errors from opening the index are wrapped in [`TableError::Storage`].
#[track_caller]
pub fn open_fractal_index(
    &self,
    definition: &crate::fractal::config::FractalIndexDefinition,
) -> Result<crate::fractal::index::FractalIndex<'_, Self>, TableError> {
    crate::fractal::index::FractalIndex::open(self, definition).map_err(TableError::Storage)
}
/// Opens the multimap table described by `definition` for reading and writing in this
/// transaction.
///
/// The work is delegated to the transaction's table namespace while its lock is held.
#[track_caller]
pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
    &'txn self,
    definition: MultimapTableDefinition<K, V>,
) -> Result<MultimapTable<'txn, K, V>, TableError> {
    let mut namespace = self.tables.lock();
    namespace.open_multimap_table(self, definition)
}
/// Hands a table's tree and its entry count back to the table namespace under the name `name`.
pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
    &self,
    name: &str,
    table: &BtreeMut<K, V>,
    length: u64,
) {
    let mut namespace = self.tables.lock();
    namespace.close_table(name, table, length);
}
/// Renames the table identified by `definition` to the name carried by `new_name`.
pub fn rename_table(
    &self,
    definition: impl TableHandle,
    new_name: impl TableHandle,
) -> Result<(), TableError> {
    // Copy the current name out, then release the handle before the namespace lock is
    // taken (presumably so an open table can be passed in and closed first).
    let old_name = String::from(definition.name());
    drop(definition);
    let mut namespace = self.tables.lock();
    namespace.rename_table(self, &old_name, new_name.name())
}
/// Renames the multimap table identified by `definition` to the name carried by `new_name`.
pub fn rename_multimap_table(
    &self,
    definition: impl MultimapTableHandle,
    new_name: impl MultimapTableHandle,
) -> Result<(), TableError> {
    // Copy the current name out, then release the handle before the namespace lock is
    // taken (presumably so an open table can be passed in and closed first).
    let old_name = String::from(definition.name());
    drop(definition);
    let mut namespace = self.tables.lock();
    namespace.rename_multimap_table(self, &old_name, new_name.name())
}
/// Deletes the table identified by `definition`, returning the namespace's boolean result.
pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
    // Release the handle before the namespace lock is taken.
    let table_name = String::from(definition.name());
    drop(definition);
    let mut namespace = self.tables.lock();
    namespace.delete_table(self, &table_name)
}
/// Deletes the multimap table identified by `definition`, returning the namespace's boolean
/// result.
pub fn delete_multimap_table(
    &self,
    definition: impl MultimapTableHandle,
) -> Result<bool, TableError> {
    // Release the handle before the namespace lock is taken.
    let table_name = String::from(definition.name());
    drop(definition);
    let mut namespace = self.tables.lock();
    namespace.delete_multimap_table(self, &table_name)
}
/// Lists every normal (non-multimap) table visible to this transaction.
pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
    let names = self.tables.lock().table_tree.list_tables(TableType::Normal)?;
    Ok(names.into_iter().map(UntypedTableHandle::new))
}
/// Lists every multimap table visible to this transaction.
pub fn list_multimap_tables(
    &self,
) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
    let names = self.tables.lock().table_tree.list_tables(TableType::Multimap)?;
    Ok(names.into_iter().map(UntypedMultimapTableHandle::new))
}
/// Stores `data` as a new blob in a single call and returns its [`BlobId`].
///
/// What this function does:
/// - fails with `StorageError::BlobWriterActive` while a streaming `blob_writer` is active;
/// - on first use (region offset 0), places the blob region at the current file length;
/// - derives the id from the next sequence number and an xxh3-64 hash of the first
///   4096 bytes (or fewer) of `data`;
/// - when dedup is enabled and `data.len() >= min_size`, looks up the SHA-256 of `data` in
///   `BLOB_DEDUP_INDEX`. On a hit the new blob reuses the existing bytes and nothing is
///   written to the region. Otherwise `data` is appended at the end of the region;
/// - records the metadata, a temporal index entry, an optional causal edge, tags and
///   namespace, and dedup bookkeeping in the blob system tables;
/// - stages the updated blob region state with `set_pending_blob_state`.
///
/// # Errors
/// Returns any storage error from the file-length lookup, the region write, or a system
/// table access. In that case the updated blob state is not staged.
pub fn store_blob(
&self,
data: &[u8],
content_type: ContentType,
label: &str,
opts: StoreOptions,
) -> Result<BlobId> {
if self.blob_writer_active.load(Ordering::Acquire) {
return Err(StorageError::BlobWriterActive);
}
let mut blob_state = self.mem.get_blob_state();
// Lazily place the blob region at the current end of the file.
if blob_state.region_offset == 0 {
let file_len = self.mem.file_len()?;
blob_state.region_offset = file_len;
}
let sequence = blob_state.next_sequence;
blob_state.next_sequence = sequence + 1;
// The id mixes the sequence number with a hash of (at most) the first 4 KiB.
let prefix_len = data.len().min(4096);
let content_prefix_hash = xxh3_hash64(&data[..prefix_len]);
let blob_id = BlobId::new(sequence, content_prefix_hash);
// Full-payload checksum; only used below when the bytes are actually written.
let checksum = xxh3_hash128(data);
let dedup_eligible =
self.blob_dedup_config.enabled && data.len() >= self.blob_dedup_config.min_size;
let sha_key = if dedup_eligible {
let hash: [u8; 32] = Sha256::digest(data).into();
Some(Sha256Key(hash))
} else {
None
};
// NOTE(review): this lookup and the index writes further down run under separate
// `system_tables` lock acquisitions.
let dedup_hit = if let Some(ref sha_key) = sha_key {
let mut system_tables = self.system_tables.lock();
let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
dedup_table.get(sha_key)?.map(|g| g.value())
} else {
None
};
let blob_ref = if let Some(existing) = dedup_hit {
// Dedup hit: reference the existing bytes instead of writing a second copy.
BlobRef {
offset: existing.offset,
length: existing.length,
checksum: existing.checksum,
ref_count: 1,
content_type: content_type.as_byte(),
compression: 0,
}
} else {
// Append the payload at the current end of the blob region.
let blob_offset = blob_state.region_length;
let file_offset = blob_state.region_offset + blob_offset;
self.mem.blob_write(file_offset, data)?;
blob_state.region_length += data.len() as u64;
BlobRef {
offset: blob_offset,
length: data.len() as u64,
checksum,
ref_count: 1,
content_type: content_type.as_byte(),
compression: 0,
}
};
let hlc = HybridLogicalClock::from_raw(blob_state.hlc_state).advance();
blob_state.hlc_state = hlc.to_raw();
// Nanoseconds since the Unix epoch. This is 0 without `std`, or if the clock reads
// earlier than the epoch.
#[allow(clippy::cast_possible_truncation)]
let wall_clock_ns = {
#[cfg(feature = "std")]
{
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
}
#[cfg(not(feature = "std"))]
{
0u64
}
};
let causal_parent = opts.causal_link.as_ref().map(|l| l.parent);
let meta = BlobMeta::new(blob_ref, wall_clock_ns, hlc.to_raw(), causal_parent, label);
// Record the blob in every index. Each system table is dropped before the next is opened.
{
let mut system_tables = self.system_tables.lock();
let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
blob_table.insert(&blob_id, &meta)?;
drop(blob_table);
let temporal_key = TemporalKey::new(wall_clock_ns, hlc, blob_id);
let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
temporal_table.insert(&temporal_key, &())?;
drop(temporal_table);
if let Some(link) = &opts.causal_link {
let edge = CausalEdge::new(blob_id, link.relation, &link.context);
let edge_key = CausalEdgeKey::new(link.parent, blob_id);
let mut causal_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
causal_table.insert(&edge_key, &edge)?;
drop(causal_table);
}
Self::index_tags_and_namespace(
&mut system_tables,
self,
blob_id,
&opts.tags,
opts.namespace.as_deref(),
)?;
// Dedup bookkeeping: bump the shared entry on a hit, create it on a miss, and
// remember which SHA-256 key this blob maps to.
if let Some(sha_key) = sha_key {
if let Some(existing) = dedup_hit {
let mut dedup_table =
system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
let updated = DedupVal {
ref_count: existing.ref_count + 1,
..existing
};
dedup_table.insert(&sha_key, &updated)?;
drop(dedup_table);
} else {
let mut dedup_table =
system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
let entry = DedupVal {
offset: blob_ref.offset,
length: blob_ref.length,
checksum: blob_ref.checksum,
ref_count: 1,
};
dedup_table.insert(&sha_key, &entry)?;
drop(dedup_table);
}
let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
dedup_map.insert(&blob_id, &sha_key)?;
drop(dedup_map);
}
}
// Only reached when every step above succeeded.
self.mem.set_pending_blob_state(blob_state);
self.dirty.store(true, Ordering::Release);
Ok(blob_id)
}
/// Starts a streaming blob write and returns a [`BlobWriter`] for it.
///
/// Only one streaming writer may be active per transaction. This method claims the
/// `blob_writer_active` flag, reserves the blob's sequence number immediately, and tells the
/// writer to place its bytes at the current end of the blob region.
///
/// # Errors
/// Returns [`StorageError::BlobWriterActive`] if another writer already holds the flag. It
/// also returns a storage error if the file length cannot be read when the blob region is
/// first placed; on that path the flag is released again, so later blob writes in this
/// transaction are not blocked permanently.
pub fn blob_writer(
    &self,
    content_type: ContentType,
    label: &str,
    opts: StoreOptions,
) -> Result<BlobWriter<'_>> {
    if self
        .blob_writer_active
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        return Err(StorageError::BlobWriterActive);
    }
    let mut blob_state = self.mem.get_blob_state();
    if blob_state.region_offset == 0 {
        // The blob region is placed lazily at the current end of the file. If that lookup
        // fails, release the flag claimed above. Otherwise it would stay set and every later
        // `store_blob` / `blob_writer` call would fail with `BlobWriterActive`.
        let file_len = self.mem.file_len().inspect_err(|_| {
            self.blob_writer_active.store(false, Ordering::Release);
        })?;
        blob_state.region_offset = file_len;
    }
    // Reserve this blob's sequence number and stage the updated state.
    let sequence = blob_state.next_sequence;
    blob_state.next_sequence = sequence + 1;
    let blob_region_start = blob_state.region_length;
    let blob_file_offset = blob_state.region_offset + blob_region_start;
    self.mem.set_pending_blob_state(blob_state);
    Ok(BlobWriter::new(
        self,
        sequence,
        content_type,
        label,
        opts,
        blob_file_offset,
        blob_region_start,
        self.blob_dedup_config.enabled,
    ))
}
/// Writes `data` at file offset `file_offset` through the storage layer's blob write.
///
/// No blob metadata or region bookkeeping is updated here.
pub(crate) fn blob_write_raw(&self, file_offset: u64, data: &[u8]) -> Result {
    let storage = &self.mem;
    storage.blob_write(file_offset, data)
}
/// Completes a streaming blob write by recording its metadata and indexes.
///
/// Presumably called by [`BlobWriter`] once all bytes have been written (TODO confirm
/// against the writer). This function:
/// - advances the HLC and stores the new value in both the blob state and `meta.hlc`;
/// - sets the region length to `meta.blob_ref.offset + bytes_written`;
/// - inserts into `BLOB_TABLE`, `BLOB_TEMPORAL_INDEX`, the optional causal edge, the
///   tag/namespace indexes and, when `sha_key` is `Some`, the dedup tables;
/// - stages the updated blob state and marks the transaction dirty.
///
/// It does not clear `blob_writer_active`; presumably the caller does that.
///
/// NOTE(review): when `sha_key` is already present in `BLOB_DEDUP_INDEX`, the existing
/// entry's `ref_count` is incremented, but `meta` still points at the bytes this writer
/// wrote (`meta.blob_ref.offset`), not at `existing.offset`. Unless `BlobWriter` has
/// already redirected `meta`, the dedup entry and this blob reference different bytes —
/// verify.
///
/// # Errors
/// Returns any storage error from a system table access. In that case the blob state is
/// not staged.
pub(crate) fn finalize_blob_writer(
&self,
blob_id: BlobId,
mut meta: BlobMeta,
bytes_written: u64,
opts: StoreOptions,
sha_key: Option<Sha256Key>,
) -> Result {
let mut blob_state = self.mem.get_blob_state();
let hlc = HybridLogicalClock::from_raw(blob_state.hlc_state).advance();
blob_state.hlc_state = hlc.to_raw();
meta.hlc = hlc.to_raw();
// The region now ends right after this blob's bytes.
blob_state.region_length = meta.blob_ref.offset + bytes_written;
{
let mut system_tables = self.system_tables.lock();
let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
blob_table.insert(&blob_id, &meta)?;
drop(blob_table);
let temporal_key = TemporalKey::new(meta.wall_clock_ns, hlc, blob_id);
let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
temporal_table.insert(&temporal_key, &())?;
drop(temporal_table);
if let Some(link) = &opts.causal_link {
let edge = CausalEdge::new(blob_id, link.relation, &link.context);
let edge_key = CausalEdgeKey::new(link.parent, blob_id);
let mut causal_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
causal_table.insert(&edge_key, &edge)?;
drop(causal_table);
}
Self::index_tags_and_namespace(
&mut system_tables,
self,
blob_id,
&opts.tags,
opts.namespace.as_deref(),
)?;
if let Some(sha_key) = sha_key {
// Re-read the dedup index here; unlike `store_blob`, no lookup was done earlier.
let existing = {
let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
dedup_table.get(&sha_key)?.map(|g| g.value())
};
if let Some(existing) = existing {
let mut dedup_table =
system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
let updated = DedupVal {
ref_count: existing.ref_count + 1,
..existing
};
dedup_table.insert(&sha_key, &updated)?;
drop(dedup_table);
} else {
let mut dedup_table =
system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
let entry = DedupVal {
offset: meta.blob_ref.offset,
length: meta.blob_ref.length,
checksum: meta.blob_ref.checksum,
ref_count: 1,
};
dedup_table.insert(&sha_key, &entry)?;
drop(dedup_table);
}
let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
dedup_map.insert(&blob_id, &sha_key)?;
drop(dedup_map);
}
}
self.mem.set_pending_blob_state(blob_state);
self.dirty.store(true, Ordering::Release);
Ok(())
}
/// Exposes the flag that marks a streaming [`BlobWriter`] as active on this transaction.
///
/// `blob_writer` claims it with a compare-exchange, and `store_blob` refuses to run while
/// it is set.
pub(crate) fn blob_writer_active(&self) -> &AtomicBool {
    &self.blob_writer_active
}
/// Records `blob_id` in the tag index and, if `namespace` is given, in both namespace tables.
///
/// Only the first `MAX_TAGS_PER_BLOB` entries of `tags` are indexed; any beyond that are
/// ignored without an error.
fn index_tags_and_namespace(
    system_tables: &mut SystemNamespace<'_>,
    txn: &WriteTransaction,
    blob_id: BlobId,
    tags: &[String],
    namespace: Option<&str>,
) -> Result {
    let kept_tags = &tags[..tags.len().min(MAX_TAGS_PER_BLOB)];
    if !kept_tags.is_empty() {
        let mut tag_index = system_tables.open_system_table(txn, BLOB_TAG_INDEX)?;
        for tag in kept_tags {
            tag_index.insert(&TagKey::new(tag, blob_id), &())?;
        }
    }
    let Some(ns) = namespace else {
        return Ok(());
    };
    // blob id -> namespace
    {
        let mut namespace_by_blob = system_tables.open_system_table(txn, BLOB_NAMESPACE)?;
        namespace_by_blob.insert(&blob_id, &NamespaceVal::new(ns))?;
    }
    // (namespace, blob id) -> ()
    {
        let mut blobs_by_namespace =
            system_tables.open_system_table(txn, BLOB_NAMESPACE_INDEX)?;
        blobs_by_namespace.insert(&NamespaceKey::new(ns, blob_id), &())?;
    }
    Ok(())
}
/// Returns the tags indexed for `blob_id`, in tag-index key order.
///
/// The scan covers the entire tag index, because the index is not keyed by blob id first.
pub fn blob_tags(&self, blob_id: &BlobId) -> Result<Vec<String>> {
    let mut system_tables = self.system_tables.lock();
    let tag_index = system_tables.open_system_table(self, BLOB_TAG_INDEX)?;
    let mut found = Vec::new();
    for item in tag_index.range::<TagKey>(..)? {
        let (key_guard, _) = item?;
        let tag_key = key_guard.value();
        if tag_key.blob_id != *blob_id {
            continue;
        }
        found.push(tag_key.tag_str().to_string());
    }
    Ok(found)
}
/// Returns the namespace recorded for `blob_id`, or `None` if it has none (or does not exist).
pub fn blob_namespace(&self, blob_id: &BlobId) -> Result<Option<String>> {
    let mut system_tables = self.system_tables.lock();
    let namespace_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
    let namespace = namespace_table
        .get(blob_id)?
        .map(|guard| guard.value().namespace_str().to_string());
    Ok(namespace)
}
/// Reads a whole blob together with its metadata and verifies the stored checksum.
///
/// Returns `Ok(None)` when no blob with `blob_id` exists.
///
/// # Errors
/// Returns [`StorageError::BlobChecksumMismatch`] when the xxh3-128 hash of the bytes read
/// differs from the checksum recorded in the metadata. Storage errors from the metadata
/// lookup or the read are also returned.
pub fn get_blob(&self, blob_id: &BlobId) -> Result<Option<(Vec<u8>, BlobMeta)>> {
    let Some(meta) = self.get_blob_meta(blob_id)? else {
        return Ok(None);
    };
    let start = self.mem.get_blob_state().region_offset + meta.blob_ref.offset;
    #[allow(clippy::cast_possible_truncation)]
    let byte_len = meta.blob_ref.length as usize;
    let data = self.mem.blob_read(start, byte_len)?;
    let actual = xxh3_hash128(&data);
    if actual == meta.blob_ref.checksum {
        Ok(Some((data, meta)))
    } else {
        Err(StorageError::BlobChecksumMismatch {
            sequence: blob_id.sequence,
            expected: meta.blob_ref.checksum,
            actual,
        })
    }
}
/// Looks up the metadata stored for `blob_id`, returning `None` if no such blob exists.
pub fn get_blob_meta(&self, blob_id: &BlobId) -> Result<Option<BlobMeta>> {
    let mut system_tables = self.system_tables.lock();
    let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
    let meta = blob_table
        .get(blob_id)?
        .map(|guard| guard.value());
    Ok(meta)
}
/// Reads `length` bytes starting `offset` bytes into the blob identified by `blob_id`.
///
/// Unlike [`Self::get_blob`], no checksum is verified, because only part of the blob is read.
///
/// Returns `Ok(None)` if no blob with this id exists. A zero-length read inside the blob
/// (`offset <= blob length`) returns an empty vector.
///
/// # Errors
/// Returns [`StorageError::BlobRangeOutOfBounds`] if `offset + length` exceeds the blob's
/// length. This includes an `offset` past the end of the blob with `length == 0`, matching
/// slice semantics. Storage errors from the metadata lookup or the read are also returned.
pub fn read_blob_range(
    &self,
    blob_id: &BlobId,
    offset: u64,
    length: u64,
) -> Result<Option<Vec<u8>>> {
    let meta = {
        let mut system_tables = self.system_tables.lock();
        let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
        match blob_table.get(blob_id)? {
            Some(g) => g.value(),
            None => return Ok(None),
        }
    };
    // Validate before the zero-length shortcut. Otherwise an offset beyond the blob would be
    // silently accepted whenever `length == 0`. `saturating_add` makes an overflowing
    // range fail the check instead of wrapping.
    let end = offset.saturating_add(length);
    if end > meta.blob_ref.length {
        return Err(StorageError::BlobRangeOutOfBounds {
            blob_length: meta.blob_ref.length,
            requested_offset: offset,
            requested_length: length,
        });
    }
    if length == 0 {
        return Ok(Some(Vec::new()));
    }
    let blob_state = self.mem.get_blob_state();
    let file_offset = blob_state.region_offset + meta.blob_ref.offset + offset;
    #[allow(clippy::cast_possible_truncation)]
    let data = self.mem.blob_read(file_offset, length as usize)?;
    Ok(Some(data))
}
/// Creates a [`BlobReader`] over the stored bytes of `blob_id`.
///
/// Returns `Ok(None)` if no blob with this id exists. The reader is given the blob's
/// absolute file offset and its length, and shares the storage handle through an `Arc`.
pub fn blob_reader(&self, blob_id: &BlobId) -> Result<Option<BlobReader>> {
    let Some(meta) = self.get_blob_meta(blob_id)? else {
        return Ok(None);
    };
    let start = self.mem.get_blob_state().region_offset + meta.blob_ref.offset;
    let reader = BlobReader::new(Arc::clone(&self.mem), start, meta.blob_ref.length);
    Ok(Some(reader))
}
/// Deletes `blob_id` and every index entry that refers to it.
///
/// Returns `Ok(false)` if the blob does not exist and `Ok(true)` once it has been removed
/// from:
/// - `BLOB_TEMPORAL_INDEX`;
/// - `BLOB_CAUSAL_EDGES`, for the edge from its recorded causal parent, plus the legacy
///   `BLOB_CAUSAL_CHILDREN` entry if that entry points at this blob;
/// - `BLOB_TAG_INDEX` (found by scanning the whole index);
/// - `BLOB_NAMESPACE` and `BLOB_NAMESPACE_INDEX`;
/// - the dedup tables. The shared `BLOB_DEDUP_INDEX` entry has its `ref_count` decremented,
///   or is removed when this was the last reference;
/// - `BLOB_TABLE`.
///
/// The blob region state is not touched here, so the bytes stay in the region
/// (presumably until a compaction pass).
///
/// NOTE(review): causal edges in which this blob is the *parent* are not removed.
pub fn delete_blob(&self, blob_id: &BlobId) -> Result<bool> {
let mut system_tables = self.system_tables.lock();
let meta = {
let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
match blob_table.get(blob_id)? {
Some(g) => g.value(),
None => return Ok(false),
}
};
// Rebuild the temporal key from the stored timestamp and HLC.
let temporal_key = TemporalKey::new(
meta.wall_clock_ns,
HybridLogicalClock::from_raw(meta.hlc),
*blob_id,
);
let mut temporal_table = system_tables.open_system_table(self, BLOB_TEMPORAL_INDEX)?;
temporal_table.remove(&temporal_key)?;
drop(temporal_table);
if let Some(parent) = meta.causal_parent {
let mut edges_table = system_tables.open_system_table(self, BLOB_CAUSAL_EDGES)?;
edges_table.remove(&CausalEdgeKey::new(parent, *blob_id))?;
drop(edges_table);
// Only remove the legacy parent -> child entry if it still names this blob.
let mut legacy_table = system_tables.open_system_table(self, BLOB_CAUSAL_CHILDREN)?;
let should_remove_legacy = legacy_table
.get(&parent)?
.is_some_and(|g| g.value() == *blob_id);
if should_remove_legacy {
legacy_table.remove(&parent)?;
}
drop(legacy_table);
}
// Collect matching tag keys first, then remove them once the range iteration is over.
{
let mut tag_table = system_tables.open_system_table(self, BLOB_TAG_INDEX)?;
let mut to_remove = Vec::new();
let range = tag_table.range::<TagKey>(..)?;
for entry in range {
let (key_guard, _) = entry?;
let key = key_guard.value();
if key.blob_id == *blob_id {
to_remove.push(key);
}
}
for key in &to_remove {
tag_table.remove(key)?;
}
drop(tag_table);
}
// The namespace-index key needs the namespace string, so read it before removing.
{
let ns_key = {
let ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
match ns_table.get(blob_id)? {
Some(ns_guard) => {
let ns_str = ns_guard.value().namespace_str().to_string();
Some(NamespaceKey::new(&ns_str, *blob_id))
}
None => None,
}
};
if let Some(ns_key) = ns_key {
let mut ns_table = system_tables.open_system_table(self, BLOB_NAMESPACE)?;
ns_table.remove(blob_id)?;
drop(ns_table);
let mut ns_idx = system_tables.open_system_table(self, BLOB_NAMESPACE_INDEX)?;
ns_idx.remove(&ns_key)?;
drop(ns_idx);
}
}
// Dedup bookkeeping: drop one reference to the shared content entry.
let sha_key = {
let dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
let result = dedup_map.get(blob_id)?.map(|g| g.value());
drop(dedup_map);
result
};
if let Some(sha_key) = sha_key {
let dedup_val = {
let dedup_idx = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
let result = dedup_idx.get(&sha_key)?.map(|g| g.value());
drop(dedup_idx);
result
};
if let Some(val) = dedup_val {
let mut dedup_idx = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
if val.ref_count > 1 {
let updated = DedupVal {
offset: val.offset,
length: val.length,
checksum: val.checksum,
ref_count: val.ref_count - 1,
};
dedup_idx.insert(&sha_key, &updated)?;
} else {
dedup_idx.remove(&sha_key)?;
}
drop(dedup_idx);
}
let mut dedup_map = system_tables.open_system_table(self, BLOB_DEDUP_MAP)?;
dedup_map.remove(blob_id)?;
drop(dedup_map);
}
// Finally remove the primary metadata row.
let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
blob_table.remove(blob_id)?;
drop(blob_table);
self.dirty.store(true, Ordering::Release);
Ok(true)
}
/// Summarises blob-region usage for this transaction's view.
///
/// `live_bytes` counts each distinct physical offset once, so deduplicated blobs that share
/// bytes are not double-counted. `dead_bytes` is the remainder of the region, and
/// `fragmentation_ratio` is `dead_bytes / region_bytes`. An empty region reports all zeros.
pub fn blob_stats(&self) -> Result<BlobStats> {
    let region_bytes = self.mem.get_blob_state().region_length;
    if region_bytes == 0 {
        return Ok(BlobStats {
            blob_count: 0,
            live_bytes: 0,
            region_bytes: 0,
            dead_bytes: 0,
            fragmentation_ratio: 0.0,
        });
    }
    let mut system_tables = self.system_tables.lock();
    let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
    let mut blob_count = 0u64;
    let mut live_bytes = 0u64;
    let mut counted_offsets = crate::compat::HashSet::new();
    for item in blob_table.range::<BlobId>(..)? {
        let (_, meta_guard) = item?;
        let blob_meta = meta_guard.value();
        blob_count += 1;
        let first_at_offset = counted_offsets.insert(blob_meta.blob_ref.offset);
        if first_at_offset {
            live_bytes += blob_meta.blob_ref.length;
        }
    }
    drop(blob_table);
    let dead_bytes = region_bytes.saturating_sub(live_bytes);
    // `region_bytes` is non-zero here (the empty case returned above), so the division is safe.
    #[allow(clippy::cast_precision_loss)]
    let fragmentation_ratio = dead_bytes as f64 / region_bytes as f64;
    Ok(BlobStats {
        blob_count,
        live_bytes,
        region_bytes,
        dead_bytes,
        fragmentation_ratio,
    })
}
/// Copies every live blob extent into a contiguous run and rewrites the offsets that refer to it.
///
/// Live extents are the distinct `(offset, length)` pairs referenced from `BLOB_TABLE`.
/// They are copied in ascending offset order:
/// - `write_from_zero == true`: extents are packed starting at region offset 0, overwriting
///   the existing region in place, and the region length becomes the total live size;
/// - `write_from_zero == false`: extents are appended after the current region end, and the
///   region length grows by the total live size.
///
/// Afterwards, moved offsets are rewritten in both `BLOB_TABLE` and `BLOB_DEDUP_INDEX`, and
/// the new blob state is staged. If no blobs are live, the region length is reset to 0.
///
/// Returns `(number of distinct extents copied, total bytes copied)`.
///
/// NOTE(review): with `write_from_zero`, region bytes are overwritten before the transaction
/// commits — confirm this is acceptable for crash safety in the caller.
pub(crate) fn compact_blobs_pass(&self, write_from_zero: bool) -> Result<(u64, u64)> {
let mut blob_state = self.mem.get_blob_state();
let region_offset = blob_state.region_offset;
let old_region_length = blob_state.region_length;
if old_region_length == 0 {
return Ok((0, 0));
}
// Snapshot all (id, meta) pairs before any table is modified.
let mut live_blobs: Vec<(BlobId, BlobMeta)> = Vec::new();
{
let mut system_tables = self.system_tables.lock();
let blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
let range = blob_table.range::<BlobId>(..)?;
for entry in range {
let (key_guard, value_guard) = entry?;
live_blobs.push((key_guard.value(), value_guard.value()));
}
drop(blob_table);
}
if live_blobs.is_empty() {
blob_state.region_length = 0;
self.mem.set_pending_blob_state(blob_state);
self.dirty.store(true, Ordering::Release);
return Ok((0, 0));
}
// Deduplicated blobs share an offset; copy each physical extent only once.
let mut unique_physical: Vec<(u64, u64)> = Vec::new(); {
let mut seen = crate::compat::HashSet::new();
for (_, meta) in &live_blobs {
if seen.insert(meta.blob_ref.offset) {
unique_physical.push((meta.blob_ref.offset, meta.blob_ref.length));
}
}
}
// Ascending order: when packing from zero, each destination starts at or before its
// source (assuming extents do not overlap), so no extent is overwritten before it is read.
unique_physical.sort_by_key(|&(offset, _)| offset);
let write_base = if write_from_zero {
0
} else {
old_region_length
};
// old region offset -> new region offset
let mut offset_map = crate::compat::HashMap::new();
let mut write_cursor: u64 = 0;
for &(old_offset, length) in &unique_physical {
let src_file_offset = region_offset + old_offset;
let new_offset = write_base + write_cursor;
let dst_file_offset = region_offset + new_offset;
// Each extent is read fully into memory before it is written back.
#[allow(clippy::cast_possible_truncation)]
let data = self.mem.blob_read(src_file_offset, length as usize)?;
self.mem.blob_write(dst_file_offset, &data)?;
offset_map.insert(old_offset, new_offset);
write_cursor += length;
}
let total_live_size = write_cursor;
// Counts every distinct extent copied, including ones whose offset did not change.
let blobs_relocated = unique_physical.len() as u64;
{
let mut system_tables = self.system_tables.lock();
let mut blob_table = system_tables.open_system_table(self, BLOB_TABLE)?;
for (blob_id, meta) in &live_blobs {
if let Some(&new_offset) = offset_map.get(&meta.blob_ref.offset)
&& new_offset != meta.blob_ref.offset
{
let mut updated_meta = meta.clone();
updated_meta.blob_ref.offset = new_offset;
blob_table.insert(blob_id, &updated_meta)?;
}
}
drop(blob_table);
// Dedup entries store their own copy of the offset; rewrite those that moved.
let dedup_table = system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
let mut dedup_entries: Vec<(Sha256Key, DedupVal)> = Vec::new();
let range = dedup_table.range::<Sha256Key>(..)?;
for entry in range {
let (key_guard, value_guard) = entry?;
dedup_entries.push((key_guard.value(), value_guard.value()));
}
drop(dedup_table);
if !dedup_entries.is_empty() {
let mut dedup_table_mut =
system_tables.open_system_table(self, BLOB_DEDUP_INDEX)?;
for (sha_key, val) in &dedup_entries {
if let Some(&new_offset) = offset_map.get(&val.offset)
&& new_offset != val.offset
{
let updated = DedupVal {
offset: new_offset,
length: val.length,
checksum: val.checksum,
ref_count: val.ref_count,
};
dedup_table_mut.insert(sha_key, &updated)?;
}
}
drop(dedup_table_mut);
}
}
if write_from_zero {
blob_state.region_length = total_live_size;
} else {
blob_state.region_length = old_region_length + total_live_size;
}
self.mem.set_pending_blob_state(blob_state);
self.dirty.store(true, Ordering::Release);
Ok((blobs_relocated, total_live_size))
}
/// Commits this transaction, consuming it.
///
/// `completed` is set to `true` before the commit runs, so the flag is set even if the
/// commit itself then returns an error.
pub fn commit(mut self) -> Result<(), CommitError> {
    self.completed = true;
    let outcome = self.commit_inner();
    outcome
}
/// Performs the commit for [`Self::commit`].
///
/// Order of operations:
/// 1. quick repair forces two-phase commit on;
/// 2. with immediate durability, freed pages are processed up to the oldest live read
///    transaction (or this transaction if there is none). On failure, pending root updates
///    are cleared and the error is returned;
/// 3. the user table tree is flushed; its freed and allocated page lists and the CDC log
///    are written to system tables;
/// 4. the durable or non-durable commit path runs;
/// 5. persistent savepoints deleted in this transaction are deallocated in the tracker.
///
/// Debug builds then assert that both freed-page lists are empty.
fn commit_inner(&mut self) -> Result<(), CommitError> {
if self.quick_repair {
self.two_phase_commit = true;
}
if matches!(self.durability, InternalDurability::Immediate) {
let free_until_transaction = self
.transaction_tracker
.oldest_live_read_transaction()?
.map(|x| x.next())
.transpose()?
.unwrap_or(self.transaction_id);
if let Err(err) = self.process_freed_pages(free_until_transaction) {
// Discard pending root updates so the tree is left closed before bailing out.
self.tables.lock().table_tree.clear_root_updates_and_close();
return Err(err.into());
}
}
let (user_root, allocated_pages, data_freed) =
self.tables.lock().table_tree.flush_and_close()?;
self.store_data_freed_pages(data_freed)?;
self.store_allocated_pages(allocated_pages.into_iter().collect())?;
self.flush_cdc_log()?;
#[cfg(feature = "logging")]
debug!(
"Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
);
match self.durability {
InternalDurability::None => self.non_durable_commit(user_root)?,
InternalDurability::Immediate => self.durable_commit(user_root)?,
}
// Only after the commit succeeded are deleted persistent savepoints released.
for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().iter() {
self.transaction_tracker
.deallocate_savepoint(*savepoint, *transaction)?;
}
debug_assert!(
self.system_tables
.lock()
.system_freed_pages()
.lock()
.is_empty()
);
debug_assert!(self.tables.lock().freed_pages.lock().is_empty());
#[cfg(feature = "logging")]
debug!(
"Finished commit of transaction id={:?}",
self.transaction_id
);
Ok(())
}
/// Records `freed_pages` in `DATA_FREED_TABLE` under this transaction's id.
///
/// Pages are written in entries of up to 400 page numbers each, keyed by
/// `(transaction id, pagination id)` with the pagination id counting up from 0. Each entry
/// takes its pages from the tail of `freed_pages`.
fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
    const PAGES_PER_ENTRY: usize = 400;
    let mut system_tables = self.system_tables.lock();
    let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
    let entry_bytes = PageList::required_bytes(PAGES_PER_ENTRY);
    let mut pagination_id = 0;
    while !freed_pages.is_empty() {
        let key = TransactionIdWithPagination {
            transaction_id: self.transaction_id.raw_id(),
            pagination_id,
        };
        let mut page_list = freed_table.insert_reserve(&key, entry_bytes)?;
        page_list.as_mut().clear();
        let split_at = freed_pages.len().saturating_sub(PAGES_PER_ENTRY);
        for page in freed_pages.drain(split_at..) {
            debug_assert!(
                self.mem.is_allocated(page),
                "Page is not allocated: {page:?}"
            );
            debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
            page_list.as_mut().push_back(page);
        }
        pagination_id += 1;
    }
    Ok(())
}
/// Records the pages allocated by this transaction in `DATA_ALLOCATED_TABLE`, then prunes
/// older records.
///
/// Pages are written in entries of up to 400 page numbers each, keyed by
/// `(transaction id, pagination id)`. Afterwards, every record keyed before the oldest
/// savepoint's transaction is removed, or every record if there is no savepoint.
/// `restore_savepoint` only reads records from a savepoint's transaction onward.
fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
    const PAGES_PER_ENTRY: usize = 400;
    let mut system_tables = self.system_tables.lock();
    let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
    let entry_bytes = PageList::required_bytes(PAGES_PER_ENTRY);
    let mut pagination_id = 0;
    while !data_allocated_pages.is_empty() {
        let key = TransactionIdWithPagination {
            transaction_id: self.transaction_id.raw_id(),
            pagination_id,
        };
        let mut page_list = allocated_table.insert_reserve(&key, entry_bytes)?;
        page_list.as_mut().clear();
        let split_at = data_allocated_pages.len().saturating_sub(PAGES_PER_ENTRY);
        for page in data_allocated_pages.drain(split_at..) {
            debug_assert!(
                self.mem.is_allocated(page),
                "Page is not allocated: {page:?}"
            );
            debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
            page_list.as_mut().push_back(page);
        }
        pagination_id += 1;
    }
    let oldest_savepoint_txn = self
        .transaction_tracker
        .oldest_savepoint()?
        .map_or(u64::MAX, |(_, txn)| txn.raw_id());
    let prune_before = TransactionIdWithPagination {
        transaction_id: oldest_savepoint_txn,
        pagination_id: 0,
    };
    for removed in allocated_table.extract_from_if(..prune_before, |_, _| true)? {
        removed?;
    }
    Ok(())
}
/// Moves the buffered CDC events into `CDC_LOG_TABLE` and applies the retention limit.
///
/// Nothing happens when CDC is disabled or the buffer is empty. Each event is keyed by
/// `(this transaction id, position in the buffer)`; positions beyond `u32::MAX` saturate.
/// When `retention_max_txns > 0`, records from transactions at or below
/// `txn_id - retention_max_txns` are removed.
fn flush_cdc_log(&self) -> Result {
    let Some(log) = self.cdc_log.as_ref() else {
        return Ok(());
    };
    let events = {
        let mut pending = log.lock();
        if pending.is_empty() {
            return Ok(());
        }
        core::mem::take(&mut *pending)
    };
    let txn_id = self.transaction_id.raw_id();
    let mut system_tables = self.system_tables.lock();
    let mut cdc_table = system_tables.open_system_table(self, CDC_LOG_TABLE)?;
    for (position, event) in events.iter().enumerate() {
        let sequence = u32::try_from(position).unwrap_or(u32::MAX);
        let key = CdcKey::new(txn_id, sequence);
        cdc_table.insert(&key, &CdcRecord::from_event(event)?)?;
    }
    let retention = self.cdc_config.retention_max_txns;
    if retention > 0 && txn_id > retention {
        let last_expired = CdcKey::new(txn_id - retention, u32::MAX);
        for removed in cdc_table.extract_from_if(..=last_expired, |_, _| true)? {
            removed?;
        }
    }
    Ok(())
}
/// Stores `up_to_txn` as the position of the CDC cursor called `name`, replacing any
/// previous value.
pub fn advance_cdc_cursor(&self, name: &str, up_to_txn: u64) -> Result {
    let mut system_tables = self.system_tables.lock();
    let mut cursors = system_tables.open_system_table(self, CDC_CURSOR_TABLE)?;
    let position = up_to_txn;
    cursors.insert(name, &position)?;
    Ok(())
}
/// Returns the keys (transaction ids) of all history snapshots, in ascending order.
pub(crate) fn list_history_snapshot_ids(&self) -> Result<Vec<u64>> {
    let mut system_tables = self.system_tables.lock();
    let history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
    let ids = history_table
        .range::<u64>(..)?
        .map(|entry| entry.map(|(key_guard, _)| key_guard.value()))
        .collect::<Result<Vec<_>>>()?;
    Ok(ids)
}
/// Removes every entry from the history snapshot table.
pub(crate) fn purge_all_history_snapshots(&self) -> Result {
    let mut system_tables = self.system_tables.lock();
    let mut history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
    // Collect the keys first so the range iterator's borrow ends before anything is removed.
    let mut doomed = Vec::new();
    for entry in history_table.range::<u64>(..)? {
        let (key_guard, _) = entry?;
        doomed.push(key_guard.value());
    }
    for key in doomed {
        history_table.remove(&key)?;
    }
    Ok(())
}
/// Aborts this transaction, consuming it and discarding its changes.
///
/// `completed` is set to `true` before the abort runs, so the flag is set even if the
/// abort itself returns an error.
pub fn abort(mut self) -> Result {
    self.completed = true;
    let outcome = self.abort_inner();
    outcome
}
/// Performs the abort for [`Self::abort`].
///
/// It clears pending root updates on the user table tree, deletes every persistent
/// savepoint this transaction created, and rolls back uncommitted writes in storage.
///
/// # Errors
/// If the tracker reports one of this transaction's persistent savepoints as invalid, that
/// is reported as `StorageError::Corrupted`. Storage errors are returned unchanged.
fn abort_inner(&mut self) -> Result {
    #[cfg(feature = "logging")]
    debug!("Aborting transaction id={:?}", self.transaction_id);
    self.tables.lock().table_tree.clear_root_updates_and_close();
    for savepoint in self.created_persistent_savepoints.lock().iter() {
        self.delete_persistent_savepoint(savepoint.0)
            .map_err(|failure| match failure {
                SavepointError::InvalidSavepoint => StorageError::Corrupted(
                    "invalid savepoint encountered during transaction abort".to_string(),
                ),
                SavepointError::Storage(storage_err) => storage_err,
            })?;
    }
    self.mem.rollback_uncommitted_writes()?;
    #[cfg(feature = "logging")]
    debug!("Finished abort of transaction id={:?}", self.transaction_id);
    Ok(())
}
/// Durably commits `user_root` together with the system tree.
///
/// Steps:
/// 1. If `history_retention > 0` and quick repair is off, a `HistorySnapshot` (user root,
///    millisecond timestamp, blob state) is stored under this transaction's id and a
///    history hold is registered. The oldest snapshots beyond the retention count are then
///    removed and their holds released.
/// 2. Pending system-table root updates are flushed and the allocator-state table is
///    deleted. With quick repair on, the allocator-state table is rebuilt (see the inline
///    comments below).
/// 3. Dirty checksums are finalized and `self.mem.commit` is called with the two-phase flag
///    and shrink policy.
/// 4. Pending non-durable commits are cleared in the tracker, and the system tree's freed
///    pages are released.
pub(crate) fn durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
let mut system_tables = self.system_tables.lock();
if self.history_retention > 0 && !self.quick_repair {
// Milliseconds since the Unix epoch. This is 0 without `std`, or if the clock reads
// earlier than the epoch.
#[cfg(feature = "std")]
#[allow(clippy::cast_possible_truncation)]
let timestamp_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(std::time::Duration::ZERO)
.as_millis() as u64;
#[cfg(not(feature = "std"))]
let timestamp_ms = 0u64;
let blob_state = self.mem.get_blob_state();
let snapshot = HistorySnapshot::new(
user_root,
timestamp_ms,
blob_state.region_offset,
blob_state.region_length,
blob_state.next_sequence,
blob_state.hlc_state,
);
let mut history_table = system_tables.open_system_table(self, HISTORY_TABLE)?;
history_table.insert(&self.transaction_id.raw_id(), &snapshot)?;
self.transaction_tracker
.register_history_hold(self.transaction_id)?;
// Keys come back in ascending order, so the prefix holds the oldest snapshots.
let mut all_keys = Vec::new();
for entry in history_table.range::<u64>(..)? {
let (key_guard, _) = entry?;
all_keys.push(key_guard.value());
}
let retention = usize::try_from(self.history_retention).unwrap_or(usize::MAX);
if all_keys.len() > retention {
let to_remove = all_keys.len() - retention;
for key in &all_keys[..to_remove] {
history_table.remove(key)?;
self.transaction_tracker
.deallocate_history_hold(TransactionId::new(*key))?;
}
}
}
let system_freed_pages = system_tables.system_freed_pages();
let system_tree = system_tables.table_tree.flush_table_root_updates()?;
system_tree
.delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
.map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;
if self.quick_repair {
system_tree.create_table_and_flush_table_root(
ALLOCATOR_STATE_TABLE_NAME,
|system_tree_ref, tree: &mut AllocatorStateTreeMut| {
let mut pagination_counter = 0;
// Reserve space, record the system freed pages, then try to save the allocator
// state. If saving does not succeed, empty the tree and retry.
loop {
let num_regions = self
.mem
.reserve_allocator_state(tree, self.transaction_id)?;
self.store_system_freed_pages(
system_tree_ref,
system_freed_pages.clone(),
None,
&mut pagination_counter,
)?;
if self.mem.try_save_allocator_state(tree, num_regions)? {
return Ok(());
}
while let Some(guards) = tree.last()? {
let key = guards.0.value();
drop(guards);
tree.remove(&key)?;
}
}
},
)?;
}
let system_root = system_tree.finalize_dirty_checksums()?;
self.mem.commit(
user_root,
system_root,
self.transaction_id,
self.two_phase_commit,
self.shrink_policy,
)?;
self.transaction_tracker
.clear_pending_non_durable_commits()?;
// Pages freed from the system tree are released only after the commit succeeded.
for page in system_freed_pages.lock().drain(..) {
self.mem.free(page, &mut PageTrackerPolicy::Ignore);
}
Ok(())
}
/// Commits `user_root` without a durable write (`self.mem.non_durable_commit`).
///
/// Freed pages are processed up to the earlier of:
/// - the transaction after the oldest live non-durable read transaction (or this
///   transaction if there is none);
/// - the oldest savepoint's transaction.
///
/// Pages freed from the system tree that `self.mem.unpersisted` reports as unpersisted are
/// set aside and freed right after the commit. The remaining ones are recorded through
/// `store_system_freed_pages`. The commit is then registered with the transaction tracker
/// together with the last durable transaction id.
pub(crate) fn non_durable_commit(&mut self, user_root: Option<BtreeHeader>) -> Result {
let mut free_until_transaction = self
.transaction_tracker
.oldest_live_read_nondurable_transaction()?
.map(|x| x.next())
.transpose()?
.unwrap_or(self.transaction_id);
// Never free past the oldest savepoint.
if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint()? {
free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
}
self.process_freed_pages_nondurable(free_until_transaction)?;
let mut post_commit_frees = vec![];
let system_root = {
let mut system_tables = self.system_tables.lock();
let system_freed_pages = system_tables.system_freed_pages();
system_tables.table_tree.flush_table_root_updates()?;
// Unpersisted pages are moved out of the freed list to be freed after the commit.
for page in system_freed_pages
.lock()
.extract_if(.., |p| self.mem.unpersisted(*p))
{
post_commit_frees.push(page);
}
self.store_system_freed_pages(
&mut system_tables.table_tree,
system_freed_pages,
Some(&mut post_commit_frees),
&mut 0,
)?;
// Flush again, since storing the freed pages modified the system tree.
system_tables
.table_tree
.flush_table_root_updates()?
.finalize_dirty_checksums()?
};
self.mem
.non_durable_commit(user_root, system_root, self.transaction_id)?;
self.transaction_tracker.register_non_durable_commit(
self.transaction_id,
self.mem.get_last_durable_transaction_id()?,
)?;
for page in post_commit_frees {
self.mem.free(page, &mut PageTrackerPolicy::Ignore);
}
Ok(())
}
/// Performs one compaction step by relocating high-numbered B-tree pages to lower page numbers.
///
/// It gathers up to `MAX_PAGES_PER_COMPACTION` of the highest index pages from both the
/// user and system table trees, and walks them from highest to lowest. For each page it
/// allocates the lowest available slot:
/// - if the new slot is lower, the page and all of its not-yet-relocated parents are
///   mapped to newly allocated pages;
/// - otherwise the slot is freed and the walk stops, because nothing lower is available.
///
/// Both trees then apply the relocation map. Returns `true` if any page was relocated.
pub(crate) fn compact_pages(&mut self) -> Result<bool> {
let mut progress = false;
let mut highest_pages = BTreeMap::new();
let mut tables = self.tables.lock();
let table_tree = &mut tables.table_tree;
table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
let mut system_tables = self.system_tables.lock();
let system_table_tree = &mut system_tables.table_tree;
system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
// old page number -> new page number
let mut relocation_map = HashMap::new();
for path in highest_pages.into_values().rev() {
if relocation_map.contains_key(&path.page_number()) {
continue;
}
let old_page = self.mem.get_page(path.page_number())?;
let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
let new_page_number = new_page.get_page_number();
// Copy only the first byte of the old page (presumably its page-type tag) into the
// new allocation; the full contents are written later by `relocate_tables`.
new_page.memory_mut()?[0] = old_page.memory()[0];
drop(new_page);
if new_page_number < path.page_number() {
relocation_map.insert(path.page_number(), new_page_number);
// Parents must be rewritten too so they point at the relocated child.
for parent in path.parents() {
if relocation_map.contains_key(parent) {
continue;
}
let old_parent = self.mem.get_page(*parent)?;
let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
let new_page_number = new_page.get_page_number();
new_page.memory_mut()?[0] = old_parent.memory()[0];
drop(new_page);
relocation_map.insert(*parent, new_page_number);
}
} else {
// No lower slot is available: return the allocation and stop.
self.mem
.free(new_page_number, &mut PageTrackerPolicy::Ignore);
break;
}
}
if !relocation_map.is_empty() {
progress = true;
}
table_tree.relocate_tables(&relocation_map)?;
system_table_tree.relocate_tables(&relocation_map)?;
Ok(progress)
}
/// Releases to the allocator every page recorded in the data and system freed tables by
/// transactions whose id is below `free_until`, removing those records as it goes.
///
/// The data freed table is processed first, then the system freed table.
fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
    // The freed-page lists assume 8-byte serialized page numbers.
    assert_eq!(PageNumber::serialized_size(), 8);
    let mut system_tables = self.system_tables.lock();
    for freed_definition in [DATA_FREED_TABLE, SYSTEM_FREED_TABLE] {
        let mut freed_table = system_tables.open_system_table(self, freed_definition)?;
        let upper_bound = TransactionIdWithPagination {
            transaction_id: free_until.raw_id(),
            pagination_id: 0,
        };
        for extracted in freed_table.extract_from_if(..upper_bound, |_, _| true)? {
            let (_, freed_list) = extracted?;
            let pages = freed_list.value();
            for idx in 0..pages.len() {
                self.mem
                    .free(pages.get(idx), &mut PageTrackerPolicy::Ignore);
            }
        }
    }
    Ok(())
}
/// Frees what can be freed from the pages recorded in the freed table `definition` by
/// non-durable commits that have not been processed yet.
///
/// Considers the commits whose ids lie in `[oldest unprocessed non-durable commit, free_until)`
/// and that the transaction tracker still reports as unprocessed. For each such commit it walks
/// its page-list entries (pagination id 0, 1, ... until one is missing). Pages freed by
/// `free_if_unpersisted` are dropped from the list, the remaining pages are written back, and
/// an entry that ends up empty is removed.
///
/// Returns the id of every commit that was examined, each at most once.
fn process_freed_pages_nondurable_helper(
    &mut self,
    free_until: TransactionId,
    definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
) -> Result<Vec<TransactionId>> {
    let mut processed = vec![];
    let mut system_tables = self.system_tables.lock();
    let last_key = TransactionIdWithPagination {
        transaction_id: free_until.raw_id(),
        pagination_id: 0,
    };
    let oldest_unprocessed = self
        .transaction_tracker
        .oldest_unprocessed_non_durable_commit()?
        .map_or(free_until.raw_id(), |x| x.raw_id());
    let first_key = TransactionIdWithPagination {
        transaction_id: oldest_unprocessed,
        pagination_id: 0,
    };
    let mut data_freed = system_tables.open_system_table(self, definition)?;
    let mut candidate_transactions = vec![];
    for entry in data_freed.range(first_key..last_key)? {
        let (key, _) = entry?;
        let transaction_id = TransactionId::new(key.value().transaction_id);
        if self
            .transaction_tracker
            .is_unprocessed_non_durable_commit(transaction_id)?
        {
            candidate_transactions.push(transaction_id);
        }
    }
    // A commit that freed many pages owns several pagination entries, so the scan above
    // yields its id once per entry. Every entry of a commit is handled below anyway, so the
    // repeats only cause the same lists to be re-walked and the id to be reported more than
    // once. The range is visited in key order, so repeats are adjacent; even if they were not,
    // `dedup` only removes adjacent repeats and leaving a few behind is harmless.
    candidate_transactions.dedup();
    for transaction_id in candidate_transactions {
        let mut key = TransactionIdWithPagination {
            transaction_id: transaction_id.raw_id(),
            pagination_id: 0,
        };
        loop {
            let Some(entry) = data_freed.get(&key)? else {
                break;
            };
            let pages = entry.value();
            let mut new_pages = vec![];
            for i in 0..pages.len() {
                let page = pages.get(i);
                if !self
                    .mem
                    .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
                {
                    new_pages.push(page);
                }
            }
            // Only rewrite the entry if something was actually freed from it.
            if new_pages.len() != pages.len() {
                // Release the read guard before mutating the table.
                drop(entry);
                if new_pages.is_empty() {
                    data_freed.remove(&key)?;
                } else {
                    let required = PageList::required_bytes(new_pages.len());
                    let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
                    for page in new_pages {
                        page_list_mut.as_mut().push_back(page);
                    }
                }
            }
            key.pagination_id += 1;
        }
        processed.push(transaction_id);
    }
    Ok(processed)
}
/// Runs the non-durable freed-page pass over the data freed table and then the system freed
/// table.
///
/// Only after both passes have finished is every examined commit id passed to
/// `mark_unprocessed_non_durable_commit`.
fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
    // The freed-page lists assume 8-byte serialized page numbers.
    assert_eq!(PageNumber::serialized_size(), 8);
    let data_done = self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
    let system_done =
        self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?;
    for txn in data_done.into_iter().chain(system_done) {
        self.transaction_tracker
            .mark_unprocessed_non_durable_commit(txn)?;
    }
    Ok(())
}
/// Moves the pages in `system_freed_pages` into the system freed table under this
/// transaction's id, writing at most 200 pages per pagination entry.
///
/// - `unpersisted_pages`: when `Some`, pages for which `self.mem.unpersisted` is true are
///   pushed there instead of being recorded. An entry may therefore hold fewer than 200 pages,
///   possibly none.
/// - `pagination_counter`: the first pagination id to use. It is incremented once per entry
///   written, so the caller sees the next free id afterwards.
///
/// On return the shared `system_freed_pages` list is empty.
fn store_system_freed_pages(
&self,
system_tree: &mut TableTreeMut,
system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
pagination_counter: &mut u64,
) -> Result {
// The page-list encoding below assumes 8-byte serialized page numbers.
assert_eq!(PageNumber::serialized_size(), 8);
system_tree.open_table_and_flush_table_root(
SYSTEM_FREED_TABLE.name(),
|system_freed_tree: &mut SystemFreedTree| {
// Emptiness is re-checked under a fresh lock on every pass (presumably because
// inserting into the system freed table can itself free more system pages into
// this list).
while !system_freed_pages.lock().is_empty() {
let chunk_size = 200;
let buffer_size = PageList::required_bytes(chunk_size);
let key = TransactionIdWithPagination {
transaction_id: self.transaction_id.raw_id(),
pagination_id: *pagination_counter,
};
// Reserve space for a full chunk before taking the list lock.
let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;
let mut freed_pages = system_freed_pages.lock();
let len = freed_pages.len();
access_guard.as_mut().clear();
// Take up to `chunk_size` pages from the tail of the list.
for page in freed_pages.drain(len - min(len, chunk_size)..) {
if let Some(ref mut unpersisted_pages) = unpersisted_pages
&& self.mem.unpersisted(page)
{
unpersisted_pages.push(page);
} else {
access_guard.as_mut().push_back(page);
}
}
drop(access_guard);
*pagination_counter += 1;
}
Ok(())
},
)?;
Ok(())
}
/// Returns space-usage statistics for the database as seen by this write transaction.
///
/// How the figures are derived:
/// - Tree height, leaf/branch page counts and stored leaf bytes come from the user data tree.
/// - The system tree's metadata and stored leaf bytes are added to `metadata_bytes`.
/// - The system tree's fragmented bytes and all free pages (at page size) are added to
///   `fragmented_bytes`.
///
/// # Errors
///
/// Propagates storage errors from gathering tree statistics or counting pages.
pub fn stats(&self) -> Result<DatabaseStats> {
    let tables = self.tables.lock();
    let table_tree = &tables.table_tree;
    let data_tree_stats = table_tree.stats()?;
    let system_tables = self.system_tables.lock();
    let system_table_tree = &system_tables.table_tree;
    let system_tree_stats = system_table_tree.stats()?;
    // Count free pages exactly once, so `free_pages` and the free-page share of
    // `fragmented_bytes` are derived from the same value.
    let free_pages = self.mem.count_free_pages()?;
    let page_size = self.mem.get_page_size();
    let total_metadata_bytes = data_tree_stats.metadata_bytes()
        + system_tree_stats.metadata_bytes
        + system_tree_stats.stored_leaf_bytes;
    let total_fragmented = data_tree_stats.fragmented_bytes()
        + system_tree_stats.fragmented_bytes
        + free_pages * (page_size as u64);
    Ok(DatabaseStats {
        tree_height: data_tree_stats.tree_height(),
        allocated_pages: self.mem.count_allocated_pages()?,
        free_pages,
        leaf_pages: data_tree_stats.leaf_pages(),
        branch_pages: data_tree_stats.branch_pages(),
        stored_leaf_bytes: data_tree_stats.stored_bytes(),
        metadata_bytes: total_metadata_bytes,
        fragmented_bytes: total_fragmented,
        page_size,
    })
}
/// Dumps the user ("Master") table tree and then the system table tree to stderr.
///
/// Each tree's pending root updates are flushed and its checksums finalized first. A tree is
/// printed only if that yields a root.
#[allow(dead_code)]
#[cfg(feature = "std")]
pub(crate) fn print_debug(&self) -> Result {
    // Prints `label`, then the table tree rooted at `root`.
    let dump = |label: &str, root: BtreeHeader| -> Result {
        eprintln!("{label}");
        let tree: Btree<&str, InternalTableDefinition> = Btree::new_uncompressed(
            Some(root),
            PageHint::None,
            self.transaction_guard.clone(),
            self.mem.clone(),
        )?;
        tree.print_debug(true)?;
        Ok(())
    };

    let mut tables = self.tables.lock();
    let user_root = tables
        .table_tree
        .flush_table_root_updates()?
        .finalize_dirty_checksums()?;
    if let Some(root) = user_root {
        dump("Master tree:", root)?;
    }

    let mut system_tables = self.system_tables.lock();
    let system_root = system_tables
        .table_tree
        .flush_table_root_updates()?
        .finalize_dirty_checksums()?;
    if let Some(root) = system_root {
        dump("System tree:", root)?;
    }
    Ok(())
}
}
impl Drop for WriteTransaction {
    /// Cleans up a write transaction that was never completed.
    ///
    /// - Completed transactions need no work.
    /// - If storage has failed, pending table root updates are discarded via
    ///   `clear_root_updates_and_close`; no abort is attempted.
    /// - Otherwise the transaction is aborted, unless the thread is unwinding from a panic.
    ///   An abort failure is only reported when the `logging` feature is enabled.
    fn drop(&mut self) {
        // Without `std` there is no way to ask whether a panic is in progress, so assume not.
        #[cfg(feature = "std")]
        fn unwinding() -> bool {
            std::thread::panicking()
        }
        #[cfg(not(feature = "std"))]
        fn unwinding() -> bool {
            false
        }

        if self.completed {
            return;
        }
        if self.mem.storage_failure() {
            self.tables.lock().table_tree.clear_root_updates_and_close();
        } else if !unwinding() {
            #[allow(unused_variables)]
            if let Err(error) = self.abort_inner() {
                #[cfg(feature = "logging")]
                warn!("Failure automatically aborting transaction: {error}");
            }
        }
    }
}
/// A read-only transaction over a fixed user-data table tree.
///
/// Tables, blobs and change-data-capture records are read through the shared transactional
/// memory.
pub struct ReadTransaction {
/// Shared transactional memory used for all page and blob reads.
mem: Arc<TransactionalMemory>,
/// User-data table tree; it also owns the shared transaction guard.
tree: TableTree,
}
impl ReadTransaction {
    /// Opens a read transaction over the database's current data root.
    ///
    /// `guard` is moved into an `Arc` owned by the table tree. Tables opened from this
    /// transaction receive clones of it.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::Storage`] if the table tree cannot be constructed.
    pub(crate) fn new(
        mem: Arc<TransactionalMemory>,
        guard: TransactionGuard,
    ) -> Result<Self, TransactionError> {
        let root_page = mem.get_data_root();
        let guard = Arc::new(guard);
        Ok(Self {
            mem: mem.clone(),
            tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
                .map_err(TransactionError::Storage)?,
        })
    }

    /// Opens a read transaction over an explicitly supplied user data root, for example a
    /// historical snapshot.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::Storage`] if the table tree cannot be constructed.
    pub(crate) fn new_historical(
        mem: Arc<TransactionalMemory>,
        guard: TransactionGuard,
        user_root: Option<BtreeHeader>,
    ) -> Result<Self, TransactionError> {
        let guard = Arc::new(guard);
        Ok(Self {
            mem: mem.clone(),
            tree: TableTree::new(user_root, PageHint::Clean, guard, mem)
                .map_err(TransactionError::Storage)?,
        })
    }

    /// Returns the user-data table tree backing this transaction.
    #[cfg(feature = "std")]
    pub(crate) fn table_tree(&self) -> &TableTree {
        &self.tree
    }

    /// Opens a normal (non-multimap) table for reading.
    ///
    /// # Errors
    ///
    /// - [`TableError::TableDoesNotExist`] if no normal table with that name exists.
    /// - [`TableError::Storage`] wrapping [`StorageError::Corrupted`] if the stored definition
    ///   is a multimap table.
    /// - Any other error from the table lookup or from opening the table.
    pub fn open_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<ReadOnlyTable<K, V>, TableError> {
        let header = self
            .tree
            .get_table::<K, V>(definition.name(), TableType::Normal)?
            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
        match header {
            InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
                definition.name().to_string(),
                table_root,
                PageHint::Clean,
                self.tree.transaction_guard().clone(),
                self.mem.clone(),
            )?),
            InternalTableDefinition::Multimap { .. } => {
                Err(TableError::Storage(StorageError::Corrupted(
                    "unexpected multimap table type when opening normal table".to_string(),
                )))
            }
        }
    }

    /// Opens a TTL table for reading by opening its inner table and wrapping it.
    ///
    /// # Errors
    ///
    /// Same as [`ReadTransaction::open_table`] for the inner table definition.
    #[cfg(feature = "std")]
    pub fn open_ttl_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: crate::ttl_table::TtlTableDefinition<K, V>,
    ) -> Result<crate::ttl_table::ReadOnlyTtlTable<K, V>, TableError> {
        let inner = self.open_table(definition.inner_def())?;
        Ok(crate::ttl_table::ReadOnlyTtlTable::new(inner))
    }

    /// Opens an IVF-PQ index for reading.
    ///
    /// # Errors
    ///
    /// Storage errors from opening the index are returned as [`TableError::Storage`].
    pub fn open_ivfpq_index(
        &self,
        definition: &crate::ivfpq::config::IvfPqIndexDefinition,
    ) -> Result<crate::ivfpq::index::ReadOnlyIvfPqIndex, TableError> {
        crate::ivfpq::index::ReadOnlyIvfPqIndex::open(self, definition).map_err(TableError::Storage)
    }

    /// Opens a fractal index for reading.
    ///
    /// # Errors
    ///
    /// Storage errors from opening the index are returned as [`TableError::Storage`].
    pub fn open_fractal_index(
        &self,
        definition: &crate::fractal::config::FractalIndexDefinition,
    ) -> Result<crate::fractal::index::ReadOnlyFractalIndex, TableError> {
        crate::fractal::index::ReadOnlyFractalIndex::open(self, definition)
            .map_err(TableError::Storage)
    }

    /// Opens a normal table by handle, without static key/value types.
    ///
    /// # Errors
    ///
    /// - [`TableError::TableDoesNotExist`] if no normal table with that name exists.
    /// - [`TableError::Storage`] wrapping [`StorageError::Corrupted`] if the stored definition
    ///   is a multimap table.
    pub fn open_untyped_table(
        &self,
        handle: impl TableHandle,
    ) -> Result<ReadOnlyUntypedTable, TableError> {
        let header = self
            .tree
            .get_table_untyped(handle.name(), TableType::Normal)?
            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
        match header {
            InternalTableDefinition::Normal {
                table_root,
                fixed_key_size,
                fixed_value_size,
                ..
            } => Ok(ReadOnlyUntypedTable::new(
                table_root,
                fixed_key_size,
                fixed_value_size,
                self.mem.clone(),
            )),
            InternalTableDefinition::Multimap { .. } => {
                Err(TableError::Storage(StorageError::Corrupted(
                    "unexpected multimap table type when opening untyped normal table".to_string(),
                )))
            }
        }
    }

    /// Opens a multimap table for reading.
    ///
    /// # Errors
    ///
    /// - [`TableError::TableDoesNotExist`] if no multimap table with that name exists.
    /// - [`TableError::Storage`] wrapping [`StorageError::Corrupted`] if the stored definition
    ///   is a normal table.
    /// - Any other error from the table lookup or from opening the table.
    pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
        &self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
        let header = self
            .tree
            .get_table::<K, V>(definition.name(), TableType::Multimap)?
            .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
        match header {
            InternalTableDefinition::Normal { .. } => {
                Err(TableError::Storage(StorageError::Corrupted(
                    "unexpected normal table type when opening multimap table".to_string(),
                )))
            }
            InternalTableDefinition::Multimap {
                table_root,
                table_length,
                ..
            } => Ok(ReadOnlyMultimapTable::new(
                table_root,
                table_length,
                PageHint::Clean,
                self.tree.transaction_guard().clone(),
                self.mem.clone(),
            )?),
        }
    }

    /// Opens a multimap table by handle, without static key/value types.
    ///
    /// # Errors
    ///
    /// - [`TableError::TableDoesNotExist`] if no multimap table with that name exists.
    /// - [`TableError::Storage`] wrapping [`StorageError::Corrupted`] if the stored definition
    ///   is a normal table.
    pub fn open_untyped_multimap_table(
        &self,
        handle: impl MultimapTableHandle,
    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
        let header = self
            .tree
            .get_table_untyped(handle.name(), TableType::Multimap)?
            .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
        match header {
            InternalTableDefinition::Normal { .. } => {
                Err(TableError::Storage(StorageError::Corrupted(
                    "unexpected normal table type when opening untyped multimap table".to_string(),
                )))
            }
            InternalTableDefinition::Multimap {
                table_root,
                table_length,
                fixed_key_size,
                fixed_value_size,
                ..
            } => Ok(ReadOnlyUntypedMultimapTable::new(
                table_root,
                table_length,
                fixed_key_size,
                fixed_value_size,
                self.mem.clone(),
            )),
        }
    }

    /// Lists handles for all normal tables visible to this transaction.
    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
        self.tree
            .list_tables(TableType::Normal)
            .map(|x| x.into_iter().map(UntypedTableHandle::new))
    }

    /// Lists handles for all multimap tables visible to this transaction.
    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
        self.tree
            .list_tables(TableType::Multimap)
            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
    }

    /// Opens the normal system table `definition` as a B-tree.
    ///
    /// Returns `Ok(None)` if the table does not exist.
    ///
    /// NOTE(review): the system root is read from `self.mem.get_system_root()` on every call
    /// rather than captured when the transaction began. Successive calls could therefore see
    /// different system roots than the user-data snapshot in `self.tree` — confirm this is
    /// intended.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Corrupted`] if the name resolves to a multimap table. Lookup
    /// failures are converted with `into_storage_error_or_corrupted`.
    fn open_system_btree<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<Option<Btree<K, V>>> {
        let system_root = self.mem.get_system_root();
        let system_tree = TableTree::new(
            system_root,
            PageHint::Clean,
            self.tree.transaction_guard().clone(),
            self.mem.clone(),
        )?;
        let header = system_tree.get_table::<K, V>(definition.name(), TableType::Normal);
        match header {
            Ok(Some(InternalTableDefinition::Normal { table_root, .. })) => {
                let btree = Btree::new_uncompressed(
                    table_root,
                    PageHint::Clean,
                    self.tree.transaction_guard().clone(),
                    self.mem.clone(),
                )?;
                Ok(Some(btree))
            }
            Ok(Some(InternalTableDefinition::Multimap { .. })) => Err(StorageError::Corrupted(
                "unexpected multimap table type in system table lookup".to_string(),
            )),
            Ok(None) => Ok(None),
            Err(e) => {
                Err(e
                    .into_storage_error_or_corrupted("Internal error: blob system table corrupted"))
            }
        }
    }

    /// Looks up the history snapshot recorded for `transaction_id`, if any.
    pub(crate) fn get_history_snapshot_ro(
        &self,
        transaction_id: u64,
    ) -> Result<Option<HistorySnapshot>> {
        let Some(btree) = self.open_system_btree(HISTORY_TABLE)? else {
            return Ok(None);
        };
        Ok(btree.get(&transaction_id)?.map(|guard| guard.value()))
    }

    /// Returns the transaction ids of all recorded history snapshots, in key order.
    pub(crate) fn list_history_snapshot_ids_ro(&self) -> Result<Vec<u64>> {
        let Some(btree) = self.open_system_btree(HISTORY_TABLE)? else {
            return Ok(Vec::new());
        };
        let mut ids = Vec::new();
        for entry in btree.range::<core::ops::RangeFull, u64>(&(..))? {
            let guard = entry?;
            ids.push(guard.key());
        }
        Ok(ids)
    }

    /// Reads a whole blob and its metadata, verifying the stored xxh3-128 checksum.
    ///
    /// Returns `Ok(None)` if the blob table or the blob does not exist.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::BlobChecksumMismatch`] if the data read does not match the
    /// recorded checksum, or any error from reading the blob region.
    pub fn get_blob(&self, blob_id: &BlobId) -> Result<Option<(Vec<u8>, BlobMeta)>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };
        let Some(guard) = btree.get(blob_id)? else {
            return Ok(None);
        };
        let meta = guard.value();
        let blob_state = self.mem.get_committed_blob_state();
        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
        #[allow(clippy::cast_possible_truncation)]
        let data = self
            .mem
            .blob_read(file_offset, meta.blob_ref.length as usize)?;
        let actual = xxh3_hash128(&data);
        if actual != meta.blob_ref.checksum {
            return Err(StorageError::BlobChecksumMismatch {
                sequence: blob_id.sequence,
                expected: meta.blob_ref.checksum,
                actual,
            });
        }
        Ok(Some((data, meta)))
    }

    /// Returns a blob's metadata without reading its data.
    pub fn get_blob_meta(&self, blob_id: &BlobId) -> Result<Option<BlobMeta>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };
        Ok(btree.get(blob_id)?.map(|g| g.value()))
    }

    /// Returns the first blob (in key order) whose id has sequence number `seq`.
    pub fn blob_by_sequence(&self, seq: u64) -> Result<Option<(BlobId, BlobMeta)>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };
        let start = BlobId::new(seq, 0);
        let end = BlobId::new(seq, u64::MAX);
        let mut range = btree.range::<core::ops::RangeInclusive<BlobId>, BlobId>(&(start..=end))?;
        match range.next() {
            Some(entry) => {
                let entry = entry?;
                Ok(Some((entry.key(), entry.value())))
            }
            None => Ok(None),
        }
    }

    /// Starts building a composite query against this transaction.
    pub fn composite_query(&self) -> crate::composite::CompositeQuery<'_> {
        crate::composite::CompositeQuery::new(self)
    }

    /// Reads `length` bytes starting at `offset` within a blob.
    ///
    /// The whole-blob checksum is not verified for partial reads. A zero `length` returns an
    /// empty buffer without checking `offset`.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::BlobRangeOutOfBounds`] if `offset + length` exceeds the blob's
    /// length, or any error from reading the blob region.
    pub fn read_blob_range(
        &self,
        blob_id: &BlobId,
        offset: u64,
        length: u64,
    ) -> Result<Option<Vec<u8>>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };
        let Some(guard) = btree.get(blob_id)? else {
            return Ok(None);
        };
        let meta = guard.value();
        if length == 0 {
            return Ok(Some(Vec::new()));
        }
        // Saturation guarantees an overflowing request is rejected as out of bounds.
        let end = offset.saturating_add(length);
        if end > meta.blob_ref.length {
            return Err(StorageError::BlobRangeOutOfBounds {
                blob_length: meta.blob_ref.length,
                requested_offset: offset,
                requested_length: length,
            });
        }
        let blob_state = self.mem.get_committed_blob_state();
        let file_offset = blob_state.region_offset + meta.blob_ref.offset + offset;
        #[allow(clippy::cast_possible_truncation)]
        let data = self.mem.blob_read(file_offset, length as usize)?;
        Ok(Some(data))
    }

    /// Returns a streaming reader over a blob's bytes.
    pub fn blob_reader(&self, blob_id: &BlobId) -> Result<Option<BlobReader>> {
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };
        let Some(guard) = btree.get(blob_id)? else {
            return Ok(None);
        };
        let meta = guard.value();
        let blob_state = self.mem.get_committed_blob_state();
        let file_offset = blob_state.region_offset + meta.blob_ref.offset;
        Ok(Some(BlobReader::new(
            Arc::clone(&self.mem),
            file_offset,
            meta.blob_ref.length,
        )))
    }

    /// Summarizes the deduplication index.
    ///
    /// The returned figures are:
    /// - `total_dedup_entries`: the number of index entries.
    /// - `total_ref_count`: the sum of all reference counts.
    /// - `bytes_saved`: `length * (ref_count - 1)` summed over entries referenced more than once.
    pub fn dedup_stats(&self) -> Result<DedupStats> {
        let Some(btree) = self.open_system_btree(BLOB_DEDUP_INDEX)? else {
            return Ok(DedupStats {
                total_dedup_entries: 0,
                total_ref_count: 0,
                bytes_saved: 0,
            });
        };
        let mut total_dedup_entries: u64 = 0;
        let mut total_ref_count: u64 = 0;
        let mut bytes_saved: u64 = 0;
        let range = btree.range::<RangeFull, Sha256Key>(&(..))?;
        for entry in range {
            let entry = entry?;
            let val = entry.value();
            total_dedup_entries += 1;
            total_ref_count += u64::from(val.ref_count);
            if val.ref_count > 1 {
                bytes_saved += val.length * u64::from(val.ref_count - 1);
            }
        }
        Ok(DedupStats {
            total_dedup_entries,
            total_ref_count,
            bytes_saved,
        })
    }

    /// Reports blob-region usage.
    ///
    /// `live_bytes` counts each distinct storage offset once, so blobs sharing storage are not
    /// double counted. Everything else in the region is reported as `dead_bytes`.
    pub fn blob_stats(&self) -> Result<BlobStats> {
        let blob_state = self.mem.get_committed_blob_state();
        let region_bytes = blob_state.region_length;
        if region_bytes == 0 {
            return Ok(BlobStats {
                blob_count: 0,
                live_bytes: 0,
                region_bytes: 0,
                dead_bytes: 0,
                fragmentation_ratio: 0.0,
            });
        }
        let Some(btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(BlobStats {
                blob_count: 0,
                live_bytes: 0,
                region_bytes,
                dead_bytes: region_bytes,
                fragmentation_ratio: 1.0,
            });
        };
        let mut blob_count: u64 = 0;
        let mut unique_offsets = crate::compat::HashSet::new();
        let mut live_bytes: u64 = 0;
        let range = btree.range::<RangeFull, BlobId>(&(..))?;
        for entry in range {
            let entry = entry?;
            let meta = entry.value();
            blob_count += 1;
            if unique_offsets.insert(meta.blob_ref.offset) {
                live_bytes += meta.blob_ref.length;
            }
        }
        let dead_bytes = region_bytes.saturating_sub(live_bytes);
        // `region_bytes` is non-zero here: the empty-region case returned early above.
        #[allow(clippy::cast_precision_loss)]
        let fragmentation_ratio = dead_bytes as f64 / region_bytes as f64;
        Ok(BlobStats {
            blob_count,
            live_bytes,
            region_bytes,
            dead_bytes,
            fragmentation_ratio,
        })
    }

    /// Returns blobs whose temporal-index timestamp lies in `start_ns..=end_ns`, with their
    /// metadata, in temporal-index order.
    ///
    /// Returns an empty list if `start_ns > end_ns`. Index entries whose blob metadata is
    /// missing are skipped.
    pub fn blobs_in_time_range(
        &self,
        start_ns: u64,
        end_ns: u64,
    ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
        if start_ns > end_ns {
            return Ok(Vec::new());
        }
        let Some(temporal_btree) = self.open_system_btree(BLOB_TEMPORAL_INDEX)? else {
            return Ok(Vec::new());
        };
        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(Vec::new());
        };
        let start = TemporalKey::range_start(start_ns);
        let end = TemporalKey::range_end(end_ns);
        let range = temporal_btree.range(&(start..=end))?;
        let mut results = Vec::new();
        for entry in range {
            let entry = entry?;
            let temporal_key = entry.key();
            if let Some(meta_guard) = blob_btree.get(&temporal_key.blob_id)? {
                results.push((temporal_key, meta_guard.value()));
            }
        }
        Ok(results)
    }

    /// Returns blobs within `window_ns / 2` nanoseconds (either side, saturating) of the
    /// reference blob's wall-clock time.
    ///
    /// Returns an empty list if the reference blob does not exist.
    pub fn blobs_near(
        &self,
        reference: &BlobId,
        window_ns: u64,
    ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(Vec::new());
        };
        let Some(guard) = blob_btree.get(reference)? else {
            return Ok(Vec::new());
        };
        let ref_meta = guard.value();
        let half_window = window_ns / 2;
        let start_ns = ref_meta.wall_clock_ns.saturating_sub(half_window);
        let end_ns = ref_meta.wall_clock_ns.saturating_add(half_window);
        self.blobs_in_time_range(start_ns, end_ns)
    }

    /// Walks the causal ancestry of `blob_id`: the blob itself, then its parent, grandparent,
    /// and so on.
    ///
    /// At most `max_hops + 1` entries are returned. The walk stops early at a blob with no
    /// causal parent or whose metadata is missing. Each entry carries the recorded edge from
    /// its parent to it, if any.
    pub fn causal_chain(
        &self,
        blob_id: &BlobId,
        max_hops: usize,
    ) -> Result<Vec<(BlobId, BlobMeta, Option<CausalEdge>)>> {
        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(Vec::new());
        };
        let edges_btree = self.open_system_btree(BLOB_CAUSAL_EDGES)?;
        let legacy_btree = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?;
        let mut chain = Vec::new();
        let mut current = *blob_id;
        for _ in 0..=max_hops {
            let Some(g) = blob_btree.get(&current)? else {
                break;
            };
            let meta = g.value();
            let parent = meta.causal_parent;
            let edge = if let Some(parent_id) = parent {
                Self::lookup_causal_edge(
                    &parent_id,
                    &current,
                    edges_btree.as_ref(),
                    legacy_btree.as_ref(),
                )?
            } else {
                None
            };
            chain.push((current, meta, edge));
            match parent {
                Some(p) => current = p,
                None => break,
            }
        }
        Ok(chain)
    }

    /// Returns the causal edges leaving `blob_id`.
    ///
    /// The edge table is consulted first. Only if it yields nothing is the legacy
    /// single-child table tried.
    pub fn causal_children(&self, blob_id: &BlobId) -> Result<Vec<CausalEdge>> {
        if let Some(edges_btree) = self.open_system_btree(BLOB_CAUSAL_EDGES)? {
            let start_key = CausalEdgeKey::new(*blob_id, BlobId::MIN);
            let end_key = CausalEdgeKey::new(*blob_id, BlobId::MAX);
            let mut children = Vec::new();
            let range = edges_btree.range(&(start_key..=end_key))?;
            for entry in range {
                let entry = entry?;
                children.push(entry.value());
            }
            if !children.is_empty() {
                return Ok(children);
            }
        }
        if let Some(legacy_btree) = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?
            && let Some(g) = legacy_btree.get(blob_id)?
        {
            return Ok(vec![CausalEdge::legacy(g.value())]);
        }
        Ok(Vec::new())
    }

    /// Finds the causal path from ancestor `from` down to `to` by following parent links
    /// upward from `to` for at most `max_depth` hops.
    ///
    /// The path starts with `(from, None)`. Each following entry is a blob paired with the
    /// edge from its parent. If `from == to`, returns `[(from, None)]` without consulting
    /// storage. Returns `Ok(None)` if `from` is not reached.
    pub fn causal_path(
        &self,
        from: &BlobId,
        to: &BlobId,
        max_depth: usize,
    ) -> Result<Option<CausalPath>> {
        if from == to {
            return Ok(Some(vec![(*from, None)]));
        }
        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(None);
        };
        let edges_btree = self.open_system_btree(BLOB_CAUSAL_EDGES)?;
        let legacy_btree = self.open_system_btree(BLOB_CAUSAL_CHILDREN)?;
        let mut path: CausalPath = Vec::new();
        let mut current = *to;
        for _ in 0..max_depth {
            let Some(g) = blob_btree.get(&current)? else {
                break;
            };
            let meta = g.value();
            let Some(parent) = meta.causal_parent else {
                break;
            };
            let edge = Self::lookup_causal_edge(
                &parent,
                &current,
                edges_btree.as_ref(),
                legacy_btree.as_ref(),
            )?;
            path.push((current, edge));
            if parent == *from {
                path.push((*from, None));
                // Entries were collected child-first; present them ancestor-first.
                path.reverse();
                return Ok(Some(path));
            }
            current = parent;
        }
        Ok(None)
    }

    /// Looks up the recorded edge `parent -> child`.
    ///
    /// The edge table is checked first. The legacy table stores a single child per parent, so
    /// its entry is only used when it actually names `child`. Otherwise it would describe an
    /// edge to a different child.
    fn lookup_causal_edge(
        parent: &BlobId,
        child: &BlobId,
        edges_btree: Option<&Btree<CausalEdgeKey, CausalEdge>>,
        legacy_btree: Option<&Btree<BlobId, BlobId>>,
    ) -> Result<Option<CausalEdge>> {
        if let Some(bt) = edges_btree {
            let key = CausalEdgeKey::new(*parent, *child);
            if let Some(g) = bt.get(&key)? {
                return Ok(Some(g.value()));
            }
        }
        if let Some(bt) = legacy_btree
            && let Some(g) = bt.get(parent)?
        {
            let legacy_child = g.value();
            if legacy_child == *child {
                return Ok(Some(CausalEdge::legacy(legacy_child)));
            }
        }
        Ok(None)
    }

    /// Returns the ids of all blobs tagged with `tag`, in tag-index order.
    pub fn blobs_by_tag(&self, tag: &str) -> Result<Vec<BlobId>> {
        let Some(tag_btree) = self.open_system_btree(BLOB_TAG_INDEX)? else {
            return Ok(Vec::new());
        };
        let start = TagKey::range_start(tag);
        let end = TagKey::range_end(tag);
        let range = tag_btree.range(&(start..=end))?;
        let mut results = Vec::new();
        for entry in range {
            let entry = entry?;
            results.push(entry.key().blob_id);
        }
        Ok(results)
    }

    /// Returns the blobs indexed under `namespace`, with their metadata.
    ///
    /// Index entries whose blob metadata is missing are skipped.
    pub fn blobs_in_namespace(&self, namespace: &str) -> Result<Vec<(BlobId, BlobMeta)>> {
        let Some(ns_idx_btree) = self.open_system_btree(BLOB_NAMESPACE_INDEX)? else {
            return Ok(Vec::new());
        };
        let Some(blob_btree) = self.open_system_btree(BLOB_TABLE)? else {
            return Ok(Vec::new());
        };
        let start = NamespaceKey::range_start(namespace);
        let end = NamespaceKey::range_end(namespace);
        let range = ns_idx_btree.range(&(start..=end))?;
        let mut results = Vec::new();
        for entry in range {
            let entry = entry?;
            let blob_id = entry.key().blob_id;
            if let Some(meta_guard) = blob_btree.get(&blob_id)? {
                results.push((blob_id, meta_guard.value()));
            }
        }
        Ok(results)
    }

    /// Like [`ReadTransaction::blobs_in_time_range`], optionally restricted to blobs whose
    /// recorded namespace equals `namespace`.
    ///
    /// When a namespace is given but the namespace table does not exist, the result is empty.
    pub fn blobs_in_time_range_ns(
        &self,
        start_ns: u64,
        end_ns: u64,
        namespace: Option<&str>,
    ) -> Result<Vec<(TemporalKey, BlobMeta)>> {
        let results = self.blobs_in_time_range(start_ns, end_ns)?;
        let Some(ns) = namespace else {
            return Ok(results);
        };
        let Some(ns_btree) = self.open_system_btree(BLOB_NAMESPACE)? else {
            return Ok(Vec::new());
        };
        let mut filtered = Vec::new();
        for (tk, meta) in results {
            if let Some(ns_guard) = ns_btree.get(&tk.blob_id)?
                && ns_guard.value().namespace_str() == ns
            {
                filtered.push((tk, meta));
            }
        }
        Ok(filtered)
    }

    /// Returns every tag attached to `blob_id`, in tag-index order.
    ///
    /// The tag index has no per-blob lookup, so this scans every entry: O(number of tag
    /// entries).
    pub fn blob_tags(&self, blob_id: &BlobId) -> Result<Vec<String>> {
        let Some(tag_btree) = self.open_system_btree(BLOB_TAG_INDEX)? else {
            return Ok(Vec::new());
        };
        // Scan the whole index instead of hand-building min/max `TagKey` sentinels.
        let range = tag_btree.range::<RangeFull, TagKey>(&(..))?;
        let mut tags = Vec::new();
        for entry in range {
            let entry = entry?;
            let key = entry.key();
            if key.blob_id == *blob_id {
                tags.push(key.tag_str().to_string());
            }
        }
        Ok(tags)
    }

    /// Returns the namespace recorded for `blob_id`, if any.
    pub fn blob_namespace(&self, blob_id: &BlobId) -> Result<Option<String>> {
        let Some(ns_btree) = self.open_system_btree(BLOB_NAMESPACE)? else {
            return Ok(None);
        };
        match ns_btree.get(blob_id)? {
            Some(g) => Ok(Some(g.value().namespace_str().to_string())),
            None => Ok(None),
        }
    }

    /// Returns all change-data-capture records from transactions strictly after
    /// `after_txn_id`, in key order.
    ///
    /// # Errors
    ///
    /// Propagates storage errors and record deserialization failures.
    pub fn read_cdc_since(&self, after_txn_id: u64) -> Result<Vec<ChangeStream>> {
        // No transaction id can follow `u64::MAX`. A saturating increment would clamp the lower
        // bound back onto `u64::MAX` and wrongly return that transaction's own records.
        match after_txn_id.checked_add(1) {
            Some(first_txn_id) => self.read_cdc_range(first_txn_id, u64::MAX),
            None => Ok(Vec::new()),
        }
    }

    /// Returns all change-data-capture records from transactions in `start_txn..=end_txn`, in
    /// key order. Returns an empty list if `start_txn > end_txn`.
    ///
    /// # Errors
    ///
    /// Propagates storage errors and record deserialization failures.
    pub fn read_cdc_range(&self, start_txn: u64, end_txn: u64) -> Result<Vec<ChangeStream>> {
        if start_txn > end_txn {
            return Ok(Vec::new());
        }
        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
            return Ok(Vec::new());
        };
        let start = CdcKey::new(start_txn, 0);
        let end = CdcKey::new(end_txn, u32::MAX);
        let range = btree.range::<core::ops::RangeInclusive<CdcKey>, CdcKey>(&(start..=end))?;
        let mut results = Vec::new();
        for entry in range {
            let entry = entry?;
            let record = CdcRecord::deserialize(entry.value_data())?;
            results.push(ChangeStream::from_key_record(entry.key(), record));
        }
        Ok(results)
    }

    /// Returns the stored position of the CDC cursor named `name`, if any.
    pub fn cdc_cursor(&self, name: &str) -> Result<Option<u64>> {
        let Some(btree) = self.open_system_btree(CDC_CURSOR_TABLE)? else {
            return Ok(None);
        };
        match btree.get(&name)? {
            Some(guard) => Ok(Some(guard.value())),
            None => Ok(None),
        }
    }

    /// Returns the transaction id of the last entry in the CDC log, if any.
    pub fn latest_cdc_transaction_id(&self) -> Result<Option<u64>> {
        let Some(btree) = self.open_system_btree(CDC_LOG_TABLE)? else {
            return Ok(None);
        };
        let mut range = btree.range::<core::ops::RangeFull, CdcKey>(&(..))?;
        match range.next_back() {
            Some(entry) => {
                let entry = entry?;
                Ok(Some(entry.key().transaction_id))
            }
            None => Ok(None),
        }
    }

    /// Closes the transaction.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::ReadTransactionStillInUse`] if anything else (for example
    /// an open table) still holds a clone of the transaction guard. The error carries the
    /// transaction back to the caller.
    pub fn close(self) -> Result<(), TransactionError> {
        if Arc::strong_count(self.tree.transaction_guard()) > 1 {
            return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
        }
        Ok(())
    }
}
impl Debug for ReadTransaction {
    /// Formats as the bare type name; no internal state is exposed.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        write!(f, "ReadTransaction")
    }
}
impl crate::storage_traits::StorageWrite for WriteTransaction {
    type Table<'txn, K: Key + 'static, V: Value + 'static>
        = Table<'txn, K, V>
    where
        Self: 'txn;

    /// Opens `definition` through `WriteTransaction::open_table`. A table error is converted
    /// with `into_storage_error_or_corrupted`, using the context `"open_storage_table"`.
    fn open_storage_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<Self::Table<'_, K, V>> {
        match self.open_table(definition) {
            Ok(table) => Ok(table),
            Err(table_error) => {
                Err(table_error.into_storage_error_or_corrupted("open_storage_table"))
            }
        }
    }
}
impl crate::storage_traits::StorageRead for ReadTransaction {
    type Table<'txn, K: Key + 'static, V: Value + 'static>
        = ReadOnlyTable<K, V>
    where
        Self: 'txn;

    /// Opens `definition` through `ReadTransaction::open_table`. A table error is converted
    /// with `into_storage_error_or_corrupted`, using the context `"open_storage_table"`.
    fn open_storage_table<K: Key + 'static, V: Value + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<Self::Table<'_, K, V>> {
        match self.open_table(definition) {
            Ok(table) => Ok(table),
            Err(table_error) => {
                Err(table_error.into_storage_error_or_corrupted("open_storage_table"))
            }
        }
    }
}
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// A write transaction begun after reopening the database must receive a larger id than
    /// any transaction committed before it was closed.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        let committed_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut table = txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let id = txn.transaction_id;
            txn.commit().unwrap();
            // `db` is dropped at the end of this block, closing it before the reopen below.
            id
        };

        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > committed_id);
    }
}