use std::fmt;
use tracing::{debug, info, warn};
use fsqlite_types::{CommitSeq, PageNumber, SchemaEpoch, Snapshot};
use crate::VersionIdx;
use crate::core_types::CommitLog;
use crate::invariants::VersionStore;
use crate::witness_publication::CommitMarkerStore;
/// Errors that can occur while resolving a time-travel target or while using
/// a time-travel snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimeTravelError {
/// The requested commit sequence is below the GC horizon, so its history is
/// reported as no longer retained.
HistoryNotRetained {
/// Commit sequence that was requested.
requested: CommitSeq,
/// GC horizon the request was compared against.
gc_horizon: CommitSeq,
},
/// The requested commit sequence has no record in the commit log.
CommitSeqNotFound { requested: CommitSeq },
/// No commit (marker or commit-log record) was found at or before the
/// target timestamp, given in nanoseconds since the Unix epoch.
TimestampNotResolvable { target_unix_ns: u64 },
/// A DML operation (named by `attempted_op`) was attempted through a
/// time-travel snapshot.
ReadOnlyViolation { attempted_op: &'static str },
/// A DDL operation (named by `attempted_op`) was attempted through a
/// time-travel snapshot.
DdlBlocked { attempted_op: &'static str },
/// The commit log holds no records, so there is no historical state.
EmptyCommitLog,
}
impl fmt::Display for TimeTravelError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::HistoryNotRetained {
requested,
gc_horizon,
} => write!(
f,
"history not retained: requested commit_seq={} but gc_horizon={}",
requested.get(),
gc_horizon.get()
),
Self::CommitSeqNotFound { requested } => {
write!(f, "commit_seq {} not found in commit log", requested.get())
}
Self::TimestampNotResolvable { target_unix_ns } => {
write!(
f,
"no commit marker found for timestamp_unix_ns={target_unix_ns}"
)
}
Self::ReadOnlyViolation { attempted_op } => {
write!(
f,
"time-travel queries are read-only: {attempted_op} is not permitted"
)
}
Self::DdlBlocked { attempted_op } => {
write!(
f,
"DDL not permitted in time-travel context: {attempted_op}"
)
}
Self::EmptyCommitLog => write!(f, "commit log is empty; no historical state"),
}
}
}
/// The point in history a time-travel query is asking for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeTravelTarget {
/// An explicit commit sequence number.
CommitSequence(CommitSeq),
/// A wall-clock instant in nanoseconds since the Unix epoch; it is resolved
/// to the latest commit at or before that instant.
TimestampUnixNs(u64),
}
/// A snapshot pinned at a historical commit sequence.
///
/// Every constructor in this file sets `read_only` to `true`, and the
/// `check_dml` / `check_ddl` guards reject every operation they are asked about.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeTravelSnapshot {
/// Underlying snapshot, built from the target commit sequence and a schema epoch.
snapshot: Snapshot,
/// Commit sequence this snapshot is pinned to.
target_commit_seq: CommitSeq,
/// Read-only marker; always `true` for snapshots built in this file.
read_only: bool,
}
impl TimeTravelSnapshot {
    /// Builds a read-only snapshot pinned at `target_commit_seq`.
    ///
    /// This constructor performs no validation against a commit log or a GC
    /// horizon; it only wraps `Snapshot::new(target_commit_seq, schema_epoch)`.
    #[must_use]
    pub fn new_for_commit_seq(target_commit_seq: CommitSeq, schema_epoch: SchemaEpoch) -> Self {
        let pinned = Snapshot::new(target_commit_seq, schema_epoch);
        Self {
            snapshot: pinned,
            target_commit_seq,
            read_only: true,
        }
    }

    /// Borrows the underlying [`Snapshot`].
    #[must_use]
    pub fn snapshot(&self) -> &Snapshot {
        let Self { snapshot, .. } = self;
        snapshot
    }

    /// Returns the commit sequence this snapshot is pinned to.
    #[must_use]
    pub fn target_commit_seq(&self) -> CommitSeq {
        let Self {
            target_commit_seq, ..
        } = self;
        *target_commit_seq
    }

    /// Returns the stored read-only flag.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        let Self { read_only, .. } = self;
        *read_only
    }

    /// Guard for DML statements: always fails.
    ///
    /// # Errors
    /// Always returns [`TimeTravelError::ReadOnlyViolation`] carrying `op`.
    pub fn check_dml(&self, op: &'static str) -> Result<(), TimeTravelError> {
        let violation = TimeTravelError::ReadOnlyViolation { attempted_op: op };
        Err(violation)
    }

    /// Guard for DDL statements: always fails.
    ///
    /// # Errors
    /// Always returns [`TimeTravelError::DdlBlocked`] carrying `op`.
    pub fn check_ddl(&self, op: &'static str) -> Result<(), TimeTravelError> {
        let blocked = TimeTravelError::DdlBlocked { attempted_op: op };
        Err(blocked)
    }

    /// Asks `version_store` to resolve `page` against this snapshot.
    ///
    /// Returns whatever `VersionStore::resolve` returns for the pinned
    /// snapshot; `None` means the store found no version for it.
    #[must_use]
    pub fn resolve_page(
        &self,
        version_store: &VersionStore,
        page: PageNumber,
    ) -> Option<VersionIdx> {
        let pinned = &self.snapshot;
        version_store.resolve(page, pinned)
    }
}
/// Resolves a Unix-nanosecond timestamp to a commit sequence using the
/// commit marker store.
///
/// The lookup is delegated to
/// `CommitMarkerStore::resolve_seq_at_or_before_timestamp`; the outcome is
/// logged at `info` (resolved) or `warn` (nothing found).
///
/// # Errors
/// Returns [`TimeTravelError::TimestampNotResolvable`] when the marker store
/// yields no commit sequence for `target_unix_ns`.
pub fn resolve_timestamp_via_markers(
    marker_store: &CommitMarkerStore,
    target_unix_ns: u64,
) -> Result<CommitSeq, TimeTravelError> {
    debug!(
        target_unix_ns,
        "resolving timestamp to commit_seq via marker store"
    );

    match marker_store.resolve_seq_at_or_before_timestamp(target_unix_ns) {
        Some(resolved) => {
            info!(
                commit_seq = resolved.get(),
                target_unix_ns, "timestamp resolved to commit_seq via marker store"
            );
            Ok(resolved)
        }
        None => {
            warn!(
                target_unix_ns,
                "no commit marker found at or before target timestamp"
            );
            Err(TimeTravelError::TimestampNotResolvable { target_unix_ns })
        }
    }
}
/// Resolves a Unix-nanosecond timestamp to the latest commit sequence whose
/// record timestamp is at or before it, by binary-searching the commit log.
///
/// The search covers the `len()` sequence numbers ending at `latest_seq()`.
/// NOTE(review): being a binary search, it presumes record timestamps are
/// non-decreasing in commit-sequence order and that the log is contiguous;
/// a missing record is treated like a "too late" probe. Confirm against
/// `CommitLog`.
///
/// # Errors
/// * [`TimeTravelError::EmptyCommitLog`] when the log is empty or reports no
///   latest sequence.
/// * [`TimeTravelError::TimestampNotResolvable`] when no record qualifies.
pub fn resolve_timestamp_via_commit_log(
    commit_log: &CommitLog,
    target_unix_ns: u64,
) -> Result<CommitSeq, TimeTravelError> {
    debug!(
        target_unix_ns,
        "resolving timestamp to commit_seq via commit log"
    );

    if commit_log.is_empty() {
        return Err(TimeTravelError::EmptyCommitLog);
    }
    let newest = commit_log
        .latest_seq()
        .ok_or(TimeTravelError::EmptyCommitLog)?;

    // Sequence number of the oldest record still covered by the search window.
    let oldest = CommitSeq::new(newest.get() + 1 - commit_log.len() as u64);

    // Half-open window [low, high) of offsets from `oldest`.
    let mut low = 0_u64;
    let mut high = commit_log.len() as u64;
    let mut best: Option<CommitSeq> = None;

    while low < high {
        let probe = low + (high - low) / 2;
        let candidate = CommitSeq::new(oldest.get() + probe);
        match commit_log.get(candidate) {
            // Qualifies: remember it and look for a later one.
            Some(record) if record.timestamp_unix_ns <= target_unix_ns => {
                best = Some(candidate);
                low = probe + 1;
            }
            // Too late, or no record at this sequence: search earlier.
            _ => high = probe,
        }
    }

    match best {
        Some(seq) => {
            info!(
                commit_seq = seq.get(),
                target_unix_ns, "timestamp resolved to commit_seq"
            );
            Ok(seq)
        }
        None => {
            warn!(
                target_unix_ns,
                "no commit record found at or before target timestamp"
            );
            Err(TimeTravelError::TimestampNotResolvable { target_unix_ns })
        }
    }
}
pub fn create_time_travel_snapshot(
target: TimeTravelTarget,
commit_log: &CommitLog,
gc_horizon: CommitSeq,
schema_epoch: SchemaEpoch,
) -> Result<TimeTravelSnapshot, TimeTravelError> {
let target_seq = match target {
TimeTravelTarget::CommitSequence(seq) => seq,
TimeTravelTarget::TimestampUnixNs(ts) => resolve_timestamp_via_commit_log(commit_log, ts)?,
};
if commit_log.get(target_seq).is_none() {
return Err(TimeTravelError::CommitSeqNotFound {
requested: target_seq,
});
}
if target_seq < gc_horizon {
return Err(TimeTravelError::HistoryNotRetained {
requested: target_seq,
gc_horizon,
});
}
info!(
target_commit_seq = target_seq.get(),
gc_horizon = gc_horizon.get(),
"time-travel snapshot created"
);
Ok(TimeTravelSnapshot {
snapshot: Snapshot::new(target_seq, schema_epoch),
target_commit_seq: target_seq,
read_only: true,
})
}
/// Convenience wrapper: builds a time-travel snapshot for `target` and
/// resolves `page` against it in one call.
///
/// Returns `Ok(None)` when the snapshot is valid but the version store
/// resolves no version of `page` for it.
///
/// # Errors
/// Propagates every error of [`create_time_travel_snapshot`].
pub fn resolve_page_at_commit(
    version_store: &VersionStore,
    commit_log: &CommitLog,
    page: PageNumber,
    target: TimeTravelTarget,
    gc_horizon: CommitSeq,
    schema_epoch: SchemaEpoch,
) -> Result<Option<VersionIdx>, TimeTravelError> {
    create_time_travel_snapshot(target, commit_log, gc_horizon, schema_epoch)
        .map(|pinned| pinned.resolve_page(version_store, page))
}
// Unit tests for timestamp resolution, time-travel snapshot creation, the
// read-only / DDL guards, and page resolution through a pinned snapshot.
#[cfg(test)]
mod tests {
use super::*;
use fsqlite_types::{
CommitMarker, CommitSeq, ObjectId, PageData, PageNumber, PageSize, PageVersion,
SchemaEpoch, TxnEpoch, TxnId, TxnToken, VersionPointer,
};
use crate::core_types::CommitRecord;
// Builds a transaction token with the given id and a fixed epoch of 1.
fn make_txn_token(id: u64) -> TxnToken {
TxnToken::new(TxnId::new(id).unwrap(), TxnEpoch::new(1))
}
// Builds a page version for `pgno` committed at `commit_seq`, whose 4096-byte
// payload is filled with `data_byte` and which links to `prev` (if any).
fn make_page_version(
pgno: u32,
commit_seq: u64,
data_byte: u8,
prev: Option<VersionPointer>,
) -> PageVersion {
PageVersion {
pgno: PageNumber::new(pgno).unwrap(),
commit_seq: CommitSeq::new(commit_seq),
created_by: make_txn_token(commit_seq),
data: PageData::from_vec(vec![data_byte; 4096]),
prev,
}
}
// Builds a commit record for sequence `seq` touching page 1, stamped with
// `timestamp_ns`. The transaction id reuses `seq`.
fn make_commit_record(seq: u64, timestamp_ns: u64) -> CommitRecord {
CommitRecord {
txn_id: TxnId::new(seq).unwrap(),
commit_seq: CommitSeq::new(seq),
pages: smallvec::smallvec![PageNumber::new(1).unwrap()],
timestamp_unix_ns: timestamp_ns,
}
}
// Builds a commit marker for `seq` at `timestamp_ns` with fixed placeholder
// object ids, no previous marker, and an all-zero integrity hash.
fn make_commit_marker(seq: u64, timestamp_ns: u64) -> CommitMarker {
CommitMarker {
commit_seq: CommitSeq::new(seq),
commit_time_unix_ns: timestamp_ns,
capsule_object_id: ObjectId::from_bytes([1_u8; 16]),
proof_object_id: ObjectId::from_bytes([2_u8; 16]),
prev_marker: None,
integrity_hash: [0_u8; 16],
}
}
// Builds a commit log holding sequences 1..=count. Record `i` (0-based) is
// stamped `base_ts + i * 1_000_000_000`, i.e. records are 1e9 ns apart.
fn build_commit_log(count: u64) -> CommitLog {
let mut log = CommitLog::new(CommitSeq::new(1));
// `base_ts` is the timestamp of the first record (seq 1).
let base_ts = 1_700_000_000_000_000_000_u64; for i in 0..count {
let seq = i + 1;
log.append(make_commit_record(seq, base_ts + i * 1_000_000_000));
}
log
}
// Publishes one version of `pgno` per entry in `commit_seqs`, in order, each
// pointing back at the previously published version.
fn build_version_store_with_chain(pgno: u32, commit_seqs: &[u64]) -> VersionStore {
let store = VersionStore::new(PageSize::DEFAULT);
let mut prev_ptr: Option<VersionPointer> = None;
for &seq in commit_seqs {
// The low byte of the sequence is used as the page fill byte; the mask makes
// the truncating cast intentional.
#[allow(clippy::cast_possible_truncation)]
let version = make_page_version(pgno, seq, (seq & 0xFF) as u8, prev_ptr);
let idx = store.publish(version);
prev_ptr = Some(crate::invariants::idx_to_version_pointer(idx));
}
store
}
// An explicit commit-sequence target yields a read-only snapshot pinned at
// exactly that sequence.
#[test]
fn test_time_travel_as_of_commitseq() {
let commit_log = build_commit_log(10);
let gc_horizon = CommitSeq::new(1);
let schema_epoch = SchemaEpoch::new(1);
let snapshot = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(5)),
&commit_log,
gc_horizon,
schema_epoch,
)
.expect("snapshot creation should succeed");
assert_eq!(snapshot.target_commit_seq(), CommitSeq::new(5));
assert_eq!(snapshot.snapshot().high, CommitSeq::new(5));
assert!(snapshot.is_read_only());
}
// A timestamp between two commits resolves to the earlier one: base + 4.5e9 ns
// falls between seq 5 (base + 4e9) and seq 6 (base + 5e9).
#[test]
fn test_time_travel_as_of_timestamp() {
let commit_log = build_commit_log(10);
let base_ts = 1_700_000_000_000_000_000_u64;
let gc_horizon = CommitSeq::new(1);
let schema_epoch = SchemaEpoch::new(1);
let target_ts = base_ts + 4_500_000_000;
let snapshot = create_time_travel_snapshot(
TimeTravelTarget::TimestampUnixNs(target_ts),
&commit_log,
gc_horizon,
schema_epoch,
)
.expect("timestamp resolution should succeed");
assert_eq!(snapshot.target_commit_seq(), CommitSeq::new(5));
}
// Commit-log resolution for an exact hit, a between-commits timestamp, and the
// first and last records of a 20-record log.
#[test]
fn test_time_travel_timestamp_to_commitseq_resolution() {
let commit_log = build_commit_log(20);
let base_ts = 1_700_000_000_000_000_000_u64;
let seq = resolve_timestamp_via_commit_log(&commit_log, base_ts + 9_000_000_000)
.expect("exact timestamp should resolve");
assert_eq!(seq, CommitSeq::new(10));
let seq = resolve_timestamp_via_commit_log(&commit_log, base_ts + 14_500_000_000)
.expect("between-commits timestamp should resolve");
assert_eq!(seq, CommitSeq::new(15));
let seq = resolve_timestamp_via_commit_log(&commit_log, base_ts)
.expect("first commit timestamp should resolve");
assert_eq!(seq, CommitSeq::new(1));
let seq = resolve_timestamp_via_commit_log(&commit_log, base_ts + 19_000_000_000)
.expect("last commit timestamp should resolve");
assert_eq!(seq, CommitSeq::new(20));
}
// Marker-store resolution: markers 1..=5 are 1e9 ns apart starting at base_ts;
// checks a between-markers timestamp and an exact hit.
#[test]
fn test_time_travel_timestamp_to_commitseq_resolution_via_markers() {
let base_ts = 1_700_000_000_000_000_000_u64;
let mut marker_store = CommitMarkerStore::new();
for seq in 1..=5 {
marker_store.publish(make_commit_marker(seq, base_ts + (seq - 1) * 1_000_000_000));
}
let seq = resolve_timestamp_via_markers(&marker_store, base_ts + 2_500_000_000)
.expect("between-commit marker timestamp should resolve");
assert_eq!(seq, CommitSeq::new(3));
let seq = resolve_timestamp_via_markers(&marker_store, base_ts + 4_000_000_000)
.expect("exact marker timestamp should resolve");
assert_eq!(seq, CommitSeq::new(5));
}
// A timestamp earlier than the only published marker cannot be resolved.
#[test]
fn test_time_travel_timestamp_to_commitseq_resolution_via_markers_not_found() {
let base_ts = 1_700_000_000_000_000_000_u64;
let mut marker_store = CommitMarkerStore::new();
marker_store.publish(make_commit_marker(10, base_ts + 1_000_000_000));
let result = resolve_timestamp_via_markers(&marker_store, base_ts);
assert!(matches!(
result.unwrap_err(),
TimeTravelError::TimestampNotResolvable { .. }
));
}
// INSERT / UPDATE / DELETE are each rejected with ReadOnlyViolation carrying
// the operation name.
#[test]
fn test_time_travel_read_only() {
let commit_log = build_commit_log(5);
let snapshot = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(3)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let err = snapshot.check_dml("INSERT").unwrap_err();
assert_eq!(
err,
TimeTravelError::ReadOnlyViolation {
attempted_op: "INSERT"
}
);
let err = snapshot.check_dml("UPDATE").unwrap_err();
assert_eq!(
err,
TimeTravelError::ReadOnlyViolation {
attempted_op: "UPDATE"
}
);
let err = snapshot.check_dml("DELETE").unwrap_err();
assert_eq!(
err,
TimeTravelError::ReadOnlyViolation {
attempted_op: "DELETE"
}
);
}
// CREATE / ALTER / DROP TABLE are each rejected with DdlBlocked carrying the
// operation name.
#[test]
fn test_time_travel_ddl_blocked() {
let commit_log = build_commit_log(5);
let snapshot = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(3)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let err = snapshot.check_ddl("CREATE TABLE").unwrap_err();
assert_eq!(
err,
TimeTravelError::DdlBlocked {
attempted_op: "CREATE TABLE"
}
);
let err = snapshot.check_ddl("ALTER TABLE").unwrap_err();
assert_eq!(
err,
TimeTravelError::DdlBlocked {
attempted_op: "ALTER TABLE"
}
);
let err = snapshot.check_ddl("DROP TABLE").unwrap_err();
assert_eq!(
err,
TimeTravelError::DdlBlocked {
attempted_op: "DROP TABLE"
}
);
}
// Two snapshots created for the same target compare equal.
#[test]
fn test_time_travel_snapshot_isolation() {
let commit_log = build_commit_log(10);
let schema_epoch = SchemaEpoch::new(1);
let s1 = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(5)),
&commit_log,
CommitSeq::new(1),
schema_epoch,
)
.unwrap();
let s2 = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(5)),
&commit_log,
CommitSeq::new(1),
schema_epoch,
)
.unwrap();
assert_eq!(s1.snapshot(), s2.snapshot());
assert_eq!(s1.target_commit_seq(), s2.target_commit_seq());
}
// With page versions at commits 2, 5 and 8, a snapshot at commit 6 resolves to
// the version from commit 5.
#[test]
fn test_time_travel_mvcc_resolution() {
let commit_log = build_commit_log(10);
let store = build_version_store_with_chain(1, &[2, 5, 8]);
let snapshot = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(6)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let idx = snapshot
.resolve_page(&store, PageNumber::new(1).unwrap())
.expect("should resolve to commit 5 version");
let version = store.get_version(idx).expect("version should exist");
assert_eq!(version.commit_seq, CommitSeq::new(5));
}
// A target below the GC horizon (3 < 5) is rejected with HistoryNotRetained.
#[test]
fn test_time_travel_pruned_history() {
let commit_log = build_commit_log(10);
let result = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(3)),
&commit_log,
CommitSeq::new(5),
SchemaEpoch::new(1),
);
assert_eq!(
result.unwrap_err(),
TimeTravelError::HistoryNotRetained {
requested: CommitSeq::new(3),
gc_horizon: CommitSeq::new(5),
}
);
}
// With versions at commits 1, 3, 5, 7, 9: a snapshot at 4 sees commit 3 and a
// snapshot at 8 sees commit 7.
#[test]
fn test_time_travel_multiple_commits() {
let commit_log = build_commit_log(10);
let store = build_version_store_with_chain(1, &[1, 3, 5, 7, 9]);
let s4 = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(4)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let idx = s4
.resolve_page(&store, PageNumber::new(1).unwrap())
.unwrap();
assert_eq!(
store.get_version(idx).unwrap().commit_seq,
CommitSeq::new(3)
);
let s8 = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(8)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let idx = s8
.resolve_page(&store, PageNumber::new(1).unwrap())
.unwrap();
assert_eq!(
store.get_version(idx).unwrap().commit_seq,
CommitSeq::new(7)
);
}
// Page 99 first appears at commit 5, so a snapshot at commit 3 resolves nothing
// for it.
#[test]
fn test_time_travel_table_before_creation() {
let commit_log = build_commit_log(10);
let store = build_version_store_with_chain(99, &[5, 8]);
let s3 = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(3)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let result = s3.resolve_page(&store, PageNumber::new(99).unwrap());
assert!(
result.is_none(),
"page should not be visible before its creation commit"
);
}
// One snapshot (commit 7) resolved against two separate stores: page 1 sees
// commit 5 and page 2 sees commit 6.
#[test]
fn test_time_travel_with_joins() {
let commit_log = build_commit_log(10);
let store_a = build_version_store_with_chain(1, &[2, 5, 8]);
let store_b = build_version_store_with_chain(2, &[3, 6, 9]);
let snapshot = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(7)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
)
.unwrap();
let idx_a = snapshot
.resolve_page(&store_a, PageNumber::new(1).unwrap())
.unwrap();
let idx_b = snapshot
.resolve_page(&store_b, PageNumber::new(2).unwrap())
.unwrap();
assert_eq!(
store_a.get_version(idx_a).unwrap().commit_seq,
CommitSeq::new(5)
);
assert_eq!(
store_b.get_version(idx_b).unwrap().commit_seq,
CommitSeq::new(6)
);
}
// A commit sequence beyond the end of the log (99 in a 5-record log) is
// rejected with CommitSeqNotFound.
#[test]
fn test_time_travel_commitseq_not_found() {
let commit_log = build_commit_log(5);
let result = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(99)),
&commit_log,
CommitSeq::new(1),
SchemaEpoch::new(1),
);
assert_eq!(
result.unwrap_err(),
TimeTravelError::CommitSeqNotFound {
requested: CommitSeq::new(99),
}
);
}
// Timestamp resolution against a log with no records reports EmptyCommitLog.
#[test]
fn test_time_travel_empty_commit_log() {
let commit_log = CommitLog::new(CommitSeq::new(1));
let result = resolve_timestamp_via_commit_log(&commit_log, 1_700_000_000_000_000_000);
assert_eq!(result.unwrap_err(), TimeTravelError::EmptyCommitLog);
}
// A timestamp earlier than the first record cannot be resolved.
#[test]
fn test_time_travel_timestamp_before_all_commits() {
let commit_log = build_commit_log(5);
let result = resolve_timestamp_via_commit_log(&commit_log, 1_000_000_000);
assert!(matches!(
result.unwrap_err(),
TimeTravelError::TimestampNotResolvable { .. }
));
}
// Boundary check: a target equal to the GC horizon is accepted, one below it
// is rejected.
#[test]
fn test_time_travel_at_gc_horizon_boundary() {
let commit_log = build_commit_log(10);
let result = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(5)),
&commit_log,
CommitSeq::new(5),
SchemaEpoch::new(1),
);
assert!(result.is_ok());
let result = create_time_travel_snapshot(
TimeTravelTarget::CommitSequence(CommitSeq::new(4)),
&commit_log,
CommitSeq::new(5),
SchemaEpoch::new(1),
);
assert!(matches!(
result.unwrap_err(),
TimeTravelError::HistoryNotRetained { .. }
));
}
}