use std::fmt;
use std::num::NonZeroU64;
use crate::encoding::{
append_u16_le, append_u32_le, append_u64_le, read_u16_le, read_u32_le, read_u64_le,
};
use crate::{ObjectId, PageData, PageNumber};
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct TxnId(NonZeroU64);
impl TxnId {
pub const MAX_RAW: u64 = (1_u64 << 62) - 1;
#[inline]
pub const fn new(raw: u64) -> Option<Self> {
if raw > Self::MAX_RAW {
return None;
}
match NonZeroU64::new(raw) {
Some(nz) => Some(Self(nz)),
None => None,
}
}
#[inline]
pub const fn get(self) -> u64 {
self.0.get()
}
#[inline]
pub const fn checked_next(self) -> Option<Self> {
Self::new(self.get().wrapping_add(1))
}
}
impl fmt::Display for TxnId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "txn#{}", self.get())
}
}
impl TryFrom<u64> for TxnId {
type Error = InvalidTxnId;
fn try_from(value: u64) -> Result<Self, Self::Error> {
Self::new(value).ok_or(InvalidTxnId { raw: value })
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidTxnId {
raw: u64,
}
impl fmt::Display for InvalidTxnId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"invalid TxnId {} (must satisfy 1 <= id <= {})",
self.raw,
TxnId::MAX_RAW
)
}
}
impl std::error::Error for InvalidTxnId {}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct CommitSeq(u64);
impl CommitSeq {
pub const ZERO: Self = Self(0);
#[inline]
pub const fn new(raw: u64) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u64 {
self.0
}
#[inline]
#[must_use]
pub const fn next(self) -> Self {
Self(
self.0
.checked_add(1)
.expect("CommitSeq overflow after 2^64 commits"),
)
}
}
impl fmt::Display for CommitSeq {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "cs#{}", self.get())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct TxnEpoch(u32);
impl TxnEpoch {
#[inline]
pub const fn new(raw: u32) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u32 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct TxnToken {
pub id: TxnId,
pub epoch: TxnEpoch,
}
impl TxnToken {
#[inline]
pub const fn new(id: TxnId, epoch: TxnEpoch) -> Self {
Self { id, epoch }
}
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct SchemaEpoch(u64);
impl SchemaEpoch {
pub const ZERO: Self = Self(0);
#[inline]
pub const fn new(raw: u64) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u64 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
pub high: CommitSeq,
pub schema_epoch: SchemaEpoch,
}
impl Snapshot {
#[inline]
pub const fn new(high: CommitSeq, schema_epoch: SchemaEpoch) -> Self {
Self { high, schema_epoch }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct VersionPointer(u64);
impl VersionPointer {
#[inline]
pub const fn new(raw: u64) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u64 {
self.0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageVersion {
pub pgno: PageNumber,
pub commit_seq: CommitSeq,
pub created_by: TxnToken,
pub data: PageData,
pub prev: Option<VersionPointer>,
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
)]
pub enum OperatingMode {
#[default]
Compatibility,
Native,
}
impl fmt::Display for OperatingMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Compatibility => f.write_str("compatibility"),
Self::Native => f.write_str("native"),
}
}
}
impl OperatingMode {
#[must_use]
pub fn from_pragma(s: &str) -> Option<Self> {
let lower = s.trim().to_ascii_lowercase();
match lower.as_str() {
"compatibility" | "compat" => Some(Self::Compatibility),
"native" => Some(Self::Native),
_ => None,
}
}
#[must_use]
pub const fn is_native(self) -> bool {
matches!(self, Self::Native)
}
#[must_use]
pub const fn legacy_readers_allowed(self) -> bool {
matches!(self, Self::Compatibility)
}
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct CommitCapsule {
pub object_id: ObjectId,
pub snapshot_basis: CommitSeq,
pub intent_log: Vec<IntentOp>,
pub page_deltas: Vec<(PageNumber, Vec<u8>)>,
pub read_set_digest: [u8; 32],
pub write_set_digest: [u8; 32],
pub read_witness_refs: Vec<ObjectId>,
pub write_witness_refs: Vec<ObjectId>,
pub dependency_edge_refs: Vec<ObjectId>,
pub merge_witness_refs: Vec<ObjectId>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct CommitMarker {
pub commit_seq: CommitSeq,
pub commit_time_unix_ns: u64,
pub capsule_object_id: ObjectId,
pub proof_object_id: ObjectId,
pub prev_marker: Option<ObjectId>,
pub integrity_hash: [u8; 16],
}
pub const COMMIT_MARKER_RECORD_V1_SIZE: usize = 88;
const COMMIT_MARKER_RECORD_VERSION: u8 = 1;
impl CommitMarker {
#[must_use]
pub fn to_record_bytes(&self) -> [u8; COMMIT_MARKER_RECORD_V1_SIZE] {
let mut buf = [0u8; COMMIT_MARKER_RECORD_V1_SIZE];
buf[0] = COMMIT_MARKER_RECORD_VERSION;
buf[1] = 0;
buf[2..10].copy_from_slice(&self.commit_seq.get().to_le_bytes());
buf[10..18].copy_from_slice(&self.commit_time_unix_ns.to_le_bytes());
buf[18..34].copy_from_slice(self.capsule_object_id.as_bytes());
buf[34..50].copy_from_slice(self.proof_object_id.as_bytes());
if let Some(prev) = self.prev_marker {
buf[50..66].copy_from_slice(prev.as_bytes());
}
buf[66] = u8::from(self.prev_marker.is_some());
buf[67..83].copy_from_slice(&self.integrity_hash);
buf
}
#[must_use]
pub fn from_record_bytes(data: &[u8; COMMIT_MARKER_RECORD_V1_SIZE]) -> Option<Self> {
if data[0] != COMMIT_MARKER_RECORD_VERSION {
return None;
}
let commit_seq = CommitSeq::new(u64::from_le_bytes(data[2..10].try_into().ok()?));
let commit_time_unix_ns = u64::from_le_bytes(data[10..18].try_into().ok()?);
let capsule_object_id = ObjectId::from_bytes(data[18..34].try_into().ok()?);
let proof_object_id = ObjectId::from_bytes(data[34..50].try_into().ok()?);
let has_prev = data[66] != 0;
let prev_marker = if has_prev {
Some(ObjectId::from_bytes(data[50..66].try_into().ok()?))
} else {
None
};
let mut integrity_hash = [0u8; 16];
integrity_hash.copy_from_slice(&data[67..83]);
Some(Self {
commit_seq,
commit_time_unix_ns,
capsule_object_id,
proof_object_id,
prev_marker,
integrity_hash,
})
}
#[must_use]
pub fn compute_integrity_hash(&self) -> [u8; 16] {
let mut buf = Vec::with_capacity(74);
append_u64_le(&mut buf, self.commit_seq.get());
append_u64_le(&mut buf, self.commit_time_unix_ns);
buf.extend_from_slice(self.capsule_object_id.as_bytes());
buf.extend_from_slice(self.proof_object_id.as_bytes());
if let Some(prev) = self.prev_marker {
buf.push(1);
buf.extend_from_slice(prev.as_bytes());
} else {
buf.push(0);
buf.extend_from_slice(&[0u8; 16]);
}
let hash128 = xxhash_rust::xxh3::xxh3_128(&buf);
hash128.to_le_bytes()
}
#[must_use]
pub fn new(
commit_seq: CommitSeq,
commit_time_unix_ns: u64,
capsule_object_id: ObjectId,
proof_object_id: ObjectId,
prev_marker: Option<ObjectId>,
) -> Self {
let mut marker = Self {
commit_seq,
commit_time_unix_ns,
capsule_object_id,
proof_object_id,
prev_marker,
integrity_hash: [0u8; 16],
};
marker.integrity_hash = marker.compute_integrity_hash();
marker
}
#[must_use]
pub fn verify_integrity(&self) -> bool {
self.integrity_hash == self.compute_integrity_hash()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct Oti {
pub f: u64,
pub al: u16,
pub t: u32,
pub z: u32,
pub n: u32,
}
pub const OTI_WIRE_SIZE: usize = 22;
impl Oti {
#[must_use]
pub fn to_bytes(self) -> [u8; OTI_WIRE_SIZE] {
let mut as_vec = Vec::with_capacity(OTI_WIRE_SIZE);
append_u64_le(&mut as_vec, self.f);
append_u16_le(&mut as_vec, self.al);
append_u32_le(&mut as_vec, self.t);
append_u32_le(&mut as_vec, self.z);
append_u32_le(&mut as_vec, self.n);
let mut buf = [0u8; OTI_WIRE_SIZE];
buf.copy_from_slice(&as_vec);
buf
}
#[must_use]
pub fn from_bytes(data: &[u8]) -> Option<Self> {
if data.len() < OTI_WIRE_SIZE {
return None;
}
Some(Self {
f: read_u64_le(&data[0..8])?,
al: read_u16_le(&data[8..10])?,
t: read_u32_le(&data[10..14])?,
z: read_u32_le(&data[14..18])?,
n: read_u32_le(&data[18..22])?,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct DecodeProof {
pub object_id: ObjectId,
pub oti: Oti,
}
pub use crate::cx::{Budget, Cx};
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub enum Outcome {
Ok,
Err,
Cancelled,
Panicked,
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct EpochId(u64);
impl EpochId {
pub const ZERO: Self = Self(0);
#[inline]
pub const fn new(raw: u64) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u64 {
self.0
}
#[must_use]
pub const fn next(self) -> Option<Self> {
match self.0.checked_add(1) {
Some(val) => Some(Self(val)),
None => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct SymbolValidityWindow {
pub from_epoch: EpochId,
pub to_epoch: EpochId,
}
impl SymbolValidityWindow {
#[must_use]
pub const fn new(from_epoch: EpochId, to_epoch: EpochId) -> Self {
Self {
from_epoch,
to_epoch,
}
}
#[must_use]
pub const fn default_window(current_epoch: EpochId) -> Self {
Self {
from_epoch: EpochId::ZERO,
to_epoch: current_epoch,
}
}
#[must_use]
pub const fn contains(&self, epoch: EpochId) -> bool {
epoch.0 >= self.from_epoch.0 && epoch.0 <= self.to_epoch.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct RemoteCap([u8; 16]);
impl RemoteCap {
#[must_use]
pub const fn from_bytes(bytes: [u8; 16]) -> Self {
Self(bytes)
}
#[must_use]
pub const fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct SymbolAuthMasterKeyCap([u8; 32]);
impl SymbolAuthMasterKeyCap {
#[must_use]
pub const fn from_bytes(bytes: [u8; 32]) -> Self {
Self(bytes)
}
#[must_use]
pub const fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct IdempotencyKey([u8; 16]);
impl IdempotencyKey {
#[must_use]
pub const fn from_bytes(bytes: [u8; 16]) -> Self {
Self(bytes)
}
#[must_use]
pub const fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct Saga {
pub key: IdempotencyKey,
}
impl IdempotencyKey {
#[must_use]
pub fn derive(ecs_epoch: u64, request_bytes: &[u8]) -> Self {
let mut hasher = blake3::Hasher::new();
hasher.update(b"fsqlite:idempotency:v1");
hasher.update(&ecs_epoch.to_le_bytes());
hasher.update(request_bytes);
let digest = hasher.finalize();
let mut out = [0_u8; 16];
out.copy_from_slice(&digest.as_bytes()[..16]);
Self(out)
}
}
impl Saga {
#[must_use]
pub const fn new(key: IdempotencyKey) -> Self {
Self { key }
}
#[must_use]
pub const fn key(self) -> IdempotencyKey {
self.key
}
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct Region(u32);
impl Region {
#[inline]
pub const fn new(raw: u32) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u32 {
self.0
}
}
#[derive(
Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub enum WitnessKey {
Page(PageNumber),
Cell {
btree_root: PageNumber,
leaf_page: PageNumber,
tag: u64,
},
ByteRange {
page: PageNumber,
start: u32,
len: u32,
},
KeyRange {
btree_root: PageNumber,
lo: Vec<u8>,
hi: Vec<u8>,
},
Custom { namespace: u32, bytes: Vec<u8> },
}
impl WitnessKey {
#[must_use]
pub fn cell_tag(btree_root: PageNumber, canonical_key_bytes: &[u8]) -> u64 {
use xxhash_rust::xxh3::xxh3_64;
let mut buf =
Vec::with_capacity(b"fsqlite:witness:cell:v1".len() + 4 + canonical_key_bytes.len());
buf.extend_from_slice(b"fsqlite:witness:cell:v1");
buf.extend_from_slice(&btree_root.get().to_le_bytes());
buf.extend_from_slice(canonical_key_bytes);
xxh3_64(&buf)
}
#[must_use]
pub fn for_cell_read(
btree_root: PageNumber,
leaf_page: PageNumber,
canonical_key_bytes: &[u8],
) -> Self {
Self::Cell {
btree_root,
leaf_page,
tag: Self::cell_tag(btree_root, canonical_key_bytes),
}
}
#[must_use]
pub fn for_range_scan(leaf_pages: &[PageNumber]) -> Vec<Self> {
leaf_pages.iter().copied().map(Self::Page).collect()
}
#[must_use]
pub fn for_point_write(
btree_root: PageNumber,
canonical_key_bytes: &[u8],
leaf_pgno: PageNumber,
) -> (Self, Self) {
let cell = Self::Cell {
btree_root,
leaf_page: leaf_pgno,
tag: Self::cell_tag(btree_root, canonical_key_bytes),
};
let page = Self::Page(leaf_pgno);
(cell, page)
}
#[must_use]
pub const fn page_number(&self) -> Option<PageNumber> {
match self {
Self::Page(page) | Self::ByteRange { page, .. } => Some(*page),
Self::Cell { btree_root, .. } | Self::KeyRange { btree_root, .. } => Some(*btree_root),
Self::Custom { .. } => None,
}
}
#[must_use]
pub fn is_page(&self) -> bool {
matches!(self, Self::Page(_))
}
#[must_use]
pub fn is_cell(&self) -> bool {
matches!(self, Self::Cell { .. })
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct RangeKey {
pub level: u8,
pub hash_prefix: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct ReadWitness {
pub txn: TxnId,
pub key: WitnessKey,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct WriteWitness {
pub txn: TxnId,
pub key: WitnessKey,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WitnessIndexSegment {
pub epoch: EpochId,
pub reads: Vec<ReadWitness>,
pub writes: Vec<WriteWitness>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct DependencyEdge {
pub from: TxnId,
pub to: TxnId,
pub key_basis: WitnessKey,
pub observed_by: TxnId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CommitProof {
pub commit_seq: CommitSeq,
pub edges: Vec<DependencyEdge>,
pub evidence_refs: Vec<ObjectId>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct TableId(u32);
impl TableId {
#[inline]
pub const fn new(raw: u32) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u32 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct IndexId(u32);
impl IndexId {
#[inline]
pub const fn new(raw: u32) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u32 {
self.0
}
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct RowId(i64);
impl RowId {
pub const MAX: Self = Self(i64::MAX);
#[inline]
pub const fn new(raw: i64) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> i64 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RowIdMode {
Normal,
AutoIncrement,
}
#[derive(Debug, Clone)]
pub struct RowIdAllocator {
mode: RowIdMode,
sequence_high_water: i64,
}
impl RowIdAllocator {
pub const fn new(mode: RowIdMode) -> Self {
Self {
mode,
sequence_high_water: 0,
}
}
pub fn allocate(&mut self, max_existing: Option<RowId>) -> Result<RowId, RowIdExhausted> {
let max_val = max_existing.map_or(0, RowId::get);
match self.mode {
RowIdMode::Normal => {
if max_val < i64::MAX {
Ok(RowId::new(max_val + 1))
} else {
Err(RowIdExhausted)
}
}
RowIdMode::AutoIncrement => {
let base = max_val.max(self.sequence_high_water);
if base == i64::MAX {
return Err(RowIdExhausted);
}
let next = base + 1;
self.sequence_high_water = next;
Ok(RowId::new(next))
}
}
}
pub const fn sequence_high_water(&self) -> i64 {
self.sequence_high_water
}
pub fn set_sequence_high_water(&mut self, val: i64) {
self.sequence_high_water = val;
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowIdExhausted;
impl std::fmt::Display for RowIdExhausted {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("database or object is full (rowid exhausted)")
}
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct ColumnIdx(u32);
impl ColumnIdx {
#[inline]
pub const fn new(raw: u32) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u32 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BtreeRef {
Table(TableId),
Index(IndexId),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum SemanticKeyKind {
TableRow,
IndexEntry,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct SemanticKeyRef {
pub btree: BtreeRef,
pub kind: SemanticKeyKind,
pub key_digest: [u8; 16],
}
impl SemanticKeyRef {
const DOMAIN_SEP: &'static [u8] = b"fsqlite:btree:key:v1";
#[must_use]
pub fn compute_digest(
kind: SemanticKeyKind,
btree: BtreeRef,
canonical_key_bytes: &[u8],
) -> [u8; 16] {
let mut hasher = blake3::Hasher::new();
hasher.update(Self::DOMAIN_SEP);
hasher.update(&[match kind {
SemanticKeyKind::TableRow => 0,
SemanticKeyKind::IndexEntry => 1,
}]);
match btree {
BtreeRef::Table(id) => {
hasher.update(&[0]);
hasher.update(&id.get().to_le_bytes());
}
BtreeRef::Index(id) => {
hasher.update(&[1]);
hasher.update(&id.get().to_le_bytes());
}
}
hasher.update(canonical_key_bytes);
let hash = hasher.finalize();
let bytes = hash.as_bytes();
let mut digest = [0u8; 16];
digest.copy_from_slice(&bytes[..16]);
digest
}
#[must_use]
pub fn new(btree: BtreeRef, kind: SemanticKeyKind, canonical_key_bytes: &[u8]) -> Self {
let key_digest = Self::compute_digest(kind, btree, canonical_key_bytes);
Self {
btree,
kind,
key_digest,
}
}
}
bitflags::bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StructuralEffects: u32 {
const NONE = 0;
const PAGE_SPLIT = 1;
const PAGE_MERGE = 2;
const BALANCE_MULTI_PAGE = 4;
const OVERFLOW_ALLOC = 8;
const OVERFLOW_MUTATE = 16;
const FREELIST_MUTATE = 32;
const POINTER_MAP_MUTATE = 64;
const DEFRAG_MOVE_CELLS = 128;
}
}
impl serde::Serialize for StructuralEffects {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.bits().serialize(serializer)
}
}
impl<'de> serde::Deserialize<'de> for StructuralEffects {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let bits = u32::deserialize(deserializer)?;
Self::from_bits(bits).ok_or_else(|| {
serde::de::Error::custom(format!("invalid StructuralEffects bits: {bits:#x}"))
})
}
}
impl Default for StructuralEffects {
fn default() -> Self {
Self::NONE
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct IntentFootprint {
pub reads: Vec<SemanticKeyRef>,
pub writes: Vec<SemanticKeyRef>,
pub structural: StructuralEffects,
}
impl IntentFootprint {
#[must_use]
pub fn empty() -> Self {
Self {
reads: Vec::new(),
writes: Vec::new(),
structural: StructuralEffects::NONE,
}
}
}
impl Default for IntentFootprint {
fn default() -> Self {
Self::empty()
}
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum RebaseExpr {
ColumnRef(ColumnIdx),
Literal(crate::SqliteValue),
UnaryOp {
op: RebaseUnaryOp,
operand: Box<Self>,
},
BinaryOp {
op: RebaseBinaryOp,
left: Box<Self>,
right: Box<Self>,
},
FunctionCall { name: String, args: Vec<Self> },
Cast { expr: Box<Self>, type_name: String },
Case {
operand: Option<Box<Self>>,
when_clauses: Vec<(Self, Self)>,
else_clause: Option<Box<Self>>,
},
Coalesce(Vec<Self>),
NullIf { left: Box<Self>, right: Box<Self> },
Concat { left: Box<Self>, right: Box<Self> },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum RebaseUnaryOp {
Negate,
BitwiseNot,
Not,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum RebaseBinaryOp {
Add,
Subtract,
Multiply,
Divide,
Remainder,
BitwiseAnd,
BitwiseOr,
ShiftLeft,
ShiftRight,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum IntentOpKind {
Insert {
table: TableId,
key: RowId,
record: Vec<u8>,
},
Delete {
table: TableId,
key: RowId,
},
Update {
table: TableId,
key: RowId,
new_record: Vec<u8>,
},
IndexInsert {
index: IndexId,
key: Vec<u8>,
rowid: RowId,
},
IndexDelete {
index: IndexId,
key: Vec<u8>,
rowid: RowId,
},
UpdateExpression {
table: TableId,
key: RowId,
column_updates: Vec<(ColumnIdx, RebaseExpr)>,
},
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct IntentOp {
pub schema_epoch: u64,
pub footprint: IntentFootprint,
pub op: IntentOpKind,
}
pub type IntentLog = Vec<IntentOp>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageHistory {
pub pgno: PageNumber,
pub versions: Vec<PageVersion>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct ArcCache;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RootManifest {
pub schema_epoch: SchemaEpoch,
pub root_page: PageNumber,
pub ecs_epoch: EpochId,
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct TxnSlot(u32);
impl TxnSlot {
#[inline]
pub const fn new(raw: u32) -> Self {
Self(raw)
}
#[inline]
pub const fn get(self) -> u32 {
self.0
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::time::Duration;
use proptest::prelude::*;
use crate::PayloadHash;
use super::*;
#[test]
fn test_txn_id_nonzero_enforced() {
assert!(TxnId::new(0).is_none());
assert!(TxnId::try_from(0_u64).is_err());
assert!(TxnId::new(1).is_some());
assert!(TxnId::new(TxnId::MAX_RAW).is_some());
}
#[test]
fn test_txn_id_62_bit_max() {
assert!(TxnId::new(TxnId::MAX_RAW + 1).is_none());
assert!(TxnId::try_from(TxnId::MAX_RAW + 1).is_err());
}
#[test]
fn test_object_id_16_bytes_blake3_truncation() {
let header = b"hdr:v1";
let payload = b"payload";
let oid = ObjectId::derive(header, PayloadHash::blake3(payload));
assert_eq!(oid.as_bytes().len(), ObjectId::LEN);
}
#[test]
fn test_object_id_content_addressed() {
let header = b"hdr:v1";
let payload = b"payload";
let a = ObjectId::derive(header, PayloadHash::blake3(payload));
let b = ObjectId::derive(header, PayloadHash::blake3(payload));
assert_eq!(a, b);
let c = ObjectId::derive(header, PayloadHash::blake3(b"payload2"));
assert_ne!(a, c);
}
#[test]
fn prop_object_id_collision_resistance() {
let header = b"hdr:v1";
let mut ids = HashSet::<ObjectId>::with_capacity(10_000);
let mut state: u64 = 0xD6E8_FEB8_6659_FD93;
for i in 0..10_000_u64 {
state = state
.wrapping_mul(6_364_136_223_846_793_005_u64)
.wrapping_add(1_442_695_040_888_963_407_u64);
let mut payload = [0_u8; 32];
payload[..8].copy_from_slice(&i.to_le_bytes());
payload[8..16].copy_from_slice(&state.to_le_bytes());
payload[16..24].copy_from_slice(&state.rotate_left(17).to_le_bytes());
payload[24..32].copy_from_slice(&state.rotate_left(41).to_le_bytes());
let oid = ObjectId::derive(header, PayloadHash::blake3(&payload));
assert!(ids.insert(oid), "ObjectId collision at i={i}");
}
}
#[test]
fn test_snapshot_fields() {
let snap = Snapshot::new(CommitSeq::new(7), SchemaEpoch::new(9));
assert_eq!(snap.high.get(), 7);
assert_eq!(snap.schema_epoch.get(), 9);
}
#[test]
fn test_oti_field_widths_allow_large_symbol_size() {
let oti = Oti {
f: 1,
al: 4,
t: 65_536,
z: 1,
n: 1,
};
assert_eq!(oti.t, 65_536);
}
#[test]
fn test_budget_product_lattice_semantics() {
let a = Budget {
deadline: Some(Duration::from_millis(100)),
poll_quota: 10,
cost_quota: Some(500),
priority: 1,
};
let b = Budget {
deadline: Some(Duration::from_millis(50)),
poll_quota: 20,
cost_quota: Some(400),
priority: 9,
};
let c = a.meet(b);
assert_eq!(c.deadline, Some(Duration::from_millis(50)));
assert_eq!(c.poll_quota, 10);
assert_eq!(c.cost_quota, Some(400));
assert_eq!(c.priority, 9);
}
#[test]
fn test_outcome_ordering_lattice() {
assert!(Outcome::Ok < Outcome::Err);
assert!(Outcome::Err < Outcome::Cancelled);
assert!(Outcome::Cancelled < Outcome::Panicked);
}
#[test]
fn test_witness_key_variants_exhaustive() {
let pn = PageNumber::new(1).unwrap();
let a = WitnessKey::Page(pn);
let b = WitnessKey::Cell {
btree_root: pn,
leaf_page: pn,
tag: 7,
};
let c = WitnessKey::ByteRange {
page: pn,
start: 0,
len: 16,
};
assert!(matches!(a, WitnessKey::Page(_)));
assert!(matches!(b, WitnessKey::Cell { .. }));
assert!(matches!(c, WitnessKey::ByteRange { .. }));
}
#[test]
fn test_all_glossary_types_derive_debug_clone() {
fn assert_debug_clone<T: fmt::Debug + Clone>() {}
assert_debug_clone::<TxnId>();
assert_debug_clone::<CommitSeq>();
assert_debug_clone::<TxnEpoch>();
assert_debug_clone::<TxnToken>();
assert_debug_clone::<SchemaEpoch>();
assert_debug_clone::<Snapshot>();
assert_debug_clone::<VersionPointer>();
assert_debug_clone::<PageVersion>();
assert_debug_clone::<ObjectId>();
assert_debug_clone::<CommitCapsule>();
assert_debug_clone::<CommitMarker>();
assert_debug_clone::<Oti>();
assert_debug_clone::<DecodeProof>();
assert_debug_clone::<Cx<crate::cx::ComputeCaps>>();
assert_debug_clone::<Budget>();
assert_debug_clone::<Outcome>();
assert_debug_clone::<EpochId>();
assert_debug_clone::<SymbolValidityWindow>();
assert_debug_clone::<RemoteCap>();
assert_debug_clone::<SymbolAuthMasterKeyCap>();
assert_debug_clone::<IdempotencyKey>();
assert_debug_clone::<Saga>();
assert_debug_clone::<Region>();
assert_debug_clone::<WitnessKey>();
assert_debug_clone::<RangeKey>();
assert_debug_clone::<ReadWitness>();
assert_debug_clone::<WriteWitness>();
assert_debug_clone::<WitnessIndexSegment>();
assert_debug_clone::<DependencyEdge>();
assert_debug_clone::<CommitProof>();
assert_debug_clone::<TableId>();
assert_debug_clone::<IndexId>();
assert_debug_clone::<RowId>();
assert_debug_clone::<ColumnIdx>();
assert_debug_clone::<BtreeRef>();
assert_debug_clone::<SemanticKeyKind>();
assert_debug_clone::<SemanticKeyRef>();
assert_debug_clone::<StructuralEffects>();
assert_debug_clone::<IntentFootprint>();
assert_debug_clone::<RebaseExpr>();
assert_debug_clone::<RebaseUnaryOp>();
assert_debug_clone::<RebaseBinaryOp>();
assert_debug_clone::<IntentOpKind>();
assert_debug_clone::<IntentOp>();
assert_debug_clone::<PageHistory>();
assert_debug_clone::<ArcCache>();
assert_debug_clone::<RootManifest>();
assert_debug_clone::<TxnSlot>();
assert_debug_clone::<OperatingMode>();
}
#[test]
fn test_remote_cap_from_bytes_roundtrip() {
let raw = [0xAB_u8; 16];
let cap = RemoteCap::from_bytes(raw);
assert_eq!(cap.as_bytes(), &raw);
}
#[test]
fn test_idempotency_key_derivation_is_deterministic() {
let req = b"fetch:object=42";
let a = IdempotencyKey::derive(7, req);
let b = IdempotencyKey::derive(7, req);
let c = IdempotencyKey::derive(8, req);
assert_eq!(a, b);
assert_ne!(a, c);
}
#[test]
fn test_remote_cap_roundtrip() {
let raw = [0xAB_u8; 16];
let cap = RemoteCap::from_bytes(raw);
assert_eq!(cap.as_bytes(), &raw);
}
#[test]
fn test_symbol_auth_master_key_cap_roundtrip() {
let raw = [0xCD_u8; 32];
let cap = SymbolAuthMasterKeyCap::from_bytes(raw);
assert_eq!(cap.as_bytes(), &raw);
}
#[test]
fn test_idempotency_key_roundtrip() {
let raw = [0x11_u8; 16];
let key = IdempotencyKey::from_bytes(raw);
assert_eq!(key.as_bytes(), &raw);
}
#[test]
fn test_saga_constructor() {
let key = IdempotencyKey::from_bytes([0x22_u8; 16]);
let saga = Saga::new(key);
assert_eq!(saga.key(), key);
}
fn arb_budget() -> impl Strategy<Value = Budget> {
(
prop::option::of(any::<u64>()),
any::<u32>(),
prop::option::of(any::<u64>()),
any::<u8>(),
)
.prop_map(|(deadline_ms, poll_quota, cost_quota, priority)| Budget {
deadline: deadline_ms.map(Duration::from_millis),
poll_quota,
cost_quota,
priority,
})
}
proptest! {
#[test]
fn prop_budget_combine_associative(a in arb_budget(), b in arb_budget(), c in arb_budget()) {
prop_assert_eq!(a.meet(b).meet(c), a.meet(b.meet(c)));
}
#[test]
fn prop_budget_combine_commutative(a in arb_budget(), b in arb_budget()) {
prop_assert_eq!(a.meet(b), b.meet(a));
}
}
#[test]
fn test_rowid_reuse_without_autoincrement() {
let mut alloc = RowIdAllocator::new(RowIdMode::Normal);
let r = alloc.allocate(Some(RowId::new(5))).unwrap();
assert_eq!(r.get(), 6);
let r = alloc.allocate(Some(RowId::new(3))).unwrap();
assert_eq!(r.get(), 4);
}
#[test]
fn test_autoincrement_no_reuse() {
let mut alloc = RowIdAllocator::new(RowIdMode::AutoIncrement);
let r = alloc.allocate(Some(RowId::new(5))).unwrap();
assert_eq!(r.get(), 6);
let r = alloc.allocate(Some(RowId::new(3))).unwrap();
assert_eq!(r.get(), 7);
}
#[test]
fn test_sqlite_sequence_updates() {
let mut alloc = RowIdAllocator::new(RowIdMode::AutoIncrement);
assert_eq!(alloc.sequence_high_water(), 0);
let _ = alloc.allocate(Some(RowId::new(10))).unwrap();
assert_eq!(alloc.sequence_high_water(), 11);
alloc.set_sequence_high_water(100);
let r = alloc.allocate(Some(RowId::new(50))).unwrap();
assert_eq!(r.get(), 101);
assert_eq!(alloc.sequence_high_water(), 101);
}
#[test]
fn test_max_rowid_exhausted_autoincrement() {
let mut alloc = RowIdAllocator::new(RowIdMode::AutoIncrement);
let result = alloc.allocate(Some(RowId::MAX));
assert!(result.is_err());
}
#[test]
fn test_max_rowid_exhausted_normal() {
let mut alloc = RowIdAllocator::new(RowIdMode::Normal);
let result = alloc.allocate(Some(RowId::MAX));
assert!(result.is_err());
}
#[test]
fn test_rowid_allocate_empty_table() {
let mut alloc = RowIdAllocator::new(RowIdMode::Normal);
let r = alloc.allocate(None).unwrap();
assert_eq!(r.get(), 1);
let mut alloc = RowIdAllocator::new(RowIdMode::AutoIncrement);
let r = alloc.allocate(None).unwrap();
assert_eq!(r.get(), 1);
}
#[test]
fn test_intent_op_all_variants_encode_decode_roundtrip() {
use crate::SqliteValue;
let variants: Vec<IntentOpKind> = vec![
IntentOpKind::Insert {
table: TableId::new(1),
key: RowId::new(100),
record: vec![0x01, 0x02, 0x03],
},
IntentOpKind::Delete {
table: TableId::new(2),
key: RowId::new(200),
},
IntentOpKind::Update {
table: TableId::new(3),
key: RowId::new(300),
new_record: vec![0x04, 0x05],
},
IntentOpKind::IndexInsert {
index: IndexId::new(10),
key: vec![0xAA, 0xBB],
rowid: RowId::new(400),
},
IntentOpKind::IndexDelete {
index: IndexId::new(11),
key: vec![0xCC],
rowid: RowId::new(500),
},
IntentOpKind::UpdateExpression {
table: TableId::new(4),
key: RowId::new(600),
column_updates: vec![
(
ColumnIdx::new(0),
RebaseExpr::BinaryOp {
op: RebaseBinaryOp::Add,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(0))),
right: Box::new(RebaseExpr::Literal(SqliteValue::Integer(1))),
},
),
(
ColumnIdx::new(2),
RebaseExpr::Coalesce(vec![
RebaseExpr::ColumnRef(ColumnIdx::new(2)),
RebaseExpr::Literal(SqliteValue::Integer(0)),
]),
),
],
},
];
for variant in &variants {
let op = IntentOp {
schema_epoch: 42,
footprint: IntentFootprint::empty(),
op: variant.clone(),
};
let json = serde_json::to_string(&op).expect("serialize must succeed");
let decoded: IntentOp = serde_json::from_str(&json).expect("deserialize must succeed");
assert_eq!(decoded, op, "roundtrip failed for variant: {variant:?}");
}
}
#[test]
fn test_semantic_key_ref_digest_stable() {
let table = BtreeRef::Table(TableId::new(42));
let key_bytes = b"canonical_key_data";
let d1 = SemanticKeyRef::compute_digest(SemanticKeyKind::TableRow, table, key_bytes);
let d2 = SemanticKeyRef::compute_digest(SemanticKeyKind::TableRow, table, key_bytes);
assert_eq!(d1, d2, "digest must be stable across calls");
let skr = SemanticKeyRef::new(table, SemanticKeyKind::TableRow, key_bytes);
assert_eq!(skr.key_digest, d1);
let d3 = SemanticKeyRef::compute_digest(SemanticKeyKind::TableRow, table, b"different_key");
assert_ne!(d1, d3);
let d4 = SemanticKeyRef::compute_digest(SemanticKeyKind::IndexEntry, table, key_bytes);
assert_ne!(d1, d4);
let index = BtreeRef::Index(IndexId::new(42));
let d5 = SemanticKeyRef::compute_digest(SemanticKeyKind::TableRow, index, key_bytes);
assert_ne!(d1, d5);
assert_eq!(d1.len(), 16);
}
#[test]
fn test_structural_effects_bitflags() {
assert_eq!(StructuralEffects::NONE.bits(), 0);
assert!(StructuralEffects::NONE.is_empty());
let leaf = StructuralEffects::NONE;
assert!(!leaf.contains(StructuralEffects::PAGE_SPLIT));
assert!(!leaf.contains(StructuralEffects::FREELIST_MUTATE));
let split_overflow = StructuralEffects::PAGE_SPLIT | StructuralEffects::OVERFLOW_ALLOC;
assert!(split_overflow.contains(StructuralEffects::PAGE_SPLIT));
assert!(split_overflow.contains(StructuralEffects::OVERFLOW_ALLOC));
assert!(!split_overflow.contains(StructuralEffects::PAGE_MERGE));
let all = StructuralEffects::PAGE_SPLIT
| StructuralEffects::PAGE_MERGE
| StructuralEffects::BALANCE_MULTI_PAGE
| StructuralEffects::OVERFLOW_ALLOC
| StructuralEffects::OVERFLOW_MUTATE
| StructuralEffects::FREELIST_MUTATE
| StructuralEffects::POINTER_MAP_MUTATE
| StructuralEffects::DEFRAG_MOVE_CELLS;
assert!(all.contains(StructuralEffects::FREELIST_MUTATE));
assert!(all.contains(StructuralEffects::DEFRAG_MOVE_CELLS));
let json = serde_json::to_string(&split_overflow).expect("serialize");
let decoded: StructuralEffects = serde_json::from_str(&json).expect("deserialize");
assert_eq!(decoded, split_overflow);
}
#[test]
fn test_rowid_allocator_monotone_no_collision() {
let mut alloc = RowIdAllocator::new(RowIdMode::Normal);
let mut ids: Vec<RowId> = Vec::new();
for _ in 0..5 {
let max_existing = ids.last().copied();
let r = alloc.allocate(max_existing).unwrap();
ids.push(r);
}
for _ in 0..5 {
let max_existing = ids.last().copied();
let r = alloc.allocate(max_existing).unwrap();
ids.push(r);
}
let raw_ids: Vec<i64> = ids.iter().map(|r| r.get()).collect();
for window in raw_ids.windows(2) {
assert!(
window[1] > window[0],
"RowIds must be strictly monotonically increasing: {} <= {}",
window[0],
window[1]
);
}
let unique: HashSet<i64> = raw_ids.iter().copied().collect();
assert_eq!(unique.len(), raw_ids.len(), "RowIds must be disjoint");
}
#[test]
fn test_rowid_allocator_bump_on_explicit_rowid() {
let mut alloc = RowIdAllocator::new(RowIdMode::AutoIncrement);
let r1 = alloc.allocate(None).unwrap();
assert_eq!(r1.get(), 1);
alloc.set_sequence_high_water(1000);
let r2 = alloc.allocate(Some(RowId::new(999))).unwrap();
assert!(
r2.get() >= 1001,
"allocator must bump past explicit rowid 1000, got {}",
r2.get()
);
let r3 = alloc.allocate(Some(r2)).unwrap();
assert!(r3.get() > r2.get());
}
}