use crate::bytes_range::BytesRange;
use crate::rand::DbRand;
use crate::utils::IdGenerator;
use bytes::Bytes;
use log::warn;
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::RangeBounds;
use std::sync::Arc;
use uuid::Uuid;
/// Isolation level a transaction may be opened with.
#[derive(Clone, Copy, Debug, PartialEq, Default)]
#[allow(unused)]
pub enum IsolationLevel {
    /// Snapshot isolation (the default): commits are validated against
    /// concurrently committed writers via write-write conflict checks.
    #[default]
    Snapshot,
    /// Serializable snapshot isolation: additionally validates tracked read
    /// keys and read ranges against concurrently committed writers.
    SerializableSnapshot,
}
/// Per-transaction bookkeeping used for optimistic conflict detection.
pub(crate) struct TransactionState {
    /// Whether the transaction was opened read-only; read-only transactions
    /// are excluded from conflict checking as writers.
    pub(crate) read_only: bool,
    /// Sequence number the transaction started at (its snapshot).
    pub(crate) started_seq: u64,
    /// Sequence number the transaction committed at; `None` while active.
    committed_seq: Option<u64>,
    /// Keys this transaction wrote — used for write-write conflict checks.
    write_keys: HashSet<Bytes>,
    /// Keys this transaction read — used for read-write conflict checks.
    read_keys: HashSet<Bytes>,
    /// Ranges this transaction scanned — used for phantom (range) conflict checks.
    read_ranges: Vec<BytesRange>,
}
impl TransactionState {
    /// Add the given keys to this transaction's write set.
    fn track_write_keys(&mut self, keys: impl IntoIterator<Item = Bytes>) {
        for key in keys {
            self.write_keys.insert(key);
        }
    }

    /// Add the given keys to this transaction's read set.
    #[allow(unused)]
    fn track_read_keys(&mut self, keys: impl IntoIterator<Item = Bytes>) {
        for key in keys {
            self.read_keys.insert(key);
        }
    }

    /// Record a range scanned by this transaction.
    #[allow(unused)]
    fn track_read_range(&mut self, range: BytesRange) {
        self.read_ranges.push(range);
    }

    /// Transition this transaction to the committed state at `seq`.
    /// Read tracking is only needed while the transaction is active, so the
    /// read keys and ranges are released eagerly.
    fn mark_as_committed(&mut self, seq: u64) {
        self.read_keys.clear();
        self.read_ranges.clear();
        self.committed_seq = Some(seq);
    }
}
/// Tracks active and recently-committed transactions so optimistic commits
/// can be validated for write-write and read-write conflicts.
pub struct TransactionManager {
    /// Shared mutable state, guarded by a read-write lock.
    inner: Arc<RwLock<TransactionManagerInner>>,
    /// Randomness source used to generate transaction ids.
    db_rand: Arc<DbRand>,
}
/// Lock-protected interior of `TransactionManager`.
struct TransactionManagerInner {
    /// Transactions that have started but not yet committed or been dropped.
    active_txns: HashMap<Uuid, TransactionState>,
    /// Committed transactions retained while some active writer might still
    /// conflict with them; pruned by `recycle_recent_committed_txns`.
    recent_committed_txns: VecDeque<TransactionState>,
}
impl TransactionManager {
    /// Create a manager with no active and no recently-committed transactions.
    pub fn new(db_rand: Arc<DbRand>) -> Self {
        Self {
            inner: Arc::new(RwLock::new(TransactionManagerInner {
                active_txns: HashMap::new(),
                recent_committed_txns: VecDeque::new(),
            })),
            db_rand,
        }
    }

    /// Register a fresh, empty transaction state for `txn_id` starting at
    /// snapshot `seq`. Shared by `new_txn` and the test-only constructor so
    /// the two cannot drift apart.
    fn insert_txn(&self, txn_id: Uuid, seq: u64, read_only: bool) {
        let txn_state = TransactionState {
            read_only,
            started_seq: seq,
            committed_seq: None,
            write_keys: HashSet::new(),
            read_keys: HashSet::new(),
            read_ranges: Vec::new(),
        };
        let mut inner = self.inner.write();
        inner.active_txns.insert(txn_id, txn_state);
    }

    /// Start a new transaction whose snapshot is `seq`; returns its
    /// randomly generated id.
    pub fn new_txn(&self, seq: u64, read_only: bool) -> Uuid {
        let txn_id = self.db_rand.rng().gen_uuid();
        self.insert_txn(txn_id, seq, read_only);
        txn_id
    }

    /// Test-only variant of [`Self::new_txn`] that lets the caller choose
    /// the transaction id.
    #[cfg(test)]
    pub fn new_txn_with_id(&self, seq: u64, read_only: bool, txn_id: Uuid) -> Uuid {
        self.insert_txn(txn_id, seq, read_only);
        txn_id
    }

    /// Forget an active transaction and prune committed history that can no
    /// longer conflict with any remaining writer. Safe to call with an
    /// unknown id.
    pub fn drop_txn(&self, txn_id: &Uuid) {
        let mut inner = self.inner.write();
        inner.active_txns.remove(txn_id);
        inner.recycle_recent_committed_txns();
    }

    /// Merge `write_keys` into the write set of the active transaction, if
    /// it exists; unknown ids are ignored.
    pub fn track_write_keys(&self, txn_id: &Uuid, write_keys: &HashSet<Bytes>) {
        let mut inner = self.inner.write();
        if let Some(txn_state) = inner.active_txns.get_mut(txn_id) {
            txn_state.track_write_keys(write_keys.iter().cloned());
        }
    }

    /// Merge `read_keys` into the read set of the active transaction, if it
    /// exists; unknown ids are ignored.
    #[allow(unused)]
    pub fn track_read_keys(&self, txn_id: &Uuid, read_keys: &HashSet<Bytes>) {
        let mut inner = self.inner.write();
        if let Some(txn_state) = inner.active_txns.get_mut(txn_id) {
            txn_state.track_read_keys(read_keys.iter().cloned());
        }
    }

    /// Record a scanned range for the active transaction, if it exists;
    /// unknown ids are ignored.
    #[allow(unused)]
    pub fn track_read_range(&self, txn_id: &Uuid, range: BytesRange) {
        let mut inner = self.inner.write();
        if let Some(txn_state) = inner.active_txns.get_mut(txn_id) {
            txn_state.track_read_range(range);
        }
    }

    /// Return true when `txn_id` conflicts with a recently-committed
    /// transaction: either a write-write conflict on its write set, or a
    /// read-write conflict against its tracked read keys/ranges. Unknown
    /// ids report no conflict.
    pub fn check_has_conflict(&self, txn_id: &Uuid) -> bool {
        let inner = self.inner.read();
        let txn_state = match inner.active_txns.get(txn_id) {
            None => return false,
            Some(txn_state) => txn_state,
        };
        let ww_conflict =
            inner.has_write_write_conflict(&txn_state.write_keys, txn_state.started_seq);
        if ww_conflict {
            return true;
        }
        // The ranges are cloned because the helper takes them by value while
        // we still hold a shared borrow of `inner`.
        inner.has_read_write_conflict(
            &txn_state.read_keys,
            txn_state.read_ranges.clone(),
            txn_state.started_seq,
        )
    }

    /// Move a committing transaction from the active set into the
    /// recently-committed history, recording `committed_seq`.
    ///
    /// NOTE(review): when no other non-readonly transaction is active, the
    /// history is unnecessary and we return early *without* removing the
    /// transaction from `active_txns` — the caller is expected to follow up
    /// with `drop_txn`. Confirm callers uphold this.
    pub fn track_recent_committed_txn(&self, txn_id: &Uuid, committed_seq: u64) {
        let mut inner = self.inner.write();
        if !inner.has_non_readonly_active_txn() {
            return;
        }
        if let Some(mut txn_state) = inner.active_txns.remove(txn_id) {
            if txn_state.read_only {
                unreachable!("attempted to commit a read-only transaction");
            }
            txn_state.mark_as_committed(committed_seq);
            inner.recent_committed_txns.push_back(txn_state);
        }
    }

    /// Record a non-transactional write batch in the committed history so
    /// active writers can detect conflicts against it. The batch is treated
    /// as a transaction that started and committed at `committed_seq`.
    pub fn track_recent_committed_write_batch(&self, keys: &HashSet<Bytes>, committed_seq: u64) {
        let mut inner = self.inner.write();
        // With no active writer there is nothing the batch could conflict
        // with, so avoid accumulating history.
        if !inner.has_non_readonly_active_txn() {
            return;
        }
        inner.recent_committed_txns.push_back(TransactionState {
            read_only: false,
            started_seq: committed_seq,
            committed_seq: Some(committed_seq),
            write_keys: keys.clone(),
            read_keys: HashSet::new(),
            read_ranges: Vec::new(),
        });
    }

    /// Smallest snapshot sequence among all active transactions (read-only
    /// included), or `None` when there are none.
    pub fn min_active_seq(&self) -> Option<u64> {
        let inner = self.inner.read();
        inner
            .active_txns
            .values()
            .map(|state| state.started_seq)
            .min()
    }
}
impl TransactionManagerInner {
    /// Smallest started_seq among active non-readonly transactions, or
    /// `None` when no writer is active. This is the watermark below which
    /// committed history can be discarded.
    fn min_conflict_check_seq(&self) -> Option<u64> {
        self.active_txns
            .values()
            .filter_map(|state| {
                if state.read_only {
                    None
                } else {
                    Some(state.started_seq)
                }
            })
            .min()
    }

    /// True when at least one active transaction is a writer.
    fn has_non_readonly_active_txn(&self) -> bool {
        !self.active_txns.values().all(|state| state.read_only)
    }

    /// Drop committed history that no active writer can conflict with any
    /// more; with no active writer at all, everything is discarded.
    fn recycle_recent_committed_txns(&mut self) {
        match self.min_conflict_check_seq() {
            None => self.recent_committed_txns.clear(),
            Some(min_seq) => {
                self.recent_committed_txns.retain(|txn| match txn.committed_seq {
                    Some(committed_seq) => committed_seq >= min_seq,
                    None => {
                        warn!(
                            "Found transaction with committed_seq = None, this may cause memory leaks"
                        );
                        true
                    }
                });
            }
        }
    }

    /// True when any recently-committed writer that committed after
    /// `started_seq` wrote a key that is also in `write_keys`.
    fn has_write_write_conflict(&self, write_keys: &HashSet<Bytes>, started_seq: u64) -> bool {
        if write_keys.is_empty() {
            return false;
        }
        self.recent_committed_txns
            .iter()
            .filter(|committed| !committed.read_only)
            .any(|committed| {
                let other_committed_seq = committed.committed_seq.expect(
                    "all txns in recent_committed_txns should be committed with committed_seq set",
                );
                other_committed_seq > started_seq
                    && !write_keys.is_disjoint(&committed.write_keys)
            })
    }

    /// True when any recently-committed writer that committed after
    /// `started_seq` wrote a key that was read (directly or via a scanned
    /// range) by the checking transaction.
    fn has_read_write_conflict(
        &self,
        read_keys: &HashSet<Bytes>,
        read_ranges: Vec<BytesRange>,
        started_seq: u64,
    ) -> bool {
        if read_keys.is_empty() && read_ranges.is_empty() {
            return false;
        }
        self.recent_committed_txns
            .iter()
            .filter(|committed| !committed.read_only)
            .any(|committed| {
                let other_committed_seq = committed.committed_seq.expect(
                    "all txns in recent_committed_txns should be committed with committed_seq set",
                );
                if other_committed_seq <= started_seq {
                    return false;
                }
                !read_keys.is_disjoint(&committed.write_keys)
                    || read_ranges.iter().any(|read_range| {
                        committed
                            .write_keys
                            .iter()
                            .any(|write_key| read_range.contains(write_key))
                    })
            })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rand::DbRand;
use bytes::Bytes;
use parking_lot::Mutex;
use rstest::rstest;
use std::collections::HashSet;
struct CheckConflictTestCase {
name: &'static str,
recent_committed_txns: Vec<TransactionState>,
current_write_keys: Vec<&'static str>,
current_started_seq: u64,
expected_conflict: bool,
}
#[test]
fn test_drop_txn_removes_active_transaction() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let txn_id = txn_manager.new_txn(100, false);
assert!(txn_manager.inner.read().active_txns.contains_key(&txn_id));
txn_manager.drop_txn(&txn_id);
assert!(!txn_manager.inner.read().active_txns.contains_key(&txn_id));
}
#[test]
fn test_drop_txn_nonexistent_transaction_safe() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let fake_id = Uuid::new_v4();
txn_manager.drop_txn(&fake_id);
let txn_id = txn_manager.new_txn(100, false);
assert!(txn_manager.inner.read().active_txns.contains_key(&txn_id));
}
#[test]
fn test_drop_txn_triggers_garbage_collection() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let txn_id = txn_manager.new_txn(100, false);
let keys: HashSet<Bytes> = ["key1"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys, 50);
assert!(!txn_manager.inner.read().recent_committed_txns.is_empty());
txn_manager.drop_txn(&txn_id);
assert!(txn_manager.inner.read().recent_committed_txns.is_empty());
}
#[rstest]
#[case::no_recent_committed_txns(CheckConflictTestCase {
name: "no_recent_committed_txns",
recent_committed_txns: vec![],
current_write_keys: vec!["key1", "key2"],
current_started_seq: 100,
expected_conflict: false,
})]
#[case::no_overlapping_keys(CheckConflictTestCase {
name: "no_overlapping_keys",
recent_committed_txns: vec![TransactionState {
read_only: false,
started_seq: 50,
committed_seq: Some(80),
write_keys: ["key1", "key2"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key3", "key4"],
current_started_seq: 100,
expected_conflict: false,
})]
#[case::concurrent_write_same_key(CheckConflictTestCase {
name: "concurrent_write_same_key",
recent_committed_txns: vec![TransactionState {
read_only: false,
started_seq: 50,
committed_seq: Some(150),
write_keys: ["key1"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key1"],
current_started_seq: 100,
expected_conflict: true,
})]
#[case::multiple_committed_mixed_conflict(CheckConflictTestCase {
name: "multiple_committed_mixed_conflict",
recent_committed_txns: vec![
TransactionState {
read_only: false,
started_seq: 30,
committed_seq: Some(50),
write_keys: ["key1"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
},
TransactionState {
read_only: false,
started_seq: 80,
committed_seq: Some(150),
write_keys: ["key2"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
},
],
current_write_keys: vec!["key1", "key2"],
current_started_seq: 100,
expected_conflict: true,
})]
#[case::readonly_committed_no_conflict(CheckConflictTestCase {
name: "readonly_committed_no_conflict",
recent_committed_txns: vec![TransactionState {
read_only: true,
started_seq: 80,
committed_seq: Some(150),
write_keys: HashSet::new(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key1"],
current_started_seq: 100,
expected_conflict: false,
})]
#[case::committed_before_current_started(CheckConflictTestCase {
name: "committed_before_current_started",
recent_committed_txns: vec![TransactionState {
read_only: false,
started_seq: 30,
committed_seq: Some(50),
write_keys: ["key1"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key1"],
current_started_seq: 100,
expected_conflict: false,
})]
#[case::exact_seq_boundary(CheckConflictTestCase {
name: "exact_seq_boundary",
recent_committed_txns: vec![TransactionState {
read_only: false,
started_seq: 100,
committed_seq: Some(100),
write_keys: ["key1"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key1"],
current_started_seq: 100,
expected_conflict: false,
})]
#[case::partial_key_overlap_conflict(CheckConflictTestCase {
name: "partial_key_overlap_conflict",
recent_committed_txns: vec![TransactionState {
read_only: false,
started_seq: 80,
committed_seq: Some(150),
write_keys: ["key1", "key2", "key3"]
.into_iter()
.map(Bytes::from)
.collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key3", "key4", "key5"],
current_started_seq: 100,
expected_conflict: true,
})]
#[case::max_seq_values(CheckConflictTestCase {
name: "max_seq_values",
recent_committed_txns: vec![TransactionState {
read_only: false,
started_seq: u64::MAX - 1,
committed_seq: Some(u64::MAX),
write_keys: ["key1"].into_iter().map(Bytes::from).collect(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}],
current_write_keys: vec!["key1"],
current_started_seq: u64::MAX - 1,
expected_conflict: true, // Should be true since committed_seq > started_seq
})]
fn test_check_conflict_table_driven(#[case] case: CheckConflictTestCase) {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
{
let mut inner = txn_manager.inner.write();
inner.recent_committed_txns = case.recent_committed_txns.into();
}
let conflict_keys: HashSet<Bytes> = case
.current_write_keys
.into_iter()
.map(Bytes::from)
.collect();
let inner = txn_manager.inner.read();
let has_conflict = inner.has_write_write_conflict(&conflict_keys, case.current_started_seq);
assert_eq!(
has_conflict, case.expected_conflict,
"Test case '{}' failed",
case.name
);
}
#[derive(Debug)]
struct MinActiveSeqTestCase {
name: &'static str,
transactions: Vec<(u64, bool)>, expected_min_seq: Option<u64>,
}
#[rstest]
#[case::no_transactions_returns_none(MinActiveSeqTestCase {
name: "no transactions returns none",
transactions: vec![],
expected_min_seq: None,
})]
#[case::single_transaction(MinActiveSeqTestCase {
name: "single transaction",
transactions: vec![(100, false)],
expected_min_seq: Some(100),
})]
#[case::multiple_transactions_returns_minimum(MinActiveSeqTestCase {
name: "multiple transactions returns minimum",
transactions: vec![(200, false), (100, true), (150, false)],
expected_min_seq: Some(100),
})]
#[case::mixed_readonly_and_write_transactions(MinActiveSeqTestCase {
name: "mixed readonly and write transactions",
transactions: vec![(50, true), (100, false)],
expected_min_seq: Some(50),
})]
fn test_min_active_seq_table_driven(#[case] case: MinActiveSeqTestCase) {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
for (seq_no, read_only) in case.transactions {
let _txn_id = txn_manager.new_txn(seq_no, read_only);
}
assert_eq!(
txn_manager.min_active_seq(),
case.expected_min_seq,
"Test case '{}' failed",
case.name
);
}
struct TrackRecentCommittedTxnTestCase {
name: &'static str,
setup: Box<dyn Fn(&mut TransactionManager)>,
expected_recent_committed_txn: Option<TransactionState>,
expected_recent_committed_txns_len: usize,
expected_active_txns_len: usize,
}
#[rstest]
#[case::valid_id(TrackRecentCommittedTxnTestCase {
name: "track committed transaction with valid id",
setup: Box::new(|txn_manager| {
// Create a transaction
let txn_id = txn_manager.new_txn(100, false);
// Create another active transaction to ensure recent_committed_txns is tracked
let _other_txn = txn_manager.new_txn(200, false);
// Track committed transaction
let keys: HashSet<Bytes> = ["key1", "key2"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&txn_id, &keys);
txn_manager.track_recent_committed_txn(&txn_id, 150);
}),
expected_recent_committed_txn: Some(TransactionState {
started_seq: 100,
committed_seq: Some(150),
write_keys: ["key1", "key2"].into_iter().map(Bytes::from).collect(),
read_only: false,
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}),
expected_recent_committed_txns_len: 1,
expected_active_txns_len: 1, // Only the other transaction remains
})]
#[case::nonexistent_id(TrackRecentCommittedTxnTestCase {
name: "track committed transaction with nonexistent id",
setup: Box::new(|txn_manager| {
// Create an active transaction to ensure recent_committed_txns would be tracked
let _active_txn = txn_manager.new_txn(100, false);
// Try to track a non-existent transaction
let fake_id = Uuid::new_v4();
let _keys: HashSet<Bytes> = ["key1"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_txn(&fake_id, 150);
}),
expected_recent_committed_txn: None,
expected_recent_committed_txns_len: 0,
expected_active_txns_len: 1, // The active transaction remains
})]
#[case::no_tracking_when_only_readonly_active(TrackRecentCommittedTxnTestCase {
name: "no tracking when only readonly transactions active",
setup: Box::new(|txn_manager| {
// Create a transaction and a readonly transaction
let txn_id = txn_manager.new_txn(100, false);
let _readonly_txn = txn_manager.new_txn(200, true);
// Drop the transaction first so that after removal, only readonly remains
txn_manager.drop_txn(&txn_id);
// Now track committed transaction - this should not be tracked since no active writers
let _keys: HashSet<Bytes> = ["key1"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_txn(&txn_id, 150);
}),
expected_recent_committed_txn: None,
expected_recent_committed_txns_len: 0,
expected_active_txns_len: 1, // Only the readonly transaction remains
})]
#[case::without_id_creates_record(TrackRecentCommittedTxnTestCase {
name: "track without id creates record",
setup: Box::new(|txn_manager| {
// Create an active write transaction first to enable tracking
let _active_txn = txn_manager.new_txn(50, false);
// Track without transaction ID (non-transactional write)
let keys: HashSet<Bytes> = ["key1", "key2"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys, 100);
}),
expected_recent_committed_txn: Some(TransactionState {
started_seq: 100,
committed_seq: Some(100),
write_keys: ["key1", "key2"].into_iter().map(Bytes::from).collect(),
read_only: false,
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}),
expected_recent_committed_txns_len: 1,
expected_active_txns_len: 1, // Now we have the active transaction remaining
})]
#[case::merges_conflict_keys(TrackRecentCommittedTxnTestCase {
name: "merges conflict keys from existing transaction",
setup: Box::new(|txn_manager| {
// Create a transaction with some conflict keys already tracked
let txn_id = txn_manager.new_txn(100, false);
// Create another active transaction to ensure tracking happens
let _other_txn = txn_manager.new_txn(200, false);
// Add some keys to the transaction's conflict_keys before committing
{
let mut inner = txn_manager.inner.write();
if let Some(txn_state) = inner.active_txns.get_mut(&txn_id) {
txn_state.track_write_keys(["existing_key"].into_iter().map(Bytes::from));
}
}
// Track committed transaction with additional keys
let additional_keys: HashSet<Bytes> = ["key1", "key2"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&txn_id, &additional_keys);
txn_manager.track_recent_committed_txn(&txn_id, 150);
}),
expected_recent_committed_txn: Some(TransactionState {
started_seq: 100,
committed_seq: Some(150),
write_keys: ["existing_key", "key1", "key2"].into_iter().map(Bytes::from).collect(),
read_only: false,
read_keys: HashSet::new(),
read_ranges: Vec::new(),
}),
expected_recent_committed_txns_len: 1,
expected_active_txns_len: 1, // Only the other transaction remains
})]
fn test_track_recent_committed_txn_table_driven(
#[case] test_case: TrackRecentCommittedTxnTestCase,
) {
let db_rand = Arc::new(DbRand::new(0));
let mut txn_manager = TransactionManager::new(db_rand);
(test_case.setup)(&mut txn_manager);
let inner = txn_manager.inner.read();
assert_eq!(
inner.recent_committed_txns.len(),
test_case.expected_recent_committed_txns_len,
"Test case '{}' failed: expected {} recent committed transactions, got {}",
test_case.name,
test_case.expected_recent_committed_txns_len,
inner.recent_committed_txns.len()
);
assert_eq!(
inner.active_txns.len(),
test_case.expected_active_txns_len,
"Test case '{}' failed: expected {} active transactions, got {}",
test_case.name,
test_case.expected_active_txns_len,
inner.active_txns.len()
);
if let Some(expected_txn) = test_case.expected_recent_committed_txn {
assert!(
!inner.recent_committed_txns.is_empty(),
"Test case '{}' failed: expected a committed transaction but none found",
test_case.name
);
let actual_txn = &inner.recent_committed_txns[0];
assert_eq!(
actual_txn.started_seq, expected_txn.started_seq,
"Test case '{}' failed: expected started_seq {}, got {}",
test_case.name, expected_txn.started_seq, actual_txn.started_seq
);
assert_eq!(
actual_txn.committed_seq, expected_txn.committed_seq,
"Test case '{}' failed: expected committed_seq {:?}, got {:?}",
test_case.name, expected_txn.committed_seq, actual_txn.committed_seq
);
assert_eq!(
actual_txn.write_keys, expected_txn.write_keys,
"Test case '{}' failed: expected write_keys {:?}, got {:?}",
test_case.name, expected_txn.write_keys, actual_txn.write_keys
);
assert_eq!(
actual_txn.read_only, expected_txn.read_only,
"Test case '{}' failed: expected read_only {}, got {}",
test_case.name, expected_txn.read_only, actual_txn.read_only
);
}
}
#[test]
fn test_recycle_recent_committed_txns_filters_by_min_seq() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let active_txn1 = txn_manager.new_txn(120, false);
let keys1: HashSet<Bytes> = ["key1"].into_iter().map(Bytes::from).collect();
let keys2: HashSet<Bytes> = ["key2"].into_iter().map(Bytes::from).collect();
let keys3: HashSet<Bytes> = ["key3"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys1, 50); txn_manager.track_recent_committed_write_batch(&keys2, 100); txn_manager.track_recent_committed_write_batch(&keys3, 150);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 3);
txn_manager.drop_txn(&active_txn1);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 0);
}
#[test]
fn test_recycle_recent_committed_txns_clears_all_when_no_active_writers() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let txn_id = txn_manager.new_txn(300, false);
let keys: HashSet<Bytes> = ["key1", "key2"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys, 100);
txn_manager.track_recent_committed_write_batch(&keys, 200);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 2);
txn_manager.drop_txn(&txn_id);
assert!(txn_manager.inner.read().recent_committed_txns.is_empty());
}
#[test]
fn test_recycle_recent_committed_txns_boundary_condition_equal_seq() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let _active_txn1 = txn_manager.new_txn(100, false); let active_txn2 = txn_manager.new_txn(200, false);
let keys: HashSet<Bytes> = ["key1"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys, 99); txn_manager.track_recent_committed_write_batch(&keys, 100); txn_manager.track_recent_committed_write_batch(&keys, 101);
txn_manager.drop_txn(&active_txn2);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 2);
let committed_seqs: Vec<u64> = txn_manager
.inner
.read()
.recent_committed_txns
.iter()
.map(|txn| txn.committed_seq.unwrap())
.collect();
assert!(committed_seqs.contains(&100));
assert!(committed_seqs.contains(&101));
}
#[test]
fn test_recycle_recent_committed_txns_handles_none_committed_seq() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
{
let mut inner = txn_manager.inner.write();
inner.recent_committed_txns.push_back(TransactionState {
read_only: false,
started_seq: 50,
committed_seq: None, write_keys: HashSet::new(),
read_keys: HashSet::new(),
read_ranges: Vec::new(),
});
}
let keys: HashSet<Bytes> = ["key1"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys, 100);
let _active_txn1 = txn_manager.new_txn(150, false); let active_txn2 = txn_manager.new_txn(200, false);
txn_manager.drop_txn(&active_txn2);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 1);
assert_eq!(
txn_manager.inner.read().recent_committed_txns[0].committed_seq,
None
);
}
#[test]
fn test_transaction_lifecycle_complete_flow() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let txn_id = txn_manager.new_txn(100, false);
assert_eq!(txn_manager.min_active_seq(), Some(100));
let write_keys: HashSet<Bytes> = ["key1", "key2"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&txn_id, &write_keys);
let has_conflict = txn_manager.check_has_conflict(&txn_id);
assert!(!has_conflict);
let other_txn = txn_manager.new_txn(50, false);
let other_keys: HashSet<Bytes> = ["key1", "key3"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&other_txn, &other_keys);
txn_manager.track_recent_committed_txn(&other_txn, 120);
let has_conflict = txn_manager.check_has_conflict(&txn_id);
assert!(has_conflict);
txn_manager.track_write_keys(&txn_id, &write_keys);
txn_manager.track_recent_committed_txn(&txn_id, 150);
assert!(txn_manager.inner.read().active_txns.is_empty());
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 2);
assert_eq!(txn_manager.min_active_seq(), None);
}
#[test]
fn test_concurrent_transactions_conflict_detection() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let keys_a: HashSet<Bytes> = ["keyA"].into_iter().map(Bytes::from).collect();
let keys_b: HashSet<Bytes> = ["keyB"].into_iter().map(Bytes::from).collect();
let keys_ab: HashSet<Bytes> = ["keyA", "keyB"].into_iter().map(Bytes::from).collect();
let txn1 = txn_manager.new_txn(100, false); let txn2 = txn_manager.new_txn(110, false); let txn3 = txn_manager.new_txn(120, false); txn_manager.track_write_keys(&txn1, &keys_a);
txn_manager.track_write_keys(&txn2, &keys_b);
txn_manager.track_write_keys(&txn3, &keys_ab);
assert!(!txn_manager.check_has_conflict(&txn1));
assert!(!txn_manager.check_has_conflict(&txn2));
assert!(!txn_manager.check_has_conflict(&txn3));
txn_manager.track_write_keys(&txn1, &keys_a);
txn_manager.track_recent_committed_txn(&txn1, 130);
assert!(!txn_manager.check_has_conflict(&txn2));
assert!(txn_manager.check_has_conflict(&txn3));
txn_manager.track_write_keys(&txn2, &keys_b);
txn_manager.track_recent_committed_txn(&txn2, 140);
assert!(txn_manager.check_has_conflict(&txn3));
assert_eq!(txn_manager.inner.read().active_txns.len(), 1); assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 2); }
#[test]
fn test_garbage_collection_timing_with_multiple_operations() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let long_txn = txn_manager.new_txn(100, false); let short_txn1 = txn_manager.new_txn(150, false); let short_txn2 = txn_manager.new_txn(200, false);
let keys1: HashSet<Bytes> = ["old_key1"].into_iter().map(Bytes::from).collect();
let keys2: HashSet<Bytes> = ["old_key2"].into_iter().map(Bytes::from).collect();
txn_manager.track_recent_committed_write_batch(&keys1, 50);
txn_manager.track_recent_committed_write_batch(&keys2, 60);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 2);
let short_keys: HashSet<Bytes> = ["short_key"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&short_txn1, &short_keys);
txn_manager.track_recent_committed_txn(&short_txn1, 180);
txn_manager.track_write_keys(&short_txn2, &short_keys);
txn_manager.track_recent_committed_txn(&short_txn2, 220);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 4);
txn_manager.drop_txn(&long_txn);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 0);
assert!(txn_manager.inner.read().active_txns.is_empty());
}
#[test]
fn test_readonly_vs_write_transaction_interactions() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let readonly_txn1 = txn_manager.new_txn(50, true); let write_txn1 = txn_manager.new_txn(100, false); let _readonly_txn2 = txn_manager.new_txn(150, true); let write_txn2 = txn_manager.new_txn(200, false);
assert_eq!(txn_manager.min_active_seq(), Some(50));
assert_eq!(txn_manager.inner.read().min_conflict_check_seq(), Some(100));
let keys: HashSet<Bytes> = ["test_key"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&write_txn1, &keys);
txn_manager.track_recent_committed_txn(&write_txn1, 120);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 1);
txn_manager.drop_txn(&readonly_txn1);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 0); assert_eq!(txn_manager.inner.read().min_conflict_check_seq(), Some(200));
txn_manager.drop_txn(&write_txn2);
assert_eq!(txn_manager.inner.read().recent_committed_txns.len(), 0);
assert_eq!(txn_manager.inner.read().min_conflict_check_seq(), None);
assert_eq!(txn_manager.min_active_seq(), Some(150));
}
#[test]
fn test_ssi_phantom_read_conflict_on_range() {
let db_rand = Arc::new(DbRand::new(0));
let txn_manager = TransactionManager::new(db_rand);
let reader_txn = txn_manager.new_txn(100, true);
let _active_writer_guard = txn_manager.new_txn(200, false);
let writer_txn = txn_manager.new_txn(60, false);
let writer_keys: HashSet<Bytes> = ["foo5"].into_iter().map(Bytes::from).collect();
txn_manager.track_write_keys(&writer_txn, &writer_keys);
txn_manager.track_recent_committed_txn(&writer_txn, 120);
let range = BytesRange::from(Bytes::from("foo0")..=Bytes::from("foo9"));
txn_manager.track_read_range(&reader_txn, range);
assert!(txn_manager.check_has_conflict(&reader_txn));
}
use proptest::collection::vec;
use proptest::prelude::*;
use rand::Rng;
#[derive(Debug, Clone)]
enum TxnOperation {
Create {
read_only: bool,
txn_id: Uuid,
},
Drop {
txn_id: Uuid,
},
Commit {
txn_id: Option<Uuid>,
keys: Vec<String>,
},
TrackReadKeys {
txn_id: Uuid,
keys: Vec<String>,
},
TrackReadRange {
txn_id: Uuid,
start_key: String,
end_key: String,
},
Recycle,
}
fn txn_operation_strategy() -> impl Strategy<Value = TxnOperation> {
let tracked_txn_ids = Arc::new(Mutex::new(Vec::<Uuid>::new()));
let tracked_txn_ids_for_drop = Arc::clone(&tracked_txn_ids);
let tracked_txn_ids_for_commit = Arc::clone(&tracked_txn_ids);
let tracked_txn_ids_for_track_read_keys = Arc::clone(&tracked_txn_ids);
let tracked_txn_ids_for_track_read_range = Arc::clone(&tracked_txn_ids);
let sample_txn_id = move |tracked_txn_ids: Arc<Mutex<Vec<Uuid>>>| {
let ids = tracked_txn_ids.lock();
if ids.is_empty() {
Uuid::new_v4()
} else {
let idx = rand::rng().random_range(0..ids.len());
ids[idx]
}
};
prop_oneof![
(any::<bool>()).prop_map(move |read_only| {
let txn_id = Uuid::new_v4();
tracked_txn_ids.lock().push(txn_id);
TxnOperation::Create { read_only, txn_id }
}),
(0..1000u32).prop_map(move |_| {
TxnOperation::Drop {
txn_id: sample_txn_id(tracked_txn_ids_for_drop.clone()),
}
}),
(
prop::option::of(
(0..100u32)
.prop_map(move |_| sample_txn_id(tracked_txn_ids_for_commit.clone()))
),
vec("key[0-9][0-9]", 1..5),
)
.prop_map(|(txn_id, keys)| TxnOperation::Commit { txn_id, keys }),
(
(0..100u32)
.prop_map(move |_| sample_txn_id(tracked_txn_ids_for_track_read_keys.clone())),
vec("key[0-9][0-9]", 1..3),
)
.prop_map(|(txn_id, keys)| TxnOperation::TrackReadKeys { txn_id, keys }),
(
(0..100u32)
.prop_map(move |_| sample_txn_id(tracked_txn_ids_for_track_read_range.clone())),
"key[0-9][0-9]",
"key[0-9][0-9]",
)
.prop_map(|(txn_id, start_key, end_key)| {
TxnOperation::TrackReadRange {
txn_id,
start_key: start_key.clone().min(end_key.clone()),
end_key: start_key.max(end_key).to_string(),
}
}),
(0..10u32).prop_map(|_| TxnOperation::Recycle)
]
}
fn operation_sequence_strategy() -> impl Strategy<Value = Vec<TxnOperation>> {
vec(txn_operation_strategy(), 5..100)
}
#[derive(Debug)]
struct ExecutionState {
seq_counter: u64,
}
#[derive(Debug, PartialEq)]
enum ExecutionEffect {
Nothing,
CommitSuccess,
CommitConflict,
Recycled,
}
impl ExecutionState {
fn new() -> Self {
Self { seq_counter: 100 }
}
fn execute_operation(
&mut self,
manager: &TransactionManager,
op: &TxnOperation,
) -> ExecutionEffect {
match op {
TxnOperation::Create { read_only, txn_id } => {
manager.new_txn_with_id(self.seq_counter, *read_only, *txn_id);
ExecutionEffect::Nothing
}
TxnOperation::Drop { txn_id } => {
manager.drop_txn(txn_id);
ExecutionEffect::Nothing
}
TxnOperation::TrackReadKeys { txn_id, keys } => {
let key_set: HashSet<Bytes> =
keys.iter().map(|k| Bytes::from(k.clone())).collect();
manager.track_read_keys(txn_id, &key_set);
ExecutionEffect::Nothing
}
TxnOperation::TrackReadRange {
txn_id,
start_key,
end_key,
} => {
let range = BytesRange::from(
Bytes::from(start_key.clone())..=Bytes::from(end_key.clone()),
);
manager.track_read_range(txn_id, range);
ExecutionEffect::Nothing
}
TxnOperation::Commit { txn_id, keys } => {
let key_set = keys
.iter()
.map(|k: &String| Bytes::from(k.clone()))
.collect();
self.seq_counter += 10;
match txn_id {
None => {
manager.track_recent_committed_write_batch(&key_set, self.seq_counter);
ExecutionEffect::CommitSuccess
}
Some(txn_id) => {
{
let inner = manager.inner.read();
if inner
.active_txns
.get(txn_id)
.map(|txn| txn.read_only)
.unwrap_or(false)
{
return ExecutionEffect::Nothing;
}
}
manager.track_write_keys(txn_id, &key_set);
if !manager.check_has_conflict(txn_id) {
manager.track_recent_committed_txn(txn_id, self.seq_counter);
ExecutionEffect::CommitSuccess
} else {
ExecutionEffect::CommitConflict
}
}
}
}
TxnOperation::Recycle => {
manager.inner.write().recycle_recent_committed_txns();
ExecutionEffect::Recycled
}
}
}
}
proptest! {
#[test]
fn prop_inv_disjoint_active_and_committed_sets(ops in operation_sequence_strategy()) {
    let db_rand = Arc::new(DbRand::new(0));
    let txn_manager = TransactionManager::new(db_rand);
    let mut exec_state = ExecutionState::new();
    for op in ops {
        exec_state.execute_operation(&txn_manager, &op);
        let inner = txn_manager.inner.read();
        let active_ids: HashSet<Uuid> = inner.active_txns.keys().cloned().collect();
        // NOTE(review): this loop is a tautology -- `active_ids` was just
        // built from `active_txns.keys()`, so `contains_key` always holds.
        // It does not verify any disjointness property.
        for active_id in &active_ids {
            prop_assert!(inner.active_txns.contains_key(active_id),
                "Active transaction {:?} should be in active_txns", active_id);
        }
        // NOTE(review): `.all(|_| true)` is vacuously true, so this assert can
        // never fail. Checking real active/committed disjointness would need
        // `recent_committed_txns` entries to retain their transaction id
        // (TransactionState carries no Uuid) -- consider storing the id at
        // commit time so this invariant becomes checkable.
        prop_assert!(
            inner.recent_committed_txns.iter().all(|_committed_txn| true),
            "No committed transaction should have an active counterpart"
        );
    }
}
#[test]
fn prop_all_write_without_conflict_should_be_committed(ops in operation_sequence_strategy()) {
    let db_rand = Arc::new(DbRand::new(0));
    let txn_manager = TransactionManager::new(db_rand);
    let mut exec_state = ExecutionState::new();
    for op in ops {
        let effect = exec_state.execute_operation(&txn_manager, &op);
        // Non-transactional write batches (txn_id == None) bypass conflict
        // detection entirely, so they must always commit successfully.
        // (Previous assertion message was inverted/garbled: it claimed the
        // transaction "should have conflict".)
        if let TxnOperation::Commit { txn_id: None, keys: _ } = &op {
            prop_assert!(
                effect == ExecutionEffect::CommitSuccess,
                "a write batch commit (no transaction) must always succeed, got {:?}",
                effect
            );
        }
    }
}
#[test]
fn prop_inv_min_active_seq_correctness(ops in operation_sequence_strategy()) {
    let db_rand = Arc::new(DbRand::new(0));
    let txn_manager = TransactionManager::new(db_rand);
    let mut exec_state = ExecutionState::new();
    for op in ops {
        exec_state.execute_operation(&txn_manager, &op);
        // Query min_active_seq() before taking the read lock ourselves, the
        // same way a production caller would.
        let min_active_seq = txn_manager.min_active_seq();
        let inner = txn_manager.inner.read();
        // Brute-force oracle: the minimum started_seq over all active txns,
        // or None when there are no active txns.
        match inner.active_txns.values().map(|txn| txn.started_seq).min() {
            None => {
                prop_assert!(
                    min_active_seq.is_none(),
                    "min_active_seq should be None when no active transactions exist"
                );
            }
            expected_min => {
                prop_assert_eq!(
                    min_active_seq,
                    expected_min,
                    "min_active_seq should equal the minimum started_seq of all active transactions"
                );
            }
        }
    }
}
#[test]
fn prop_inv_garbage_collection_correctness(ops in operation_sequence_strategy()) {
    let db_rand = Arc::new(DbRand::new(0));
    let txn_manager = TransactionManager::new(db_rand);
    let mut exec_state = ExecutionState::new();
    for op in ops {
        let effect = exec_state.execute_operation(&txn_manager, &op);
        // Only inspect state immediately after an explicit recycle pass.
        if effect == ExecutionEffect::Recycled {
            let inner = txn_manager.inner.read();
            if !inner.recent_committed_txns.is_empty() {
                // After a recycle, committed history may only be retained when
                // some non-readonly transaction is still active.
                // (Renamed from `has_no_non_readonly_active_txn`, which stated
                // the inverse of the value it held.)
                let has_active_writer = inner.has_non_readonly_active_txn();
                prop_assert!(
                    has_active_writer,
                    "invariant violation: recent_committed_txns is not empty but there are no active write transactions. \
                    Active transaction IDs: {:?}, Number of recent committed: {}",
                    inner.active_txns.keys().collect::<Vec<_>>(),
                    inner.recent_committed_txns.len()
                );
            }
            // Contrapositive: no active writers implies history was drained.
            if !inner.has_non_readonly_active_txn() {
                prop_assert!(inner.recent_committed_txns.is_empty(), "If there's no active non-readonly transactions, recent_committed_txns should be empty");
            }
        }
    }
}
#[test]
fn prop_inv_ssi_read_write_conflict_detection(ops in operation_sequence_strategy()) {
    let db_rand = Arc::new(DbRand::new(0));
    let txn_manager = TransactionManager::new(db_rand);
    let mut exec_state = ExecutionState::new();
    for op in ops {
        exec_state.execute_operation(&txn_manager, &op);
        let inner = txn_manager.inner.read();
        // For every in-flight transaction, the manager's SSI read-write
        // conflict check must agree with a brute-force oracle over the
        // recently committed set.
        for (_id, txn) in inner.active_txns.iter() {
            let rw_conflict = inner.has_read_write_conflict(
                &txn.read_keys,
                txn.read_ranges.clone(),
                txn.started_seq,
            );
            // Oracle: a culprit is a non-readonly transaction that committed
            // after this txn started and wrote either a key this txn read
            // directly, or a key falling inside one of its scanned ranges.
            let culprit_exists = inner.recent_committed_txns.iter().any(|committed| {
                if committed.read_only {
                    return false;
                }
                let Some(other_committed_seq) = committed.committed_seq else {
                    return false;
                };
                if other_committed_seq <= txn.started_seq {
                    return false;
                }
                let direct_conflict = !txn.read_keys.is_disjoint(&committed.write_keys);
                let phantom_conflict = txn
                    .read_ranges
                    .iter()
                    .any(|range| committed.write_keys.iter().any(|w| range.contains(w)));
                direct_conflict || phantom_conflict
            });
            prop_assert_eq!(rw_conflict, culprit_exists,
                "SSI RW invariant violation: rw_conflict={} but culprit_exists={} (started_seq={:?}, read_keys_len={}, read_ranges_len={}, recent_committed_len={})",
                rw_conflict,
                culprit_exists,
                txn.started_seq,
                txn.read_keys.len(),
                txn.read_ranges.len(),
                inner.recent_committed_txns.len()
            );
        }
    }
}
}
}