use crate::runtime::values::Value;
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use thiserror::Error;
#[derive(Debug, Clone)]
pub enum TransactionEvent {
Begin {
tx_id: String,
isolation_level: IsolationLevel,
},
Read {
tx_id: String,
key: String,
},
Write {
tx_id: String,
key: String,
},
SavepointCreated {
tx_id: String,
savepoint_name: String,
},
SavepointRolledBack {
tx_id: String,
savepoint_name: String,
},
Commit {
tx_id: String,
keys_modified: usize,
},
Rollback {
tx_id: String,
},
Timeout {
tx_id: String,
elapsed_ms: u64,
},
Conflict {
tx_id: String,
key: String,
reason: String,
},
Deadlock {
tx_id: String,
cycle: Option<Vec<String>>,
},
}
pub type TransactionEventCallback = Box<dyn Fn(&TransactionEvent) + Send + Sync>;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TransactionLogEntry {
pub timestamp: u64,
pub tx_id: String,
pub event_type: String,
pub keys: Vec<String>,
pub isolation_level: Option<String>,
}
pub struct TransactionLog {
file: Option<BufWriter<File>>,
#[allow(dead_code)]
#[allow(dead_code)]
path: PathBuf, }
impl TransactionLog {
pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = path.as_ref().to_path_buf();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let file = OpenOptions::new().create(true).append(true).open(&path)?;
Ok(Self {
file: Some(BufWriter::new(file)),
path,
})
}
pub fn log_event(&mut self, event: &TransactionEvent) -> io::Result<()> {
let entry = match event {
TransactionEvent::Begin {
tx_id,
isolation_level,
} => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "begin".to_string(),
keys: vec![],
isolation_level: Some(format!("{:?}", isolation_level)),
},
TransactionEvent::Read { tx_id, key } => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "read".to_string(),
keys: vec![key.clone()],
isolation_level: None,
},
TransactionEvent::Write { tx_id, key } => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "write".to_string(),
keys: vec![key.clone()],
isolation_level: None,
},
TransactionEvent::Commit {
tx_id,
keys_modified,
} => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "commit".to_string(),
keys: vec![format!("modified:{}", keys_modified)],
isolation_level: None,
},
TransactionEvent::Rollback { tx_id } => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "rollback".to_string(),
keys: vec![],
isolation_level: None,
},
TransactionEvent::SavepointCreated {
tx_id,
savepoint_name,
} => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "savepoint_created".to_string(),
keys: vec![savepoint_name.clone()],
isolation_level: None,
},
TransactionEvent::SavepointRolledBack {
tx_id,
savepoint_name,
} => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "savepoint_rollback".to_string(),
keys: vec![savepoint_name.clone()],
isolation_level: None,
},
TransactionEvent::Timeout { tx_id, elapsed_ms } => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "timeout".to_string(),
keys: vec![format!("elapsed_ms:{}", elapsed_ms)],
isolation_level: None,
},
TransactionEvent::Conflict { tx_id, key, reason } => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "conflict".to_string(),
keys: vec![key.clone(), reason.clone()],
isolation_level: None,
},
TransactionEvent::Deadlock { tx_id, cycle } => TransactionLogEntry {
timestamp: get_current_timestamp(),
tx_id: tx_id.clone(),
event_type: "deadlock".to_string(),
keys: cycle
.as_ref()
.map(|c| vec![format!("cycle:{}", c.join(","))])
.unwrap_or_default(),
isolation_level: None,
},
};
if let Some(ref mut file) = self.file {
serde_json::to_writer(&mut *file, &entry).map_err(io::Error::other)?;
file.write_all(b"\n")?;
file.flush()?;
}
Ok(())
}
pub fn close(&mut self) -> io::Result<()> {
if let Some(mut file) = self.file.take() {
file.flush()?;
}
Ok(())
}
}
impl Drop for TransactionLog {
fn drop(&mut self) {
let _ = self.close();
}
}
pub trait StateStorage {
fn get(&self, key: &str) -> Option<Value>;
fn set(&mut self, key: &str, value: Value);
fn contains_key(&self, key: &str) -> bool;
fn remove(&mut self, key: &str) -> Option<Value>;
}
#[derive(Default)]
pub struct InMemoryStorage {
state: HashMap<String, Value>,
}
impl InMemoryStorage {
pub fn new() -> Self {
Self::default()
}
pub fn from_map(map: HashMap<String, Value>) -> Self {
Self { state: map }
}
}
impl StateStorage for InMemoryStorage {
fn get(&self, key: &str) -> Option<Value> {
self.state.get(key).cloned()
}
fn set(&mut self, key: &str, value: Value) {
self.state.insert(key.to_string(), value);
}
fn contains_key(&self, key: &str) -> bool {
self.state.contains_key(key)
}
fn remove(&mut self, key: &str) -> Option<Value> {
self.state.remove(key)
}
}
pub struct FileBackedStorage {
state: HashMap<String, Value>,
file_path: PathBuf,
auto_flush: bool, }
impl FileBackedStorage {
pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
Self::with_auto_flush(path, true)
}
pub fn with_auto_flush<P: AsRef<Path>>(path: P, auto_flush: bool) -> io::Result<Self> {
let file_path = path.as_ref().to_path_buf();
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent)?;
}
let state = Self::load_state_recovery(&file_path)?;
Ok(Self {
state,
file_path,
auto_flush,
})
}
fn load_state_recovery(file_path: &Path) -> io::Result<HashMap<String, Value>> {
let temp_path = file_path.with_extension("tmp");
let from_main = file_path.exists().then(|| Self::load_from_file(file_path));
match from_main {
Some(Ok(state)) => Ok(state),
Some(Err(e)) => {
eprintln!(
"Warning: Failed to load transaction state from {:?}: {}. Trying recovery from .tmp.",
file_path, e
);
if temp_path.exists() {
match Self::load_from_file(&temp_path) {
Ok(state) => {
let _ = fs::rename(&temp_path, file_path);
Ok(state)
}
Err(e2) => {
eprintln!("Warning: Recovery from .tmp failed: {}. Starting with empty state.", e2);
Ok(HashMap::new())
}
}
} else {
Ok(HashMap::new())
}
}
None => {
if temp_path.exists() {
match Self::load_from_file(&temp_path) {
Ok(state) => {
let _ = fs::rename(&temp_path, file_path);
Ok(state)
}
Err(e) => {
eprintln!("Warning: Recovery from .tmp failed: {}. Starting with empty state.", e);
Ok(HashMap::new())
}
}
} else {
Ok(HashMap::new())
}
}
}
}
fn load_from_file(path: &Path) -> io::Result<HashMap<String, Value>> {
let file = File::open(path)?;
let reader = BufReader::new(file);
match serde_json::from_reader(reader) {
Ok(state) => Ok(state),
Err(e) => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid JSON in {:?}: {}", path, e),
)),
}
}
pub fn flush(&self) -> io::Result<()> {
let temp_path = self.file_path.with_extension("tmp");
let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&temp_path)?;
let writer = BufWriter::new(file);
serde_json::to_writer_pretty(writer, &self.state).map_err(io::Error::other)?;
fs::rename(&temp_path, &self.file_path)?;
Ok(())
}
}
impl StateStorage for FileBackedStorage {
fn get(&self, key: &str) -> Option<Value> {
self.state.get(key).cloned()
}
fn set(&mut self, key: &str, value: Value) {
self.state.insert(key.to_string(), value);
if self.auto_flush {
if let Err(e) = self.flush() {
eprintln!("Warning: Failed to flush transaction state to disk: {}", e);
}
}
}
fn contains_key(&self, key: &str) -> bool {
self.state.contains_key(key)
}
fn remove(&mut self, key: &str) -> Option<Value> {
let result = self.state.remove(key);
if self.auto_flush && result.is_some() {
if let Err(e) = self.flush() {
eprintln!("Warning: Failed to flush transaction state to disk: {}", e);
}
}
result
}
}
impl Drop for FileBackedStorage {
fn drop(&mut self) {
if let Err(e) = self.flush() {
eprintln!("Warning: Failed to flush transaction state on drop: {}", e);
}
}
}
#[cfg(feature = "sqlite-storage")]
pub struct SqliteStorage {
conn: rusqlite::Connection,
}
#[cfg(feature = "sqlite-storage")]
impl SqliteStorage {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Box<dyn std::error::Error>> {
let path = path.as_ref();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let conn = rusqlite::Connection::open(path)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS kv_store (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)",
[],
)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
Ok(Self { conn })
}
pub fn new_in_memory() -> Result<Self, Box<dyn std::error::Error>> {
let conn = rusqlite::Connection::open_in_memory()?;
conn.execute(
"CREATE TABLE IF NOT EXISTS kv_store (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)",
[],
)?;
Ok(Self { conn })
}
}
#[cfg(feature = "sqlite-storage")]
impl StateStorage for SqliteStorage {
fn get(&self, key: &str) -> Option<Value> {
let result: Result<String, rusqlite::Error> =
self.conn
.query_row("SELECT value FROM kv_store WHERE key = ?1", [key], |row| {
row.get(0)
});
match result {
Ok(json_str) => {
serde_json::from_str(&json_str).ok()
}
Err(rusqlite::Error::QueryReturnedNoRows) => None,
Err(e) => {
eprintln!("Warning: SQLite get error for key '{}': {}", key, e);
None
}
}
}
fn set(&mut self, key: &str, value: Value) {
let json_str = match serde_json::to_string(&value) {
Ok(s) => s,
Err(e) => {
eprintln!(
"Warning: Failed to serialize value for key '{}': {}",
key, e
);
return;
}
};
if let Err(e) = self.conn.execute(
"INSERT OR REPLACE INTO kv_store (key, value) VALUES (?1, ?2)",
rusqlite::params![key, json_str],
) {
eprintln!("Warning: SQLite set error for key '{}': {}", key, e);
}
}
fn contains_key(&self, key: &str) -> bool {
let result: Result<i64, rusqlite::Error> = self.conn.query_row(
"SELECT COUNT(*) FROM kv_store WHERE key = ?1",
[key],
|row| row.get(0),
);
matches!(result, Ok(count) if count > 0)
}
fn remove(&mut self, key: &str) -> Option<Value> {
let current = self.get(key);
if current.is_some() {
if let Err(e) = self
.conn
.execute("DELETE FROM kv_store WHERE key = ?1", [key])
{
eprintln!("Warning: SQLite remove error for key '{}': {}", key, e);
return None;
}
}
current
}
}
#[derive(Error, Debug, Clone)]
pub enum TransactionError {
#[error("Transaction not found: {0}")]
NotFound(String),
#[error("Transaction already active")]
AlreadyActive,
#[error("No active transaction")]
NoActiveTransaction,
#[error("Transaction conflict detected")]
Conflict,
#[error("Deadlock detected")]
Deadlock,
#[error("Deadlock detected (cycle: {0:?})")]
DeadlockWithCycle(Vec<String>),
#[error("Transaction timeout")]
Timeout,
#[error("Resource limit exceeded: {0}")]
LimitExceeded(String),
#[error("Rollback failed: {0}")]
RollbackFailed(String),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
ReadUncommitted, ReadCommitted, RepeatableRead, Serializable, }
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionState {
Active,
Preparing, Committed,
RolledBack,
Failed,
}
#[derive(Debug, Clone)]
pub struct Savepoint {
pub name: String,
pub state_snapshot: HashMap<String, Value>,
pub timestamp: u64,
}
#[derive(Debug, Clone)]
pub struct Transaction {
pub id: String,
pub state: TransactionState,
pub isolation_level: IsolationLevel,
pub start_time: u64,
pub timeout_ms: Option<u64>,
pub original_state: HashMap<String, Value>,
pub modified_state: HashMap<String, Value>,
pub savepoints: Vec<Savepoint>,
pub participants: Vec<String>, pub is_distributed: bool,
}
pub struct TransactionManager {
active_transactions: HashMap<String, Transaction>,
transaction_counter: u64,
storage: Box<dyn StateStorage>,
read_locks: HashMap<String, Vec<String>>, write_locks: HashMap<String, String>, wait_for: HashMap<String, Vec<String>>,
default_timeout_ms: Option<u64>, event_callback: Option<TransactionEventCallback>, transaction_log: Option<TransactionLog>, optimize_read_only_audit: bool,
max_active_transactions: usize, max_keys_per_transaction: usize, }
impl Transaction {
pub fn new(id: String, isolation_level: IsolationLevel) -> Self {
Self {
id,
state: TransactionState::Active,
isolation_level,
start_time: get_current_timestamp(),
timeout_ms: Some(30000), original_state: HashMap::new(),
modified_state: HashMap::new(),
savepoints: Vec::new(),
participants: Vec::new(),
is_distributed: false,
}
}
pub fn is_timed_out(&self) -> bool {
if let Some(timeout) = self.timeout_ms {
let elapsed = get_current_timestamp() - self.start_time;
elapsed > timeout
} else {
false
}
}
pub fn create_savepoint(&mut self, name: String) {
let savepoint = Savepoint {
name,
state_snapshot: self.modified_state.clone(),
timestamp: get_current_timestamp(),
};
self.savepoints.push(savepoint);
}
pub fn rollback_to_savepoint(&mut self, name: &str) -> Result<(), TransactionError> {
if let Some(pos) = self.savepoints.iter().position(|sp| sp.name == name) {
let savepoint = &self.savepoints[pos];
self.modified_state = savepoint.state_snapshot.clone();
self.savepoints.truncate(pos + 1);
Ok(())
} else {
Err(TransactionError::NotFound(format!(
"Savepoint '{}' not found",
name
)))
}
}
}
impl TransactionManager {
pub fn new() -> Self {
Self::with_storage(Box::new(InMemoryStorage::new()))
}
pub fn from_env() -> io::Result<Self> {
let storage_type = std::env::var("DAL_TX_STORAGE").unwrap_or_else(|_| "memory".to_string());
let log_path = std::env::var("DAL_TX_LOG_PATH").ok();
let timeout_ms = std::env::var("DAL_TX_TIMEOUT_MS")
.ok()
.and_then(|s| s.parse::<u64>().ok());
let storage: Box<dyn StateStorage> = match storage_type.as_str() {
"file" => {
let storage_path = std::env::var("DAL_TX_STORAGE_PATH")
.unwrap_or_else(|_| "./dal_tx_state.json".to_string());
Box::new(FileBackedStorage::new(storage_path)?)
}
#[cfg(feature = "sqlite-storage")]
"sqlite" => {
let storage_path = std::env::var("DAL_TX_STORAGE_PATH")
.unwrap_or_else(|_| "./dal_tx_state.db".to_string());
Box::new(SqliteStorage::new(storage_path).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("SQLite error: {}", e))
})?)
}
#[cfg(not(feature = "sqlite-storage"))]
"sqlite" => {
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"SQLite backend requires 'sqlite-storage' feature. Compile with: cargo build --features sqlite-storage"
));
}
"memory" | _ => Box::new(InMemoryStorage::new()),
};
let mut manager = Self::with_storage(storage);
if let Some(timeout) = timeout_ms {
manager = manager.with_default_timeout(Some(timeout));
}
if let Some(log) = log_path {
manager = manager.with_transaction_log(log)?;
}
if let Ok(max_active) = std::env::var("DAL_TX_MAX_ACTIVE") {
if let Ok(limit) = max_active.parse::<usize>() {
manager = manager.with_max_active_transactions(limit);
}
}
if let Ok(max_keys) = std::env::var("DAL_TX_MAX_KEYS") {
if let Ok(limit) = max_keys.parse::<usize>() {
manager = manager.with_max_keys_per_transaction(limit);
}
}
if std::env::var("DAL_TX_READ_ONLY_AUDIT_OPTIMIZATION").as_deref() == Ok("1") {
manager = manager.with_read_only_audit_optimization(true);
}
Ok(manager)
}
pub fn with_storage(storage: Box<dyn StateStorage>) -> Self {
Self {
active_transactions: HashMap::new(),
transaction_counter: 0,
storage,
read_locks: HashMap::new(),
write_locks: HashMap::new(),
wait_for: HashMap::new(),
default_timeout_ms: Some(30000), event_callback: None,
transaction_log: None,
optimize_read_only_audit: false,
max_active_transactions: 1000,
max_keys_per_transaction: 10000,
}
}
pub fn with_default_timeout(mut self, timeout_ms: Option<u64>) -> Self {
self.default_timeout_ms = timeout_ms;
self
}
pub fn with_max_active_transactions(mut self, max: usize) -> Self {
self.max_active_transactions = max;
self
}
pub fn with_max_keys_per_transaction(mut self, max: usize) -> Self {
self.max_keys_per_transaction = max;
self
}
pub fn with_event_callback(mut self, callback: TransactionEventCallback) -> Self {
self.event_callback = Some(callback);
self
}
pub fn with_transaction_log<P: AsRef<Path>>(mut self, path: P) -> io::Result<Self> {
self.transaction_log = Some(TransactionLog::new(path)?);
Ok(self)
}
pub fn with_read_only_audit_optimization(mut self, enable: bool) -> Self {
self.optimize_read_only_audit = enable;
self
}
fn emit_event(&mut self, event: TransactionEvent) {
if let Some(ref callback) = self.event_callback {
callback(&event);
}
if let Some(ref mut log) = self.transaction_log {
if let Err(e) = log.log_event(&event) {
eprintln!("Warning: Failed to write to transaction log: {}", e);
}
}
}
fn emit_event_skip_log(&mut self, event: TransactionEvent) {
if let Some(ref callback) = self.event_callback {
callback(&event);
}
}
pub fn get_committed(&self, key: &str) -> Option<Value> {
self.storage.get(key)
}
pub fn get_transaction(&self, tx_id: &str) -> Option<&Transaction> {
self.active_transactions.get(tx_id)
}
pub fn set_transaction_timeout(
&mut self,
tx_id: &str,
timeout_ms: Option<u64>,
) -> Result<(), TransactionError> {
let tx = self
.active_transactions
.get_mut(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
tx.timeout_ms = timeout_ms;
Ok(())
}
pub fn begin_transaction(
&mut self,
isolation_level: IsolationLevel,
) -> Result<String, TransactionError> {
if self.max_active_transactions > 0
&& self.active_transactions.len() >= self.max_active_transactions
{
return Err(TransactionError::LimitExceeded(format!(
"Maximum active transactions limit reached ({}/{})",
self.active_transactions.len(),
self.max_active_transactions
)));
}
self.transaction_counter += 1;
let tx_id = format!("tx_{}", self.transaction_counter);
let mut transaction = Transaction::new(tx_id.clone(), isolation_level);
transaction.timeout_ms = self.default_timeout_ms;
self.emit_event(TransactionEvent::Begin {
tx_id: tx_id.clone(),
isolation_level,
});
self.active_transactions.insert(tx_id.clone(), transaction);
Ok(tx_id)
}
pub fn read(&mut self, tx_id: &str, key: &str) -> Result<Option<Value>, TransactionError> {
let (should_lock, is_timed_out, modified_value) = {
let tx = self
.active_transactions
.get(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
if tx.state != TransactionState::Active {
return Err(TransactionError::NoActiveTransaction);
}
let should_lock = tx.isolation_level != IsolationLevel::ReadUncommitted;
let is_timed_out = tx.is_timed_out();
let modified_value = tx.modified_state.get(key).cloned();
(should_lock, is_timed_out, modified_value)
};
if is_timed_out {
self.emit_event(TransactionEvent::Timeout {
tx_id: tx_id.to_string(),
elapsed_ms: get_current_timestamp() - self.active_transactions[tx_id].start_time,
});
return Err(TransactionError::Timeout);
}
if should_lock {
self.acquire_read_lock(tx_id, key)?;
}
self.emit_event(TransactionEvent::Read {
tx_id: tx_id.to_string(),
key: key.to_string(),
});
if let Some(value) = modified_value {
return Ok(Some(value));
}
Ok(self.storage.get(key))
}
pub fn write(
&mut self,
tx_id: &str,
key: String,
value: Value,
) -> Result<(), TransactionError> {
self.acquire_write_lock(tx_id, &key)?;
self.emit_event(TransactionEvent::Write {
tx_id: tx_id.to_string(),
key: key.clone(),
});
let tx = self
.active_transactions
.get_mut(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
if tx.state != TransactionState::Active {
return Err(TransactionError::NoActiveTransaction);
}
if self.max_keys_per_transaction > 0
&& !tx.modified_state.contains_key(&key)
&& tx.modified_state.len() >= self.max_keys_per_transaction
{
return Err(TransactionError::LimitExceeded(format!(
"Maximum keys per transaction limit reached ({}/{})",
tx.modified_state.len(),
self.max_keys_per_transaction
)));
}
if !tx.original_state.contains_key(&key) {
if let Some(original) = self.storage.get(&key) {
tx.original_state.insert(key.clone(), original);
}
}
tx.modified_state.insert(key, value);
Ok(())
}
pub fn commit(&mut self, tx_id: &str) -> Result<(), TransactionError> {
let keys_modified = {
let tx = self
.active_transactions
.get(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
if tx.state != TransactionState::Active {
return Err(TransactionError::NoActiveTransaction);
}
if tx.is_timed_out() {
let elapsed_ms = get_current_timestamp() - tx.start_time;
self.emit_event(TransactionEvent::Timeout {
tx_id: tx_id.to_string(),
elapsed_ms,
});
self.rollback(tx_id)?;
return Err(TransactionError::Timeout);
}
if tx.is_distributed {
return self.two_phase_commit(tx_id);
}
tx.modified_state.len()
};
let tx = self.active_transactions.get_mut(tx_id).unwrap();
for (key, value) in &tx.modified_state {
self.storage.set(key, value.clone());
}
tx.state = TransactionState::Committed;
let is_read_only = keys_modified == 0;
if is_read_only && self.optimize_read_only_audit {
self.emit_event_skip_log(TransactionEvent::Commit {
tx_id: tx_id.to_string(),
keys_modified,
});
} else {
self.emit_event(TransactionEvent::Commit {
tx_id: tx_id.to_string(),
keys_modified,
});
}
self.release_locks(tx_id);
self.remove_from_wait_for(tx_id);
self.active_transactions.remove(tx_id);
Ok(())
}
pub fn rollback(&mut self, tx_id: &str) -> Result<(), TransactionError> {
let tx = self
.active_transactions
.get_mut(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
tx.state = TransactionState::RolledBack;
self.emit_event(TransactionEvent::Rollback {
tx_id: tx_id.to_string(),
});
self.release_locks(tx_id);
self.remove_from_wait_for(tx_id);
self.active_transactions.remove(tx_id);
Ok(())
}
fn two_phase_commit(&mut self, tx_id: &str) -> Result<(), TransactionError> {
let tx = self
.active_transactions
.get_mut(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
tx.state = TransactionState::Preparing;
for (key, value) in &tx.modified_state {
self.storage.set(key, value.clone());
}
tx.state = TransactionState::Committed;
self.release_locks(tx_id);
self.remove_from_wait_for(tx_id);
self.active_transactions.remove(tx_id);
Ok(())
}
fn remove_from_wait_for(&mut self, tx_id: &str) {
self.wait_for.remove(tx_id);
for blockers in self.wait_for.values_mut() {
blockers.retain(|b| b != tx_id);
}
}
fn add_wait_edge_and_detect_cycle(&mut self, from: &str, to: &str) -> Option<Vec<String>> {
let entry = self
.wait_for
.entry(from.to_string())
.or_insert_with(Vec::new);
if !entry.contains(&to.to_string()) {
entry.push(to.to_string());
}
self.find_cycle(from, to)
}
fn find_cycle(&self, from: &str, to: &str) -> Option<Vec<String>> {
let mut path = vec![to.to_string()];
let mut visited = std::collections::HashSet::new();
if self.find_cycle_dfs(from, to, &mut path, &mut visited) {
path.push(from.to_string());
Some(path)
} else {
None
}
}
fn find_cycle_dfs(
&self,
from: &str,
current: &str,
path: &mut Vec<String>,
visited: &mut std::collections::HashSet<String>,
) -> bool {
if current == from {
return true;
}
if !visited.insert(current.to_string()) {
return false;
}
if let Some(blockers) = self.wait_for.get(current) {
for next in blockers {
path.push(next.clone());
if self.find_cycle_dfs(from, next, path, visited) {
return true;
}
path.pop();
}
}
visited.remove(current);
false
}
fn acquire_read_lock(&mut self, tx_id: &str, key: &str) -> Result<(), TransactionError> {
if let Some(tx) = self.active_transactions.get(tx_id) {
if tx.is_timed_out() {
self.emit_event(TransactionEvent::Deadlock {
tx_id: tx_id.to_string(),
cycle: None,
});
return Err(TransactionError::Deadlock);
}
}
if let Some(write_owner) = self.write_locks.get(key) {
if write_owner != tx_id {
let owner = write_owner.clone();
if let Some(cycle) = self.add_wait_edge_and_detect_cycle(tx_id, &owner) {
self.emit_event(TransactionEvent::Deadlock {
tx_id: tx_id.to_string(),
cycle: Some(cycle.clone()),
});
return Err(TransactionError::DeadlockWithCycle(cycle));
}
self.emit_event(TransactionEvent::Conflict {
tx_id: tx_id.to_string(),
key: key.to_string(),
reason: format!("Read blocked by write lock from {}", owner),
});
return Err(TransactionError::Conflict);
}
}
self.read_locks
.entry(key.to_string())
.or_insert_with(Vec::new)
.push(tx_id.to_string());
Ok(())
}
fn acquire_write_lock(&mut self, tx_id: &str, key: &str) -> Result<(), TransactionError> {
if let Some(tx) = self.active_transactions.get(tx_id) {
if tx.is_timed_out() {
self.emit_event(TransactionEvent::Deadlock {
tx_id: tx_id.to_string(),
cycle: None,
});
return Err(TransactionError::Deadlock);
}
}
if let Some(write_owner) = self.write_locks.get(key) {
if write_owner != tx_id {
let owner = write_owner.clone();
if let Some(cycle) = self.add_wait_edge_and_detect_cycle(tx_id, &owner) {
self.emit_event(TransactionEvent::Deadlock {
tx_id: tx_id.to_string(),
cycle: Some(cycle.clone()),
});
return Err(TransactionError::DeadlockWithCycle(cycle));
}
self.emit_event(TransactionEvent::Conflict {
tx_id: tx_id.to_string(),
key: key.to_string(),
reason: format!("Write blocked by write lock from {}", owner),
});
return Err(TransactionError::Conflict);
}
}
if let Some(readers) = self.read_locks.get(key) {
let others: Vec<String> = readers.iter().filter(|r| *r != tx_id).cloned().collect();
let n_readers = others.len();
if !others.is_empty() {
for r in &others {
if let Some(cycle) = self.add_wait_edge_and_detect_cycle(tx_id, r) {
self.emit_event(TransactionEvent::Deadlock {
tx_id: tx_id.to_string(),
cycle: Some(cycle.clone()),
});
return Err(TransactionError::DeadlockWithCycle(cycle));
}
}
self.emit_event(TransactionEvent::Conflict {
tx_id: tx_id.to_string(),
key: key.to_string(),
reason: format!("Write blocked by {} read lock(s)", n_readers),
});
return Err(TransactionError::Conflict);
}
}
self.write_locks.insert(key.to_string(), tx_id.to_string());
Ok(())
}
fn release_locks(&mut self, tx_id: &str) {
self.read_locks.retain(|_, readers| {
readers.retain(|r| r != tx_id);
!readers.is_empty()
});
self.write_locks.retain(|_, owner| owner != tx_id);
}
pub fn create_savepoint(&mut self, tx_id: &str, name: String) -> Result<(), TransactionError> {
let tx = self
.active_transactions
.get_mut(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
tx.create_savepoint(name.clone());
self.emit_event(TransactionEvent::SavepointCreated {
tx_id: tx_id.to_string(),
savepoint_name: name,
});
Ok(())
}
pub fn rollback_to_savepoint(
&mut self,
tx_id: &str,
name: &str,
) -> Result<(), TransactionError> {
let tx = self
.active_transactions
.get_mut(tx_id)
.ok_or_else(|| TransactionError::NotFound(tx_id.to_string()))?;
let result = tx.rollback_to_savepoint(name);
if result.is_ok() {
self.emit_event(TransactionEvent::SavepointRolledBack {
tx_id: tx_id.to_string(),
savepoint_name: name.to_string(),
});
}
result
}
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
fn get_current_timestamp() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_transaction_begin_commit() {
let mut manager = TransactionManager::new();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
assert!(manager.active_transactions.contains_key(&tx_id));
manager
.write(&tx_id, "key1".to_string(), Value::Int(42))
.unwrap();
manager.commit(&tx_id).unwrap();
assert!(manager.get_transaction(&tx_id).is_none());
assert_eq!(manager.get_committed("key1"), Some(Value::Int(42)));
}
#[test]
fn test_transaction_rollback() {
let mut manager =
TransactionManager::with_storage(Box::new(InMemoryStorage::from_map(HashMap::from([
("key1".to_string(), Value::Int(10)),
]))));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(42))
.unwrap();
manager.rollback(&tx_id).unwrap();
assert_eq!(manager.get_committed("key1"), Some(Value::Int(10)));
}
#[test]
fn test_savepoint_rollback() {
let mut manager = TransactionManager::new();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(1))
.unwrap();
manager.create_savepoint(&tx_id, "sp1".to_string()).unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(2))
.unwrap();
manager.rollback_to_savepoint(&tx_id, "sp1").unwrap();
let tx = manager.get_transaction(&tx_id).unwrap();
assert_eq!(tx.modified_state.get("key1"), Some(&Value::Int(1)));
}
#[test]
fn test_isolation_read_committed() {
let mut manager =
TransactionManager::with_storage(Box::new(InMemoryStorage::from_map(HashMap::from([
("counter".to_string(), Value::Int(0)),
]))));
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx1, "counter".to_string(), Value::Int(1))
.unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
let _read_result = manager.read(&tx2, "counter");
manager.commit(&tx1).unwrap();
let value = manager.read(&tx2, "counter").unwrap();
assert_eq!(value, Some(Value::Int(1)));
manager.commit(&tx2).unwrap();
}
#[test]
fn test_write_conflict() {
let mut manager = TransactionManager::new();
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx1, "key1".to_string(), Value::Int(1))
.unwrap();
let result = manager.write(&tx2, "key1".to_string(), Value::Int(2));
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), TransactionError::Conflict));
}
#[test]
fn test_transaction_timeout() {
let mut manager = TransactionManager::new();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager.set_transaction_timeout(&tx_id, Some(1)).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let result = manager.commit(&tx_id);
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), TransactionError::Timeout));
}
#[test]
fn test_state_storage_contains_key() {
let mut storage = InMemoryStorage::new();
assert!(!storage.contains_key("key1"));
storage.set("key1", Value::Int(42));
assert!(storage.contains_key("key1"));
assert!(!storage.contains_key("key2"));
}
#[test]
fn test_state_storage_remove() {
let mut storage = InMemoryStorage::new();
storage.set("key1", Value::Int(42));
storage.set("key2", Value::String("hello".to_string()));
let removed = storage.remove("key1");
assert_eq!(removed, Some(Value::Int(42)));
assert!(!storage.contains_key("key1"));
assert!(storage.contains_key("key2"));
let not_found = storage.remove("key3");
assert_eq!(not_found, None);
}
#[test]
fn test_configurable_default_timeout() {
let mut manager = TransactionManager::new().with_default_timeout(Some(5000));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
let tx = manager.get_transaction(&tx_id).unwrap();
assert_eq!(tx.timeout_ms, Some(5000));
}
#[test]
fn test_no_default_timeout() {
let mut manager = TransactionManager::new().with_default_timeout(None);
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
let tx = manager.get_transaction(&tx_id).unwrap();
assert_eq!(tx.timeout_ms, None);
assert!(!tx.is_timed_out());
}
#[test]
fn test_transaction_event_callback() {
use std::sync::{Arc, Mutex};
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let mut manager = TransactionManager::new().with_event_callback(Box::new(move |event| {
events_clone.lock().unwrap().push(format!("{:?}", event));
}));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(42))
.unwrap();
manager.commit(&tx_id).unwrap();
let captured_events = events.lock().unwrap();
assert!(captured_events.len() >= 3);
assert!(captured_events.iter().any(|e| e.contains("Begin")));
assert!(captured_events.iter().any(|e| e.contains("Write")));
assert!(captured_events.iter().any(|e| e.contains("Commit")));
}
#[test]
fn test_savepoint_events() {
use std::sync::{Arc, Mutex};
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let mut manager = TransactionManager::new().with_event_callback(Box::new(move |event| {
events_clone.lock().unwrap().push(format!("{:?}", event));
}));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(1))
.unwrap();
manager.create_savepoint(&tx_id, "sp1".to_string()).unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(2))
.unwrap();
manager.rollback_to_savepoint(&tx_id, "sp1").unwrap();
manager.commit(&tx_id).unwrap();
let captured_events = events.lock().unwrap();
assert!(captured_events
.iter()
.any(|e| e.contains("SavepointCreated")));
assert!(captured_events
.iter()
.any(|e| e.contains("SavepointRolledBack")));
}
#[test]
fn test_conflict_event() {
use std::sync::{Arc, Mutex};
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let mut manager = TransactionManager::new().with_event_callback(Box::new(move |event| {
events_clone.lock().unwrap().push(format!("{:?}", event));
}));
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx1, "key1".to_string(), Value::Int(1))
.unwrap();
let result = manager.write(&tx2, "key1".to_string(), Value::Int(2));
assert!(result.is_err());
let captured_events = events.lock().unwrap();
assert!(captured_events.iter().any(|e| e.contains("Conflict")));
}
#[test]
fn test_deadlock_detection_timeout() {
use std::sync::{Arc, Mutex};
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let mut manager = TransactionManager::new()
.with_default_timeout(Some(1)) .with_event_callback(Box::new(move |event| {
events_clone.lock().unwrap().push(format!("{:?}", event));
}));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let result = manager.write(&tx_id, "key1".to_string(), Value::Int(1));
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), TransactionError::Deadlock));
let captured_events = events.lock().unwrap();
assert!(captured_events.iter().any(|e| e.contains("Deadlock")));
}
#[test]
fn test_deadlock_detection_cycle() {
let mut manager = TransactionManager::new();
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx1, "key1".to_string(), Value::Int(1))
.unwrap();
manager
.write(&tx2, "key2".to_string(), Value::Int(2))
.unwrap();
let r1 = manager.write(&tx1, "key2".to_string(), Value::Int(0));
assert!(r1.is_err());
assert!(matches!(r1.unwrap_err(), TransactionError::Conflict));
let r2 = manager.write(&tx2, "key1".to_string(), Value::Int(0));
assert!(r2.is_err());
match r2.unwrap_err() {
TransactionError::DeadlockWithCycle(cycle) => {
assert!(cycle.len() >= 2);
assert!(cycle.contains(&tx1));
assert!(cycle.contains(&tx2));
}
_ => panic!("expected DeadlockWithCycle"),
}
}
#[test]
fn test_rollback_event() {
use std::sync::{Arc, Mutex};
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let mut manager = TransactionManager::new().with_event_callback(Box::new(move |event| {
events_clone.lock().unwrap().push(format!("{:?}", event));
}));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(42))
.unwrap();
manager.rollback(&tx_id).unwrap();
let captured_events = events.lock().unwrap();
assert!(captured_events.iter().any(|e| e.contains("Rollback")));
}
#[test]
fn test_file_backed_storage_persistence() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let storage_path = temp_dir.path().join("test_state.json");
{
let mut storage = FileBackedStorage::new(&storage_path).unwrap();
storage.set("key1", Value::Int(42));
storage.set("key2", Value::String("hello".to_string()));
assert_eq!(storage.get("key1"), Some(Value::Int(42)));
}
{
let storage = FileBackedStorage::new(&storage_path).unwrap();
assert_eq!(storage.get("key1"), Some(Value::Int(42)));
assert_eq!(
storage.get("key2"),
Some(Value::String("hello".to_string()))
);
}
}
#[test]
fn test_file_backed_storage_remove_persists() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let storage_path = temp_dir.path().join("test_remove.json");
{
let mut storage = FileBackedStorage::new(&storage_path).unwrap();
storage.set("key1", Value::Int(1));
storage.set("key2", Value::Int(2));
storage.remove("key1");
}
{
let storage = FileBackedStorage::new(&storage_path).unwrap();
assert_eq!(storage.get("key1"), None);
assert_eq!(storage.get("key2"), Some(Value::Int(2)));
}
}
#[test]
fn test_file_backed_storage_recovery_from_tmp() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let storage_path = temp_dir.path().join("recovery_state.json");
{
let mut storage = FileBackedStorage::new(&storage_path).unwrap();
storage.set("recovered", Value::Int(99));
storage.set("name", Value::String("from_tmp".to_string()));
}
let content = fs::read_to_string(&storage_path).unwrap();
fs::remove_file(&storage_path).unwrap();
let tmp_path = storage_path.with_extension("tmp");
fs::write(&tmp_path, &content).unwrap();
let storage = FileBackedStorage::new(&storage_path).unwrap();
assert_eq!(storage.get("recovered"), Some(Value::Int(99)));
assert_eq!(
storage.get("name"),
Some(Value::String("from_tmp".to_string()))
);
assert!(
storage_path.exists(),
"main file should be promoted from .tmp"
);
}
#[test]
fn test_transaction_manager_with_file_storage() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let storage_path = temp_dir.path().join("tx_state.json");
{
let mut manager = TransactionManager::with_storage(Box::new(
FileBackedStorage::new(&storage_path).unwrap(),
));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "balance".to_string(), Value::Int(1000))
.unwrap();
manager.commit(&tx_id).unwrap();
}
{
let manager = TransactionManager::with_storage(Box::new(
FileBackedStorage::new(&storage_path).unwrap(),
));
assert_eq!(manager.get_committed("balance"), Some(Value::Int(1000)));
}
}
#[test]
fn test_transaction_log_writes_events() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let log_path = temp_dir.path().join("tx.log");
{
let mut manager = TransactionManager::new()
.with_transaction_log(&log_path)
.unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "key1".to_string(), Value::Int(42))
.unwrap();
manager.commit(&tx_id).unwrap();
}
let log_contents = fs::read_to_string(&log_path).unwrap();
assert!(log_contents.contains("begin"));
assert!(log_contents.contains("write"));
assert!(log_contents.contains("commit"));
let lines: Vec<&str> = log_contents.lines().collect();
assert!(lines.len() >= 3);
for line in lines {
if !line.trim().is_empty() {
let parsed: Result<TransactionLogEntry, _> = serde_json::from_str(line);
assert!(parsed.is_ok(), "Log line should be valid JSON: {}", line);
}
}
}
#[test]
#[serial_test::serial]
fn test_from_env_memory_backend() {
std::env::remove_var("DAL_TX_STORAGE");
let manager = TransactionManager::from_env().unwrap();
let mut manager = manager;
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "test".to_string(), Value::Int(1))
.unwrap();
manager.commit(&tx_id).unwrap();
assert_eq!(manager.get_committed("test"), Some(Value::Int(1)));
std::env::remove_var("DAL_TX_STORAGE");
std::env::remove_var("DAL_TX_STORAGE_PATH");
std::env::remove_var("DAL_TX_LOG_PATH");
std::env::remove_var("DAL_TX_TIMEOUT_MS");
}
#[test]
#[serial_test::serial]
fn test_from_env_file_backend() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let storage_path = temp_dir.path().join("env_test.json");
std::env::set_var("DAL_TX_STORAGE", "file");
std::env::set_var("DAL_TX_STORAGE_PATH", storage_path.to_str().unwrap());
{
let mut manager = TransactionManager::from_env().unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "persisted".to_string(), Value::Int(999))
.unwrap();
manager.commit(&tx_id).unwrap();
}
{
let manager = TransactionManager::from_env().unwrap();
assert_eq!(manager.get_committed("persisted"), Some(Value::Int(999)));
}
std::env::remove_var("DAL_TX_STORAGE");
std::env::remove_var("DAL_TX_STORAGE_PATH");
std::env::remove_var("DAL_TX_LOG_PATH");
std::env::remove_var("DAL_TX_TIMEOUT_MS");
}
#[test]
fn test_file_backed_storage_concurrent_transactions() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let storage_path = temp_dir.path().join("concurrent.json");
let mut manager = TransactionManager::with_storage(Box::new(
FileBackedStorage::new(&storage_path).unwrap(),
));
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx1, "account1".to_string(), Value::Int(100))
.unwrap();
manager.commit(&tx1).unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx2, "account2".to_string(), Value::Int(200))
.unwrap();
manager.commit(&tx2).unwrap();
assert_eq!(manager.get_committed("account1"), Some(Value::Int(100)));
assert_eq!(manager.get_committed("account2"), Some(Value::Int(200)));
}
#[test]
fn test_max_active_transactions_limit() {
let mut manager = TransactionManager::new().with_max_active_transactions(3);
let tx1 = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
let tx3 = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
let result = manager.begin_transaction(IsolationLevel::Serializable);
assert!(result.is_err(), "Should reject 4th transaction");
assert!(result.unwrap_err().to_string().contains("limit"));
manager.commit(&tx1).unwrap();
let tx4 = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
assert!(tx4.starts_with("tx_"));
manager.rollback(&tx2).unwrap();
manager.rollback(&tx3).unwrap();
manager.rollback(&tx4).unwrap();
}
#[test]
fn test_max_keys_per_transaction_limit() {
let mut manager = TransactionManager::new().with_max_keys_per_transaction(5);
let tx_id = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
for i in 0..5 {
manager
.write(&tx_id, format!("key_{}", i), Value::Int(i))
.unwrap();
}
let result = manager.write(&tx_id, "key_6".to_string(), Value::Int(6));
assert!(result.is_err(), "Should reject 6th key");
assert!(result.unwrap_err().to_string().contains("limit"));
manager
.write(&tx_id, "key_0".to_string(), Value::Int(100))
.unwrap();
manager.commit(&tx_id).unwrap();
}
#[test]
#[serial_test::serial]
fn test_resource_limits_from_env() {
std::env::set_var("DAL_TX_STORAGE", "memory");
std::env::set_var("DAL_TX_MAX_ACTIVE", "10");
std::env::set_var("DAL_TX_MAX_KEYS", "100");
let mut manager = TransactionManager::from_env().unwrap();
let mut txs = vec![];
for _ in 0..10 {
txs.push(
manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap(),
);
}
let result = manager.begin_transaction(IsolationLevel::Serializable);
assert!(result.is_err(), "Should respect DAL_TX_MAX_ACTIVE limit");
for tx in txs {
let _ = manager.rollback(&tx);
}
std::env::remove_var("DAL_TX_STORAGE");
std::env::remove_var("DAL_TX_MAX_ACTIVE");
std::env::remove_var("DAL_TX_MAX_KEYS");
}
#[test]
fn test_unlimited_resources() {
let mut manager = TransactionManager::new()
.with_max_active_transactions(0)
.with_max_keys_per_transaction(0);
let mut txs = vec![];
for _ in 0..100 {
txs.push(
manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap(),
);
}
let tx_id = manager
.begin_transaction(IsolationLevel::Serializable)
.unwrap();
for i in 0..100 {
manager
.write(&tx_id, format!("key_{}", i), Value::Int(i))
.unwrap();
}
manager.commit(&tx_id).unwrap();
for tx in txs {
let _ = manager.rollback(&tx);
}
}
#[cfg(feature = "sqlite-storage")]
#[test]
fn test_sqlite_storage_basic_operations() {
let mut storage = SqliteStorage::new_in_memory().unwrap();
storage.set("key1", Value::Int(42));
assert_eq!(storage.get("key1"), Some(Value::Int(42)));
assert!(storage.contains_key("key1"));
assert!(!storage.contains_key("key2"));
let removed = storage.remove("key1");
assert_eq!(removed, Some(Value::Int(42)));
assert!(!storage.contains_key("key1"));
}
#[cfg(feature = "sqlite-storage")]
#[test]
fn test_sqlite_storage_persistence() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
{
let mut storage = SqliteStorage::new(&db_path).unwrap();
storage.set("key1", Value::Int(42));
storage.set("key2", Value::String("hello".to_string()));
}
{
let storage = SqliteStorage::new(&db_path).unwrap();
assert_eq!(storage.get("key1"), Some(Value::Int(42)));
assert_eq!(
storage.get("key2"),
Some(Value::String("hello".to_string()))
);
}
}
#[cfg(feature = "sqlite-storage")]
#[test]
fn test_transaction_manager_with_sqlite_storage() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("tx.db");
{
let mut manager =
TransactionManager::with_storage(Box::new(SqliteStorage::new(&db_path).unwrap()));
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "balance".to_string(), Value::Int(5000))
.unwrap();
manager
.write(
&tx_id,
"user".to_string(),
Value::String("alice".to_string()),
)
.unwrap();
manager.commit(&tx_id).unwrap();
}
{
let manager =
TransactionManager::with_storage(Box::new(SqliteStorage::new(&db_path).unwrap()));
assert_eq!(manager.get_committed("balance"), Some(Value::Int(5000)));
assert_eq!(
manager.get_committed("user"),
Some(Value::String("alice".to_string()))
);
}
}
#[cfg(feature = "sqlite-storage")]
#[test]
fn test_sqlite_rollback_doesnt_persist() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("rollback.db");
{
let mut manager =
TransactionManager::with_storage(Box::new(SqliteStorage::new(&db_path).unwrap()));
let tx1 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx1, "count".to_string(), Value::Int(10))
.unwrap();
manager.commit(&tx1).unwrap();
let tx2 = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx2, "count".to_string(), Value::Int(999))
.unwrap();
manager.rollback(&tx2).unwrap();
}
{
let manager =
TransactionManager::with_storage(Box::new(SqliteStorage::new(&db_path).unwrap()));
assert_eq!(manager.get_committed("count"), Some(Value::Int(10)));
}
}
#[cfg(feature = "sqlite-storage")]
#[test]
#[serial_test::serial]
fn test_from_env_sqlite_backend() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("env_sqlite.db");
std::env::set_var("DAL_TX_STORAGE", "sqlite");
std::env::set_var("DAL_TX_STORAGE_PATH", db_path.to_str().unwrap());
{
let mut manager = TransactionManager::from_env().unwrap();
let tx_id = manager
.begin_transaction(IsolationLevel::ReadCommitted)
.unwrap();
manager
.write(&tx_id, "sqlite_test".to_string(), Value::Int(777))
.unwrap();
manager.commit(&tx_id).unwrap();
}
{
let manager = TransactionManager::from_env().unwrap();
assert_eq!(manager.get_committed("sqlite_test"), Some(Value::Int(777)));
}
std::env::remove_var("DAL_TX_STORAGE");
std::env::remove_var("DAL_TX_STORAGE_PATH");
std::env::remove_var("DAL_TX_LOG_PATH");
std::env::remove_var("DAL_TX_TIMEOUT_MS");
}
}