use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use crate::sql::db::table::Value;
use super::clock::MvccClock;
use super::registry::{ActiveTxRegistry, TxTimestampOrId};
/// Identifies a single logical row: the owning table's name plus the row's
/// integer rowid. Used as the key of the store's version map.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct RowID {
    /// Name of the table the row belongs to.
    pub table: String,
    /// Row identifier within `table`.
    pub rowid: i64,
}

impl RowID {
    /// Builds a `RowID` from anything convertible into the table name
    /// (`&str`, `String`, ...).
    pub fn new(table: impl Into<String>, rowid: i64) -> Self {
        let table: String = table.into();
        RowID { table, rowid }
    }
}
/// Contents carried by one row version.
#[derive(Clone, Debug, PartialEq)]
pub enum VersionPayload {
    /// A live row as `(name, value)` pairs (presumably column name -> value).
    Present(Vec<(String, Value)>),
    /// Records a delete. Reads return it as-is; callers presumably treat it
    /// as "row absent at this snapshot".
    Tombstone,
}
/// One entry in a row's version chain.
///
/// A committed version is visible to snapshots whose begin timestamp lies in
/// the half-open interval `[begin, end)`. That is the rule `MvStore::visible_at`
/// applies. A version whose `begin` is still a transaction id is in flight and
/// is never visible through `visible_at`.
#[derive(Clone, Debug)]
pub struct RowVersion {
    /// Commit timestamp for committed versions, or the writer's transaction id
    /// while the version is in flight.
    pub begin: TxTimestampOrId,
    /// `None` while the version has not been superseded; otherwise the
    /// timestamp (or in-flight transaction id) that ended it.
    pub end: Option<TxTimestampOrId>,
    /// Row contents, or a tombstone for a delete.
    pub payload: VersionPayload,
}

impl RowVersion {
    /// Builds a committed, not-yet-superseded version that begins at `commit_ts`.
    pub fn committed(commit_ts: u64, payload: VersionPayload) -> Self {
        let begin = TxTimestampOrId::Timestamp(commit_ts);
        RowVersion {
            begin,
            end: None,
            payload,
        }
    }

    /// Builds an uncommitted version owned by transaction `tx_id`.
    pub fn in_flight(tx_id: super::TxId, payload: VersionPayload) -> Self {
        let begin = TxTimestampOrId::Id(tx_id);
        RowVersion {
            begin,
            end: None,
            payload,
        }
    }
}
/// All versions of one row. Pushes append at the end, so the chain is
/// ordered oldest first.
pub type RowVersionChain = Vec<RowVersion>;

/// Handle to the multi-version row store.
///
/// Cloning is cheap: every clone shares the same underlying state through an
/// `Arc`.
#[derive(Debug, Clone)]
pub struct MvStore {
    inner: Arc<MvStoreInner>,
}

/// Shared state behind every `MvStore` clone.
#[derive(Debug)]
struct MvStoreInner {
    /// Row id -> version chain. The mutex guards only the map itself; each
    /// chain has its own `RwLock`, so work on different rows can proceed
    /// independently once the chain handle has been cloned out.
    versions: Mutex<HashMap<RowID, Arc<RwLock<RowVersionChain>>>>,
    /// Timestamp source, shared with whoever constructed the store.
    clock: Arc<MvccClock>,
    /// Active-transaction registry used to compute the GC watermark.
    active: ActiveTxRegistry,
}
impl MvStore {
    /// Creates an empty store that shares `clock` with the caller and owns a
    /// freshly constructed `ActiveTxRegistry`.
    pub fn new(clock: Arc<MvccClock>) -> Self {
        Self {
            inner: Arc::new(MvStoreInner {
                versions: Mutex::new(HashMap::new()),
                clock,
                active: ActiveTxRegistry::new(),
            }),
        }
    }

    /// Convenience constructor: creates a clock starting at 0 and a store that
    /// shares it, and returns both so the caller can drive the clock.
    pub fn fresh() -> (Self, Arc<MvccClock>) {
        let clock = Arc::new(MvccClock::new(0));
        let store = Self::new(Arc::clone(&clock));
        (store, clock)
    }

    /// The clock this store was constructed with.
    pub fn clock(&self) -> &Arc<MvccClock> {
        &self.inner.clock
    }

    /// Registry whose minimum begin timestamp defines [`Self::active_watermark`].
    pub fn active_registry(&self) -> &ActiveTxRegistry {
        &self.inner.active
    }

    /// Number of row ids that currently own a chain in the map. A chain that
    /// GC emptied while another thread still held a handle to it stays in the
    /// map and is counted here.
    pub fn tracked_rows(&self) -> usize {
        self.lock_map().len()
    }

    /// Sum of chain lengths over all tracked rows. Committed and in-flight
    /// versions are both counted.
    ///
    /// Lock order: map mutex, then each chain's read lock.
    pub fn total_versions(&self) -> usize {
        let map = self.lock_map();
        map.values()
            .map(|chain| chain.read().expect("chain RwLock poisoned").len())
            .sum()
    }

    /// Returns the payload of the first (oldest-first) version of `row_id` that
    /// is visible to a snapshot with begin timestamp `begin_ts`.
    ///
    /// Returns `None` if the row is untracked or no version is visible. A
    /// `Tombstone` payload is returned as-is.
    pub fn read(&self, row_id: &RowID, begin_ts: u64) -> Option<VersionPayload> {
        // Clone the chain handle and release the map mutex before locking the
        // chain, so a slow read of one row does not block access to the map.
        let chain = {
            let map = self.lock_map();
            Arc::clone(map.get(row_id)?)
        };
        let chain = chain.read().expect("chain RwLock poisoned");
        chain
            .iter()
            .find(|v| Self::visible_at(v, begin_ts))
            .map(|v| v.payload.clone())
    }

    /// Snapshot visibility rule.
    ///
    /// `version` is visible at `begin_ts` when its begin is a commit timestamp
    /// `<= begin_ts`, and its end is `None`, a timestamp `> begin_ts`, or an
    /// in-flight transaction id. An in-flight end means the superseding write
    /// has not committed yet. Versions whose begin is an in-flight id are
    /// never visible.
    pub fn visible_at(version: &RowVersion, begin_ts: u64) -> bool {
        let begin_ok = match version.begin {
            TxTimestampOrId::Timestamp(t) => t <= begin_ts,
            TxTimestampOrId::Id(_) => false,
        };
        if !begin_ok {
            return false;
        }
        match version.end {
            None => true,
            Some(TxTimestampOrId::Timestamp(t)) => t > begin_ts,
            Some(TxTimestampOrId::Id(_)) => true,
        }
    }

    /// Begin timestamp of the newest committed version of `row_id`. In-flight
    /// versions are skipped. Returns `None` if the row is untracked or has no
    /// committed version.
    pub fn latest_committed_begin(&self, row_id: &RowID) -> Option<u64> {
        let chain = {
            let map = self.lock_map();
            Arc::clone(map.get(row_id)?)
        };
        let chain = chain.read().expect("chain RwLock poisoned");
        chain.iter().rev().find_map(Self::committed_begin)
    }

    /// Appends a committed `version` to `row_id`'s chain and caps its
    /// predecessor.
    ///
    /// The predecessor is the newest *committed* version in the chain. If its
    /// `end` is `None`, it is set to `version`'s begin timestamp. If it is
    /// already capped at exactly that timestamp, it is left alone. The chain is
    /// created if it does not exist.
    ///
    /// # Errors
    ///
    /// - [`MvStoreError::NotCommitted`] if `version.begin` is a transaction id.
    ///   In that case the chain is not touched.
    /// - [`MvStoreError::NonMonotonicBegin`] if the new begin is not strictly
    ///   greater than the predecessor's begin.
    /// - [`MvStoreError::PreviousAlreadyCapped`] if the predecessor already
    ///   ends at a different timestamp.
    /// - [`MvStoreError::PreviousCappedByInFlight`] if the predecessor is being
    ///   ended by an in-flight transaction.
    ///
    /// The chain is left unchanged on every error.
    pub fn push_committed(&self, row_id: RowID, version: RowVersion) -> Result<(), MvStoreError> {
        let begin_ts = Self::committed_begin(&version).ok_or(MvStoreError::NotCommitted)?;
        let chain_arc = self.get_or_create_chain(row_id);
        let mut chain = chain_arc.write().expect("chain RwLock poisoned");
        // In-flight versions (TxId begin) are invisible to every snapshot. They
        // must neither satisfy the monotonicity check nor be capped in place of
        // the real predecessor. Doing either would leave the committed
        // predecessor open-ended and visible alongside `version`.
        let prev = chain
            .iter_mut()
            .rev()
            .find_map(|v| Self::committed_begin(v).map(|ts| (ts, v)));
        if let Some((prev_begin, prev)) = prev {
            if begin_ts <= prev_begin {
                return Err(MvStoreError::NonMonotonicBegin {
                    prev: prev_begin,
                    new: begin_ts,
                });
            }
            match prev.end {
                None => {}
                // Already capped at exactly our begin: nothing left to do.
                Some(TxTimestampOrId::Timestamp(existing)) if existing == begin_ts => {}
                Some(TxTimestampOrId::Timestamp(existing)) => {
                    return Err(MvStoreError::PreviousAlreadyCapped { existing });
                }
                Some(TxTimestampOrId::Id(_)) => {
                    return Err(MvStoreError::PreviousCappedByInFlight);
                }
            }
            if prev.end.is_none() {
                prev.end = Some(TxTimestampOrId::Timestamp(begin_ts));
            }
        }
        chain.push(version);
        Ok(())
    }

    /// Appends `version` to `row_id`'s chain unconditionally, creating the
    /// chain if needed. No capping or ordering checks are performed.
    pub fn push_in_flight(&self, row_id: RowID, version: RowVersion) {
        let chain_arc = self.get_or_create_chain(row_id);
        let mut chain = chain_arc.write().expect("chain RwLock poisoned");
        chain.push(version);
    }

    /// GC watermark: the minimum begin timestamp reported by the active
    /// registry, or `u64::MAX` when it reports none.
    pub fn active_watermark(&self) -> u64 {
        self.inner.active.min_active_begin_ts().unwrap_or(u64::MAX)
    }

    /// Removes every version of `row_id` whose `end` is a timestamp
    /// `<= watermark`, and returns how many were removed.
    ///
    /// By [`Self::visible_at`], such versions are invisible to every snapshot
    /// with `begin_ts >= watermark`. If the chain ends up empty and no other
    /// handle to it exists, the row's map entry is dropped as well.
    pub fn gc_chain(&self, row_id: &RowID, watermark: u64) -> usize {
        let chain_arc = match self.lock_map().get(row_id) {
            Some(arc) => Arc::clone(arc),
            None => return 0,
        };
        let (reclaimed, now_empty) = {
            let mut chain = chain_arc.write().expect("chain RwLock poisoned");
            let before = chain.len();
            chain.retain(|v| match v.end {
                Some(TxTimestampOrId::Timestamp(t)) => t > watermark,
                _ => true,
            });
            (before - chain.len(), chain.is_empty())
        };
        if now_empty {
            self.drop_chain_if_unused(row_id, &chain_arc);
        }
        reclaimed
    }

    /// Runs [`Self::gc_chain`] over a snapshot of the currently tracked row ids
    /// and returns the total number of versions reclaimed. Rows added after the
    /// snapshot is taken are not visited.
    pub fn gc_all(&self, watermark: u64) -> usize {
        let row_ids: Vec<RowID> = self.lock_map().keys().cloned().collect();
        row_ids
            .iter()
            .map(|rid| self.gc_chain(rid, watermark))
            .sum()
    }

    /// Commit timestamp stored in `version.begin`, or `None` when the begin is
    /// an in-flight transaction id.
    fn committed_begin(version: &RowVersion) -> Option<u64> {
        match version.begin {
            TxTimestampOrId::Timestamp(t) => Some(t),
            TxTimestampOrId::Id(_) => None,
        }
    }

    /// Removes `row_id` from the map only if all three hold: the entry still
    /// points at `chain_arc`, nobody else holds a handle to that chain, and the
    /// chain is empty.
    ///
    /// Within this impl, every chain handle is cloned out of the map while the
    /// map mutex is held. So, with the mutex held here, observing exactly two
    /// strong references (the map's and the caller's) proves that no other
    /// thread is about to push into this chain. It also proves that no new
    /// handle can appear until the mutex is released.
    ///
    /// Without this check, a push landing between the emptiness test and the
    /// removal would go into an orphaned chain and be lost. Lock order is map
    /// mutex, then chain lock, matching `total_versions`.
    fn drop_chain_if_unused(&self, row_id: &RowID, chain_arc: &Arc<RwLock<RowVersionChain>>) {
        let mut map = self.lock_map();
        let unused = match map.get(row_id) {
            Some(current) => {
                Arc::ptr_eq(current, chain_arc)
                    && Arc::strong_count(chain_arc) == 2
                    && chain_arc.read().expect("chain RwLock poisoned").is_empty()
            }
            None => false,
        };
        if unused {
            map.remove(row_id);
        }
    }

    /// Returns a handle to `row_id`'s chain, inserting an empty chain first if
    /// the row is untracked. The map mutex is released before returning.
    fn get_or_create_chain(&self, row_id: RowID) -> Arc<RwLock<RowVersionChain>> {
        let mut map = self.lock_map();
        Arc::clone(
            map.entry(row_id)
                .or_insert_with(|| Arc::new(RwLock::new(Vec::new()))),
        )
    }

    /// Locks the row map. Panics if the mutex is poisoned.
    fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap<RowID, Arc<RwLock<RowVersionChain>>>> {
        self.inner
            .versions
            .lock()
            .unwrap_or_else(|e| panic!("sqlrite: MvStore versions mutex poisoned: {e}"))
    }
}
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum MvStoreError {
#[error("push_committed expects a committed Timestamp, not an in-flight TxId")]
NotCommitted,
#[error("previous latest version already capped at end_ts={existing}")]
PreviousAlreadyCapped { existing: u64 },
#[error("previous latest version is being capped by an in-flight transaction")]
PreviousCappedByInFlight,
#[error("non-monotonic begin: previous={prev}, new={new}")]
NonMonotonicBegin { prev: u64, new: u64 },
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-pair payload `("v", Integer(value))` used throughout.
    fn payload(value: i64) -> VersionPayload {
        VersionPayload::Present(vec![("v".to_string(), Value::Integer(value))])
    }

    #[test]
    fn empty_store_returns_none() {
        let (store, _clock) = MvStore::fresh();
        assert!(store.read(&RowID::new("t", 1), 100).is_none());
        assert_eq!(store.tracked_rows(), 0);
        assert_eq!(store.total_versions(), 0);
    }

    #[test]
    fn visibility_picks_the_right_version_for_each_begin_ts() {
        let (store, clock) = MvStore::fresh();
        let row = RowID::new("accounts", 1);
        clock.observe(5);
        store
            .push_committed(row.clone(), RowVersion::committed(5, payload(100)))
            .unwrap();
        clock.observe(10);
        store
            .push_committed(row.clone(), RowVersion::committed(10, payload(200)))
            .unwrap();
        assert_eq!(store.read(&row, 4), None);
        assert_eq!(store.read(&row, 5), Some(payload(100)));
        assert_eq!(store.read(&row, 9), Some(payload(100)));
        assert_eq!(store.read(&row, 10), Some(payload(200)));
        assert_eq!(store.read(&row, 1_000), Some(payload(200)));
    }

    #[test]
    fn push_committed_caps_previous_latest() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 7);
        store
            .push_committed(row.clone(), RowVersion::committed(2, payload(1)))
            .unwrap();
        store
            .push_committed(row.clone(), RowVersion::committed(5, payload(2)))
            .unwrap();
        assert_eq!(store.read(&row, 4), Some(payload(1)));
    }

    #[test]
    fn visible_at_handles_each_combination() {
        // Open-ended committed version.
        let v = RowVersion {
            begin: TxTimestampOrId::Timestamp(10),
            end: None,
            payload: payload(0),
        };
        assert!(!MvStore::visible_at(&v, 9));
        assert!(MvStore::visible_at(&v, 10));
        assert!(MvStore::visible_at(&v, 1_000));

        // Committed version with a committed end: visible on [10, 20).
        let v = RowVersion {
            begin: TxTimestampOrId::Timestamp(10),
            end: Some(TxTimestampOrId::Timestamp(20)),
            payload: payload(0),
        };
        assert!(!MvStore::visible_at(&v, 9));
        assert!(MvStore::visible_at(&v, 10));
        assert!(MvStore::visible_at(&v, 19));
        assert!(!MvStore::visible_at(&v, 20));

        // In-flight begin: never visible.
        let v = RowVersion {
            begin: TxTimestampOrId::Id(super::super::TxId(42)),
            end: None,
            payload: payload(0),
        };
        assert!(!MvStore::visible_at(&v, 0));
        assert!(!MvStore::visible_at(&v, 1_000));

        // Committed begin, in-flight end: still visible from its begin onward.
        let v = RowVersion {
            begin: TxTimestampOrId::Timestamp(5),
            end: Some(TxTimestampOrId::Id(super::super::TxId(42))),
            payload: payload(0),
        };
        assert!(MvStore::visible_at(&v, 10));
        assert!(!MvStore::visible_at(&v, 4));
    }

    #[test]
    fn tombstone_versions_capture_the_delete() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(1, payload(42)))
            .unwrap();
        store
            .push_committed(row.clone(), RowVersion::committed(5, VersionPayload::Tombstone))
            .unwrap();
        assert_eq!(store.read(&row, 1), Some(payload(42)));
        assert_eq!(store.read(&row, 4), Some(payload(42)));
        assert_eq!(store.read(&row, 5), Some(VersionPayload::Tombstone));
        assert_eq!(store.read(&row, 100), Some(VersionPayload::Tombstone));
    }

    #[test]
    fn push_committed_rejects_in_flight_begin() {
        let (store, _clock) = MvStore::fresh();
        let v = RowVersion::in_flight(super::super::TxId(7), payload(0));
        let err = store
            .push_committed(RowID::new("t", 1), v)
            .expect_err("in-flight begin must be rejected");
        assert_eq!(err, MvStoreError::NotCommitted);
    }

    #[test]
    fn push_committed_rejects_non_monotonic_begin() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(10, payload(1)))
            .unwrap();
        let err = store
            .push_committed(row.clone(), RowVersion::committed(10, payload(2)))
            .expect_err("equal begin should be rejected");
        assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. }));
        let err = store
            .push_committed(row.clone(), RowVersion::committed(5, payload(2)))
            .expect_err("backward begin should be rejected");
        assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. }));
    }

    #[test]
    fn in_flight_versions_are_invisible_to_other_readers() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(5, payload(100)))
            .unwrap();
        store.push_in_flight(
            row.clone(),
            RowVersion::in_flight(super::super::TxId(99), payload(200)),
        );
        assert_eq!(store.read(&row, 5), Some(payload(100)));
        assert_eq!(store.read(&row, 1_000), Some(payload(100)));
    }

    #[test]
    fn latest_committed_begin_ignores_in_flight_versions() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        assert_eq!(store.latest_committed_begin(&row), None);
        store
            .push_committed(row.clone(), RowVersion::committed(3, payload(1)))
            .unwrap();
        store.push_in_flight(
            row.clone(),
            RowVersion::in_flight(super::super::TxId(8), payload(2)),
        );
        assert_eq!(store.latest_committed_begin(&row), Some(3));
    }

    #[test]
    fn tracked_rows_and_total_versions_are_accurate() {
        let (store, _clock) = MvStore::fresh();
        store
            .push_committed(RowID::new("a", 1), RowVersion::committed(1, payload(0)))
            .unwrap();
        store
            .push_committed(RowID::new("a", 1), RowVersion::committed(2, payload(0)))
            .unwrap();
        store
            .push_committed(RowID::new("a", 2), RowVersion::committed(1, payload(0)))
            .unwrap();
        store
            .push_committed(RowID::new("b", 1), RowVersion::committed(1, payload(0)))
            .unwrap();
        assert_eq!(store.tracked_rows(), 3);
        assert_eq!(store.total_versions(), 4);
    }

    #[test]
    fn store_is_send_and_sync() {
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<MvStore>();
        assert_sync::<MvStore>();
    }

    #[test]
    fn concurrent_reads_see_consistent_snapshots() {
        use std::thread;
        let (store, _clock) = MvStore::fresh();
        for rid in 0..32 {
            let row = RowID::new("t", rid);
            store
                .push_committed(row.clone(), RowVersion::committed(1, payload(rid)))
                .unwrap();
            store
                .push_committed(row, RowVersion::committed(10, payload(rid * 100)))
                .unwrap();
        }
        // `MvStore` is itself a cheap shared handle, so each thread gets a clone.
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let s = store.clone();
                thread::spawn(move || {
                    for _ in 0..500 {
                        for rid in 0..32 {
                            let row = RowID::new("t", rid);
                            assert_eq!(s.read(&row, 5), Some(payload(rid)));
                            assert_eq!(s.read(&row, 100), Some(payload(rid * 100)));
                        }
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }

    #[test]
    fn store_shares_caller_clock() {
        let clock = Arc::new(MvccClock::new(42));
        let store = MvStore::new(Arc::clone(&clock));
        assert_eq!(store.clock().now(), 42);
        clock.tick();
        assert_eq!(store.clock().now(), 43);
    }

    #[test]
    fn active_watermark_is_max_with_no_readers() {
        let (store, _clock) = MvStore::fresh();
        assert_eq!(store.active_watermark(), u64::MAX);
    }

    #[test]
    fn active_watermark_tracks_oldest_in_flight_tx() {
        let (store, clock) = MvStore::fresh();
        let h1 = store.active_registry().register(&clock);
        assert_eq!(store.active_watermark(), 1);
        let h2 = store.active_registry().register(&clock);
        assert_eq!(store.active_watermark(), 1);
        drop(h1);
        assert_eq!(store.active_watermark(), 2);
        drop(h2);
        assert_eq!(store.active_watermark(), u64::MAX);
    }

    #[test]
    fn gc_chain_reclaims_versions_below_watermark() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(1, payload(1)))
            .unwrap();
        store
            .push_committed(row.clone(), RowVersion::committed(5, payload(2)))
            .unwrap();
        store
            .push_committed(row.clone(), RowVersion::committed(9, payload(3)))
            .unwrap();
        assert_eq!(store.total_versions(), 3);
        let reclaimed = store.gc_chain(&row, 5);
        assert_eq!(reclaimed, 1);
        assert_eq!(store.total_versions(), 2);
        let reclaimed = store.gc_chain(&row, u64::MAX);
        assert_eq!(reclaimed, 1);
        assert_eq!(store.total_versions(), 1);
    }

    #[test]
    fn gc_on_untracked_row_is_a_no_op() {
        let (store, _clock) = MvStore::fresh();
        assert_eq!(store.gc_chain(&RowID::new("t", 1), u64::MAX), 0);
        assert_eq!(store.gc_all(u64::MAX), 0);
        assert_eq!(store.tracked_rows(), 0);
    }

    #[test]
    fn gc_chain_drops_empty_chain_from_map() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(1, payload(1)))
            .unwrap();
        store
            .push_committed(row.clone(), RowVersion::committed(5, VersionPayload::Tombstone))
            .unwrap();
        // Manually cap the tombstone so every version becomes reclaimable.
        // The chain handle is dropped at the end of this scope.
        {
            let map = store.inner.versions.lock().unwrap();
            let chain_arc = map.get(&row).unwrap().clone();
            drop(map);
            let mut chain = chain_arc.write().unwrap();
            if let Some(last) = chain.last_mut() {
                last.end = Some(TxTimestampOrId::Timestamp(10));
            }
        }
        assert_eq!(store.tracked_rows(), 1);
        let reclaimed = store.gc_chain(&row, u64::MAX);
        assert_eq!(reclaimed, 2);
        assert_eq!(store.tracked_rows(), 0);
    }

    #[test]
    fn gc_all_sweeps_every_row() {
        let (store, _clock) = MvStore::fresh();
        for rid in 0..4 {
            let row = RowID::new("t", rid);
            store
                .push_committed(row.clone(), RowVersion::committed(1, payload(rid)))
                .unwrap();
            store
                .push_committed(row.clone(), RowVersion::committed(2, payload(rid * 10)))
                .unwrap();
        }
        assert_eq!(store.total_versions(), 8);
        let reclaimed = store.gc_all(u64::MAX);
        assert_eq!(reclaimed, 4);
        assert_eq!(store.total_versions(), 4);
        assert_eq!(store.tracked_rows(), 4);
    }

    #[test]
    fn gc_preserves_versions_visible_to_active_readers() {
        let (store, clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(1, payload(1)))
            .unwrap();
        store
            .push_committed(row.clone(), RowVersion::committed(10, payload(2)))
            .unwrap();
        clock.observe(4);
        let reader = store.active_registry().register(&clock);
        assert_eq!(reader.begin_ts(), 5);
        assert_eq!(store.active_watermark(), 5);
        let reclaimed = store.gc_chain(&row, store.active_watermark());
        assert_eq!(reclaimed, 0);
        assert_eq!(store.read(&row, 5), Some(payload(1)));
        drop(reader);
        let reclaimed = store.gc_chain(&row, store.active_watermark());
        assert_eq!(reclaimed, 1);
    }

    #[test]
    fn gc_keeps_chain_bounded_under_repeated_updates() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        for ts in 1..=100u64 {
            store
                .push_committed(row.clone(), RowVersion::committed(ts, payload(ts as i64)))
                .unwrap();
            store.gc_chain(&row, store.active_watermark());
        }
        assert_eq!(store.total_versions(), 1);
    }
}