use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use super::coordinator::IsolationLevel;
/// Transaction identifier.
pub type Xid = u64;

/// Sentinel xid meaning "no transaction". Pinning or observing it is a no-op.
pub const XID_NONE: Xid = 0;

/// Number of xids reserved from the shared counter each time the autocommit
/// pool runs dry.
pub(crate) const AUTOCOMMIT_POOL_BATCH: u64 = 16;
/// A point-in-time record of which other transactions were still running.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Transaction this snapshot belongs to.
    pub xid: Xid,
    /// Xids (other than `xid`) that were active when the snapshot was taken.
    pub in_progress: HashSet<Xid>,
}

impl Snapshot {
    /// Returns whether a row version with creator `xmin` and deleter `xmax`
    /// is visible to this snapshot, delegating to `super::visibility::is_visible`.
    ///
    /// NOTE(review): the last argument passed to `is_visible` is always an
    /// empty set. If that parameter is the aborted-xid set, rolled-back writers
    /// are not filtered out by this method. Confirm callers handle aborted xids
    /// separately.
    pub fn sees(&self, xmin: Xid, xmax: Xid) -> bool {
        let empty = HashSet::new();
        super::visibility::is_visible(xmin, xmax, self.xid, &self.in_progress, &empty)
    }
}
/// State of one open transaction: its xid, isolation level, snapshot, and
/// savepoint stack.
#[derive(Debug, Clone)]
pub struct TxnContext {
    /// The transaction's own xid. Used as the writer xid when `savepoints`
    /// is empty.
    pub xid: Xid,
    /// Isolation level the transaction runs under.
    pub isolation: IsolationLevel,
    /// Snapshot associated with this transaction.
    pub snapshot: Snapshot,
    /// Savepoint entries. The xid of the last entry is what
    /// [`Self::writer_xid`] returns. Presumably `(name, sub-transaction xid)`;
    /// confirm against the coordinator.
    pub savepoints: Vec<(String, Xid)>,
    /// NOTE(review): presumably the sub-transaction xids of released
    /// savepoints. Not read anywhere in this file, so confirm.
    pub released_sub_xids: Vec<Xid>,
}

impl TxnContext {
    /// Xid that new writes should be stamped with.
    ///
    /// This is the xid of the most recently pushed savepoint, or the
    /// transaction's own xid when no savepoint exists.
    pub fn writer_xid(&self) -> Xid {
        match self.savepoints.last() {
            Some(&(_, sub_xid)) => sub_xid,
            None => self.xid,
        }
    }
}
/// Allocates transaction ids and tracks which xids are active, aborted, or
/// pinned.
pub struct SnapshotManager {
    /// Shared xid counter: the next xid to hand out.
    next_xid: AtomicU64,
    /// Xids already reserved from `next_xid` for autocommit writes.
    autocommit_pool: parking_lot::Mutex<AutocommitPool>,
    /// Active / aborted / pinned bookkeeping.
    state: parking_lot::RwLock<ManagerState>,
}
/// Half-open range `[next, end)` of reserved autocommit xids not yet handed
/// out. The range is empty when `next >= end`.
#[derive(Default)]
struct AutocommitPool {
    /// One past the last reserved xid.
    end: Xid,
    /// Next xid to hand out from the pool.
    next: Xid,
}
/// Transaction bookkeeping guarded by the manager's `RwLock`.
#[derive(Default)]
struct ManagerState {
    /// Pin reference counts by xid. An entry exists only while its count is
    /// non-zero.
    pinned: HashMap<Xid, u32>,
    /// Xids that have begun and not yet committed or rolled back.
    active: HashSet<Xid>,
    /// Rolled-back xids that have not yet been pruned.
    aborted: HashSet<Xid>,
}
impl SnapshotManager {
    /// Creates an empty manager.
    ///
    /// The counter starts at 1, so [`XID_NONE`] (0) is never handed out.
    pub fn new() -> Self {
        Self {
            next_xid: AtomicU64::new(1),
            state: parking_lot::RwLock::new(ManagerState::default()),
            autocommit_pool: parking_lot::Mutex::new(AutocommitPool::default()),
        }
    }

    /// Starts a transaction: allocates a fresh xid and records it as active.
    ///
    /// The xid is drawn from the counter while the state write lock is held.
    /// That makes allocation and registration in the active set a single step
    /// relative to [`Self::snapshot`].
    ///
    /// Drawing the xid before taking the lock would leave a window for a race:
    /// a concurrent `begin` could obtain a larger xid and take a snapshot in
    /// which this still-uncommitted xid is missing from `in_progress`.
    pub fn begin(&self) -> Xid {
        let mut state = self.state.write();
        let xid = self.next_xid.fetch_add(1, Ordering::Relaxed);
        state.active.insert(xid);
        xid
    }

    /// Returns a snapshot for `xid`.
    ///
    /// Every currently active xid other than `xid` itself is recorded as in
    /// progress.
    pub fn snapshot(&self, xid: Xid) -> Snapshot {
        let state = self.state.read();
        let in_progress: HashSet<Xid> =
            state.active.iter().copied().filter(|&x| x != xid).collect();
        Snapshot { xid, in_progress }
    }

    /// Marks `xid` as committed.
    ///
    /// This removes `xid` from the active set and clears any aborted marking
    /// it had.
    pub fn commit(&self, xid: Xid) {
        let mut state = self.state.write();
        state.active.remove(&xid);
        state.aborted.remove(&xid);
    }

    /// Hands out an xid for an autocommit write without registering it as
    /// active.
    ///
    /// Xids are reserved from the shared counter in batches of
    /// [`AUTOCOMMIT_POOL_BATCH`] and handed out one at a time. Returned xids
    /// are therefore unique and strictly increasing across calls on this
    /// manager.
    ///
    /// NOTE(review): because a whole batch is reserved up front, a transaction
    /// that calls [`Self::begin`] after a batch is reserved gets a larger xid
    /// than the pooled xids handed out afterwards. If visibility treats smaller
    /// committed xids as visible, that transaction may observe autocommit
    /// writes made after it started. Confirm this is acceptable for the
    /// isolation levels in use.
    pub fn allocate_committed_xid(&self) -> Xid {
        let mut pool = self.autocommit_pool.lock();
        if pool.next >= pool.end {
            // Pool exhausted (or never filled): reserve a fresh batch.
            let start = self
                .next_xid
                .fetch_add(AUTOCOMMIT_POOL_BATCH, Ordering::Relaxed);
            pool.next = start;
            pool.end = start + AUTOCOMMIT_POOL_BATCH;
        }
        let xid = pool.next;
        pool.next += 1;
        xid
    }

    /// Marks `xid` as aborted and removes it from the active set.
    pub fn rollback(&self, xid: Xid) {
        let mut state = self.state.write();
        state.active.remove(&xid);
        state.aborted.insert(xid);
    }

    /// Returns whether `xid` is currently recorded as aborted (and not yet
    /// pruned).
    pub fn is_aborted(&self, xid: Xid) -> bool {
        self.state.read().aborted.contains(&xid)
    }

    /// Returns whether `xid` has begun and not yet committed or rolled back.
    pub fn is_active(&self, xid: Xid) -> bool {
        self.state.read().active.contains(&xid)
    }

    /// Returns the smallest active xid, or `None` if no transaction is active.
    pub fn oldest_active_xid(&self) -> Option<Xid> {
        self.state.read().active.iter().copied().min()
    }

    /// Returns the smallest pinned xid, or `None` if nothing is pinned.
    pub fn oldest_pinned_xid(&self) -> Option<Xid> {
        self.state.read().pinned.keys().copied().min()
    }

    /// Returns the value the shared counter holds right now, without
    /// reserving it.
    ///
    /// Concurrent allocations may advance the counter immediately afterwards.
    pub fn peek_next_xid(&self) -> Xid {
        self.next_xid.load(Ordering::Relaxed)
    }

    /// Advances the counter so it is at least `xid + 1` (saturating).
    ///
    /// The counter never moves backwards. [`XID_NONE`] is ignored.
    pub fn observe_committed_xid(&self, xid: Xid) {
        if xid == XID_NONE {
            return;
        }
        // `fetch_max` raises the counter atomically and leaves it untouched
        // if it is already larger.
        self.next_xid
            .fetch_max(xid.saturating_add(1), Ordering::Relaxed);
    }

    /// Forgets aborted xids smaller than `below`, except those currently
    /// pinned.
    pub fn prune_aborted(&self, below: Xid) {
        let mut state = self.state.write();
        // Split the borrow so `retain` on `aborted` can consult `pinned`.
        let ManagerState {
            aborted, pinned, ..
        } = &mut *state;
        aborted.retain(|&x| x >= below || pinned.contains_key(&x));
    }

    /// Increments the pin count for `xid`. Pinning [`XID_NONE`] is a no-op.
    pub fn pin(&self, xid: Xid) {
        if xid == XID_NONE {
            return;
        }
        let mut state = self.state.write();
        *state.pinned.entry(xid).or_insert(0) += 1;
    }

    /// Decrements the pin count for `xid`.
    ///
    /// The entry is removed once the count reaches zero. Unpinning an xid
    /// that is not pinned, or [`XID_NONE`], is a no-op.
    pub fn unpin(&self, xid: Xid) {
        if xid == XID_NONE {
            return;
        }
        let mut state = self.state.write();
        if let Some(count) = state.pinned.get_mut(&xid) {
            if *count <= 1 {
                state.pinned.remove(&xid);
            } else {
                *count -= 1;
            }
        }
    }

    /// Returns whether `xid` has a non-zero pin count.
    pub fn is_pinned(&self, xid: Xid) -> bool {
        self.state.read().pinned.contains_key(&xid)
    }

    /// Returns the current pin count for `xid` (0 when not pinned).
    pub fn pin_count(&self, xid: Xid) -> u32 {
        self.state.read().pinned.get(&xid).copied().unwrap_or(0)
    }
}
impl Default for SnapshotManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn xids_are_monotonic() {
        let mgr = SnapshotManager::new();
        let xids: Vec<Xid> = (0..3).map(|_| mgr.begin()).collect();
        assert!(xids.windows(2).all(|pair| pair[0] < pair[1]));
    }

    #[test]
    fn snapshot_excludes_concurrent_writers() {
        let mgr = SnapshotManager::new();
        let w = mgr.begin();
        let r = mgr.begin();
        let snap = mgr.snapshot(r);
        assert!(snap.in_progress.contains(&w));
        assert!(!snap.sees(w, XID_NONE));
    }

    #[test]
    fn committed_rows_become_visible() {
        let mgr = SnapshotManager::new();
        let w = mgr.begin();
        mgr.commit(w);
        let snap = mgr.snapshot(mgr.begin());
        assert!(snap.sees(w, XID_NONE));
    }

    #[test]
    fn rolled_back_writers_stay_hidden() {
        let mgr = SnapshotManager::new();
        let w = mgr.begin();
        mgr.rollback(w);
        assert!(mgr.is_aborted(w));
    }

    #[test]
    fn pre_mvcc_rows_always_visible() {
        let mgr = SnapshotManager::new();
        let snap = mgr.snapshot(mgr.begin());
        assert!(snap.sees(XID_NONE, XID_NONE));
    }

    #[test]
    fn deletion_xmax_respected() {
        let mgr = SnapshotManager::new();
        // Begin a transaction and commit it straight away.
        let begin_and_commit = |m: &SnapshotManager| {
            let x = m.begin();
            m.commit(x);
            x
        };
        let creator = begin_and_commit(&mgr);
        let deleter = begin_and_commit(&mgr);
        let snap = mgr.snapshot(mgr.begin());
        assert!(!snap.sees(creator, deleter));
    }

    #[test]
    fn pin_blocks_prune_of_aborted_xid() {
        let mgr = SnapshotManager::new();
        let w = mgr.begin();
        mgr.rollback(w);
        assert!(mgr.is_aborted(w));
        let cutoff = w + 1;

        // While pinned, pruning must keep the aborted marking.
        mgr.pin(w);
        mgr.prune_aborted(cutoff);
        assert!(mgr.is_aborted(w));

        // Once unpinned, the same prune drops it.
        mgr.unpin(w);
        mgr.prune_aborted(cutoff);
        assert!(!mgr.is_aborted(w));
    }

    #[test]
    fn pin_is_reference_counted() {
        let mgr = SnapshotManager::new();
        let x = mgr.begin();
        (0..2).for_each(|_| mgr.pin(x));
        assert_eq!(mgr.pin_count(x), 2);

        mgr.unpin(x);
        assert_eq!(mgr.pin_count(x), 1);
        assert!(mgr.is_pinned(x));

        mgr.unpin(x);
        assert_eq!(mgr.pin_count(x), 0);
        assert!(!mgr.is_pinned(x));

        // Unpinning an xid that is no longer pinned changes nothing.
        mgr.unpin(x);
        assert_eq!(mgr.pin_count(x), 0);
    }

    #[test]
    fn pin_xid_none_is_noop() {
        let mgr = SnapshotManager::new();
        mgr.pin(XID_NONE);
        assert!(!mgr.is_pinned(XID_NONE));
        assert_eq!(mgr.pin_count(XID_NONE), 0);
    }

    #[test]
    fn allocate_committed_xid_is_monotonic_and_unique() {
        let mgr = SnapshotManager::new();
        let mut seen = HashSet::new();
        let mut last = 0u64;
        for x in (0..50).map(|_| mgr.allocate_committed_xid()) {
            assert!(x > last, "xids must be strictly increasing: {x} > {last}");
            assert!(seen.insert(x), "duplicate xid handed out: {x}");
            last = x;
        }
    }

    #[test]
    fn allocate_committed_xid_skips_active_set() {
        let mgr = SnapshotManager::new();
        mgr.allocate_committed_xid();
        assert_eq!(mgr.oldest_active_xid(), None);
    }

    #[test]
    fn allocate_committed_xid_visible_to_subsequent_snapshots() {
        let mgr = SnapshotManager::new();
        let w = mgr.allocate_committed_xid();
        let snap = mgr.snapshot(mgr.begin());
        assert!(!snap.in_progress.contains(&w));
        assert!(!mgr.is_aborted(w));
        assert!(snap.sees(w, XID_NONE));
    }

    #[test]
    fn allocate_committed_xid_does_not_block_concurrent_begin() {
        let mgr = SnapshotManager::new();
        let tx = mgr.begin();
        let (auto1, auto2) = (mgr.allocate_committed_xid(), mgr.allocate_committed_xid());
        mgr.commit(tx);
        assert!(tx < auto1 && auto1 < auto2);
        assert_eq!(mgr.oldest_active_xid(), None);
    }

    #[test]
    fn oldest_active_is_min_live_xid() {
        let mgr = SnapshotManager::new();
        let first = mgr.begin();
        let second = mgr.begin();
        assert_eq!(mgr.oldest_active_xid(), Some(first));
        mgr.commit(first);
        assert_eq!(mgr.oldest_active_xid(), Some(second));
        mgr.commit(second);
        assert_eq!(mgr.oldest_active_xid(), None);
    }
}