use super::record::V2WALRecord;
use crate::snapshot::SnapshotId;
use std::collections::HashMap;
/// Span of log sequence numbers (LSNs) attributed to one transaction.
///
/// The span starts at `begin_lsn` and has no upper bound until a commit LSN
/// is stored in `commit_lsn`.
#[derive(Debug, Clone)]
pub struct TxRange {
    /// Identifier of the transaction this range describes.
    pub tx_id: u64,
    /// First LSN of the range (inclusive).
    pub begin_lsn: u64,
    /// Commit LSN (inclusive upper bound), or `None` while no commit is recorded.
    pub commit_lsn: Option<u64>,
}

impl TxRange {
    /// Creates a range for `tx_id` that starts at `begin_lsn` and has no
    /// commit LSN yet.
    pub fn new(tx_id: u64, begin_lsn: u64) -> Self {
        let commit_lsn = None;
        Self { tx_id, begin_lsn, commit_lsn }
    }

    /// Returns `true` once a commit LSN has been stored.
    pub fn is_committed(&self) -> bool {
        self.commit_lsn.is_some()
    }

    /// Returns `true` when `lsn` lies inside this range.
    ///
    /// The lower bound is `begin_lsn` (inclusive). The upper bound is the
    /// commit LSN (inclusive) when present; without one, every LSN at or
    /// after `begin_lsn` is inside the range.
    pub fn contains_lsn(&self, lsn: u64) -> bool {
        let within_upper = self.commit_lsn.map_or(true, |upper| lsn <= upper);
        lsn >= self.begin_lsn && within_upper
    }
}
/// In-memory index from transaction id to the [`TxRange`] of LSNs it covers.
///
/// Alongside the ranges it remembers the largest commit LSN that has been
/// recorded through [`TxRangeIndex::commit_tx`].
#[derive(Debug, Default)]
pub struct TxRangeIndex {
    /// Known transactions, keyed by transaction id.
    tx_ranges: HashMap<u64, TxRange>,
    /// Largest commit LSN recorded so far; `0` until a commit is recorded
    /// and again after [`TxRangeIndex::clear`].
    max_committed_lsn: u64,
}

impl TxRangeIndex {
    /// Creates an empty index with `max_committed_lsn` set to `0`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers transaction `tx_id` as starting at `begin_lsn`.
    ///
    /// Any range already stored under `tx_id` (committed or not) is replaced
    /// by a fresh, uncommitted one.
    pub fn begin_tx(&mut self, tx_id: u64, begin_lsn: u64) {
        self.tx_ranges.insert(tx_id, TxRange::new(tx_id, begin_lsn));
    }

    /// Records `commit_lsn` as the commit point of transaction `tx_id`.
    ///
    /// If `tx_id` is not in the index the call does nothing. Otherwise the
    /// range's commit LSN is overwritten and `max_committed_lsn` is raised
    /// when `commit_lsn` exceeds it.
    pub fn commit_tx(&mut self, tx_id: u64, commit_lsn: u64) {
        if let Some(range) = self.tx_ranges.get_mut(&tx_id) {
            range.commit_lsn = Some(commit_lsn);
            self.max_committed_lsn = self.max_committed_lsn.max(commit_lsn);
        }
    }

    /// Removes the range stored for `tx_id`, if there is one.
    ///
    /// `max_committed_lsn` is left untouched, even when the removed range
    /// carried a commit LSN.
    pub fn rollback_tx(&mut self, tx_id: u64) {
        self.tx_ranges.remove(&tx_id);
    }

    /// Returns the range stored for `tx_id`, or `None` if it is unknown.
    pub fn get_tx_range(&self, tx_id: u64) -> Option<&TxRange> {
        self.tx_ranges.get(&tx_id)
    }

    /// Returns a range that contains `lsn`, or `None` if no range does.
    ///
    /// Ranges can overlap (an uncommitted range has no upper bound), so
    /// several may contain the same LSN. In that case the range with the
    /// greatest `begin_lsn` is returned, ties broken by the greater `tx_id`.
    /// This fixed rule keeps the result independent of hash-map iteration
    /// order, which is arbitrary.
    ///
    /// NOTE(review): the index does not record which transaction actually
    /// wrote a given LSN; confirm that "most recently begun" is the intended
    /// owner when ranges overlap.
    pub fn get_tx_range_for_lsn(&self, lsn: u64) -> Option<&TxRange> {
        self.tx_ranges
            .values()
            .filter(|range| range.contains_lsn(lsn))
            .max_by_key(|range| (range.begin_lsn, range.tx_id))
    }

    /// Returns the largest commit LSN recorded so far (`0` if none).
    pub fn max_committed_lsn(&self) -> u64 {
        self.max_committed_lsn
    }

    /// Decides whether the entry at `lsn` is visible to a reader at
    /// `snapshot_lsn`.
    ///
    /// * `lsn` falls in no known range: visible.
    /// * The range found for `lsn` is uncommitted: not visible.
    /// * Otherwise: visible when its commit LSN is `<= snapshot_lsn`.
    ///
    /// The range is chosen by [`TxRangeIndex::get_tx_range_for_lsn`].
    pub fn is_visible_at(&self, lsn: u64, snapshot_lsn: u64) -> bool {
        match self.get_tx_range_for_lsn(lsn) {
            Some(range) => range
                .commit_lsn
                .map_or(false, |commit_lsn| commit_lsn <= snapshot_lsn),
            None => true,
        }
    }

    /// Removes every range and resets `max_committed_lsn` to `0`.
    pub fn clear(&mut self) {
        self.tx_ranges.clear();
        self.max_committed_lsn = 0;
    }

    /// Number of transactions currently stored.
    pub fn len(&self) -> usize {
        self.tx_ranges.len()
    }

    /// Returns `true` when no transaction is stored.
    pub fn is_empty(&self) -> bool {
        self.tx_ranges.is_empty()
    }

    /// Decides whether transaction `tx_id` is visible to `snapshot_id`.
    ///
    /// The transaction is visible only when it is known, has a commit LSN,
    /// that commit LSN is non-zero, and it is `<= snapshot_id.as_lsn()`.
    /// Unknown and uncommitted transactions are never visible.
    pub fn is_tx_visible(&self, tx_id: u64, snapshot_id: SnapshotId) -> bool {
        self.tx_ranges
            .get(&tx_id)
            .and_then(|range| range.commit_lsn)
            .map_or(false, |commit_lsn| {
                commit_lsn != 0 && commit_lsn <= snapshot_id.as_lsn()
            })
    }
}
/// Lazily narrows `wal_records` to the entries whose transaction is visible
/// to `snapshot_id`.
///
/// Every item is a `(tx_id, record)` pair. An item is yielded only when
/// `tx_index.is_tx_visible(tx_id, snapshot_id)` returns `true`; surviving
/// items are passed through unchanged and in their original order.
pub fn iter_visible_wal_records<'a, I>(
    tx_index: &'a TxRangeIndex,
    wal_records: I,
    snapshot_id: SnapshotId,
) -> impl Iterator<Item = (u64, V2WALRecord)> + 'a
where
    I: Iterator<Item = (u64, V2WALRecord)> + 'a,
{
    // Only the transaction id (first tuple field) takes part in the decision.
    wal_records.filter(move |entry| tx_index.is_tx_visible(entry.0, snapshot_id))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index from `(tx_id, begin_lsn, commit_lsn)` triples.
    ///
    /// Triples are applied in order; each transaction is begun and, when a
    /// commit LSN is given, committed before the next triple is processed.
    fn index_from(txs: &[(u64, u64, Option<u64>)]) -> TxRangeIndex {
        let mut index = TxRangeIndex::new();
        for &(tx_id, begin_lsn, commit_lsn) in txs {
            index.begin_tx(tx_id, begin_lsn);
            if let Some(lsn) = commit_lsn {
                index.commit_tx(tx_id, lsn);
            }
        }
        index
    }

    // ---- TxRange -------------------------------------------------------

    #[test]
    fn test_tx_range_creation() {
        let fresh = TxRange::new(100, 1000);
        assert_eq!((fresh.tx_id, fresh.begin_lsn), (100, 1000));
        assert!(fresh.commit_lsn.is_none());
        assert!(!fresh.is_committed());
    }

    #[test]
    fn test_tx_range_commit() {
        let committed = TxRange {
            commit_lsn: Some(2000),
            ..TxRange::new(100, 1000)
        };
        assert!(committed.is_committed());
    }

    #[test]
    fn test_tx_range_contains_lsn() {
        let mut range = TxRange::new(100, 1000);

        // Without a commit LSN the range is bounded only from below.
        for (lsn, expected) in [(999, false), (1000, true), (1500, true)] {
            assert_eq!(range.contains_lsn(lsn), expected, "open range, lsn {}", lsn);
        }

        // With a commit LSN the upper bound is inclusive.
        range.commit_lsn = Some(2000);
        for (lsn, expected) in [(2000, true), (2001, false)] {
            assert_eq!(range.contains_lsn(lsn), expected, "closed range, lsn {}", lsn);
        }
    }

    // ---- TxRangeIndex: bookkeeping ---------------------------------------

    #[test]
    fn test_tx_index_begin_commit() {
        let mut index = TxRangeIndex::new();
        index.begin_tx(1, 100);
        index.begin_tx(2, 200);
        assert_eq!(index.len(), 2);

        // Each commit is the largest so far, so it becomes the new maximum.
        for (tx_id, commit_lsn) in [(1, 150), (2, 250)] {
            index.commit_tx(tx_id, commit_lsn);
            assert_eq!(index.max_committed_lsn(), commit_lsn);
        }

        for (tx_id, commit_lsn) in [(1, 150), (2, 250)] {
            let stored = index.get_tx_range(tx_id).unwrap();
            assert!(stored.is_committed());
            assert_eq!(stored.commit_lsn, Some(commit_lsn));
        }
    }

    #[test]
    fn test_tx_index_rollback() {
        let mut index = index_from(&[(1, 100, None), (2, 200, None)]);
        assert_eq!(index.len(), 2);

        index.rollback_tx(1);

        assert_eq!(index.len(), 1);
        assert!(index.get_tx_range(1).is_none());
        assert!(index.get_tx_range(2).is_some());
    }

    #[test]
    fn test_tx_index_get_tx_range_for_lsn() {
        let index = index_from(&[(1, 100, Some(200)), (2, 300, Some(400))]);

        assert_eq!(index.get_tx_range_for_lsn(150).map(|r| r.tx_id), Some(1));
        assert_eq!(index.get_tx_range_for_lsn(350).map(|r| r.tx_id), Some(2));
        // LSN 250 sits in the gap between the two ranges.
        assert!(index.get_tx_range_for_lsn(250).is_none());
    }

    #[test]
    fn test_tx_index_is_visible_at() {
        let index = index_from(&[(1, 100, Some(200)), (2, 300, Some(400))]);

        // (lsn, snapshot_lsn, expected visibility)
        let cases = [
            (150, 150, false),
            (150, 200, true),
            (350, 150, false),
            (150, 400, true),
            (350, 400, true),
            (50, 100, true),
            (500, 400, true),
        ];
        for (lsn, snapshot_lsn, expected) in cases {
            assert_eq!(
                index.is_visible_at(lsn, snapshot_lsn),
                expected,
                "lsn {} at snapshot {}",
                lsn,
                snapshot_lsn
            );
        }
    }

    #[test]
    fn test_tx_index_clear() {
        let mut index = index_from(&[(1, 100, Some(200))]);
        assert_eq!(index.len(), 1);
        assert_eq!(index.max_committed_lsn(), 200);

        index.clear();

        assert_eq!(index.len(), 0);
        assert!(index.is_empty());
        assert_eq!(index.max_committed_lsn(), 0);
    }

    // ---- TxRangeIndex::is_tx_visible -------------------------------------

    #[test]
    fn test_is_tx_visible_committed_before_snapshot() {
        let index = index_from(&[(1, 100, Some(150))]);
        assert!(index.is_tx_visible(1, SnapshotId::from_lsn(200)));
    }

    #[test]
    fn test_is_tx_visible_committed_after_snapshot() {
        let index = index_from(&[(1, 100, Some(250))]);
        assert!(!index.is_tx_visible(1, SnapshotId::from_lsn(200)));
    }

    #[test]
    fn test_is_tx_visible_uncommitted() {
        let index = index_from(&[(1, 100, None)]);
        assert!(!index.is_tx_visible(1, SnapshotId::from_lsn(200)));
    }

    #[test]
    fn test_is_tx_visible_nonexistent() {
        let index = TxRangeIndex::new();
        assert!(!index.is_tx_visible(999, SnapshotId::current()));
    }

    #[test]
    fn test_is_tx_visible_at_exact_commit_lsn() {
        let index = index_from(&[(1, 100, Some(150))]);
        assert!(index.is_tx_visible(1, SnapshotId::from_lsn(150)));
    }

    // ---- iter_visible_wal_records ----------------------------------------

    #[test]
    fn test_iter_visible_wal_records_filters_correctly() {
        // tx 1 commits before the snapshot, tx 2 never commits, tx 3 commits after it.
        let index = index_from(&[(1, 100, Some(150)), (2, 160, None), (3, 170, Some(250))]);

        let records = vec![
            (
                1,
                V2WALRecord::TransactionBegin {
                    tx_id: 1,
                    timestamp: 100,
                },
            ),
            (
                1,
                V2WALRecord::TransactionCommit {
                    tx_id: 1,
                    timestamp: 150,
                },
            ),
            (
                2,
                V2WALRecord::TransactionBegin {
                    tx_id: 2,
                    timestamp: 160,
                },
            ),
            (
                3,
                V2WALRecord::TransactionBegin {
                    tx_id: 3,
                    timestamp: 170,
                },
            ),
            (
                3,
                V2WALRecord::TransactionCommit {
                    tx_id: 3,
                    timestamp: 250,
                },
            ),
        ];

        let visible: Vec<_> =
            iter_visible_wal_records(&index, records.into_iter(), SnapshotId::from_lsn(200))
                .collect();

        assert_eq!(visible.len(), 2);
        assert!(visible.iter().all(|(tx_id, _)| *tx_id == 1));

        if let V2WALRecord::TransactionBegin { tx_id, .. } = &visible[0].1 {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected TransactionBegin");
        }
        if let V2WALRecord::TransactionCommit { tx_id, .. } = &visible[1].1 {
            assert_eq!(*tx_id, 1);
        } else {
            panic!("Expected TransactionCommit");
        }
    }

    #[test]
    fn test_iter_visible_wal_records_empty_iterator() {
        let index = TxRangeIndex::new();
        let no_records: Vec<(u64, V2WALRecord)> = Vec::new();

        let visible: Vec<_> =
            iter_visible_wal_records(&index, no_records.into_iter(), SnapshotId::current())
                .collect();

        assert_eq!(visible.len(), 0);
    }

    #[test]
    fn test_iter_visible_wal_records_all_invisible() {
        // The only transaction never commits, so none of its records may pass.
        let index = index_from(&[(1, 100, None)]);

        let records = vec![
            (
                1,
                V2WALRecord::TransactionBegin {
                    tx_id: 1,
                    timestamp: 100,
                },
            ),
            (
                1,
                V2WALRecord::NodeInsert {
                    node_id: 1,
                    slot_offset: 0,
                    node_data: Vec::new(),
                },
            ),
        ];

        let visible: Vec<_> =
            iter_visible_wal_records(&index, records.into_iter(), SnapshotId::from_lsn(200))
                .collect();

        assert_eq!(visible.len(), 0);
    }
}