use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fmt;
use crate::cli::error::{CliError, CliResult};
/// Lifecycle state of the transaction tracked by a `TransactionManager`.
///
/// The `Display` implementation renders each variant as a lowercase label
/// (for example `RolledBack` is shown as `rolled back`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
/// No transaction metadata exists. `TransactionManager::state` reports this
/// before the first `begin` and after `clear`.
Idle,
/// Assigned by `begin_with_name`; operations are only accepted in this state.
Active,
/// Assigned by `prepare`; the transaction can still be committed or rolled back.
Preparing,
/// Assigned by `commit`.
Committed,
/// Assigned by `rollback`.
RolledBack,
/// Assigned by `fail`.
Failed,
}
impl fmt::Display for TransactionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TransactionState::Idle => write!(f, "idle"),
TransactionState::Active => write!(f, "active"),
TransactionState::Preparing => write!(f, "preparing"),
TransactionState::Committed => write!(f, "committed"),
TransactionState::RolledBack => write!(f, "rolled back"),
TransactionState::Failed => write!(f, "failed"),
}
}
}
/// Isolation level recorded for a transaction.
///
/// Within this file the level is only stored (in `TransactionConfig` and
/// `TransactionMetadata`) and displayed; nothing here enforces it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum IsolationLevel {
/// Displayed as `read-uncommitted`.
ReadUncommitted,
/// Displayed as `read-committed`; this is the `Default` level.
#[default]
ReadCommitted,
/// Displayed as `repeatable-read`.
RepeatableRead,
/// Displayed as `serializable`.
Serializable,
}
impl fmt::Display for IsolationLevel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IsolationLevel::ReadUncommitted => write!(f, "read-uncommitted"),
IsolationLevel::ReadCommitted => write!(f, "read-committed"),
IsolationLevel::RepeatableRead => write!(f, "repeatable-read"),
IsolationLevel::Serializable => write!(f, "serializable"),
}
}
}
/// A single query queued inside a transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionOperation {
/// The query text exactly as it was supplied by the caller.
pub query: String,
/// UTC time at which this operation value was constructed.
pub timestamp: DateTime<Utc>,
/// Optional free-form description attached by the caller.
pub description: Option<String>,
}
impl TransactionOperation {
    /// Builds an operation for `query` with no description, timestamped with
    /// the current UTC time.
    pub fn new(query: String) -> Self {
        let timestamp = Utc::now();
        Self {
            query,
            timestamp,
            description: None,
        }
    }

    /// Builds an operation for `query` carrying `description`, timestamped
    /// with the current UTC time.
    pub fn with_description(query: String, description: String) -> Self {
        // Same construction as `new`, with only the description swapped in.
        Self {
            description: Some(description),
            ..Self::new(query)
        }
    }
}
/// Bookkeeping record for the transaction most recently begun by a
/// `TransactionManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionMetadata {
/// Identifier of the form `tx-<n>`, where `<n>` is the manager's running
/// transaction counter.
pub id: String,
/// Current lifecycle state.
pub state: TransactionState,
/// Isolation level copied from the manager's configuration when the
/// transaction began.
pub isolation_level: IsolationLevel,
/// UTC time at which the transaction began.
pub started_at: DateTime<Utc>,
/// UTC time at which the transaction was committed, rolled back, or failed;
/// `None` until then.
pub ended_at: Option<DateTime<Utc>>,
/// Number of queued operations as of the last successful add. It is not
/// reset when the queue is cleared by commit, rollback, or fail.
pub operation_count: usize,
/// Read-only flag copied from the manager's configuration when the
/// transaction began. Nothing in this file enforces it.
pub read_only: bool,
/// Optional caller-supplied name passed to `begin_with_name`.
pub name: Option<String>,
}
/// Settings applied by a `TransactionManager` to the transactions it begins.
#[derive(Debug, Clone)]
pub struct TransactionConfig {
/// Isolation level copied into each new transaction's metadata.
pub isolation_level: IsolationLevel,
/// Read-only flag copied into each new transaction's metadata.
pub read_only: bool,
/// Maximum number of operations that may be queued in one transaction.
pub max_operations: usize,
/// Transaction age limit in seconds, checked when an operation is added;
/// `None` disables the check.
pub timeout_seconds: Option<u64>,
/// Auto-commit flag. It is stored here but not read by `TransactionManager`
/// in this file.
pub auto_commit: bool,
}
impl Default for TransactionConfig {
    /// Read-write, default isolation level, at most 1000 operations, a
    /// 300-second timeout, and auto-commit disabled.
    fn default() -> Self {
        const DEFAULT_MAX_OPERATIONS: usize = 1000;
        const DEFAULT_TIMEOUT_SECONDS: u64 = 300;

        Self {
            isolation_level: IsolationLevel::default(),
            read_only: false,
            max_operations: DEFAULT_MAX_OPERATIONS,
            timeout_seconds: Some(DEFAULT_TIMEOUT_SECONDS),
            auto_commit: false,
        }
    }
}
impl TransactionConfig {
    /// Default configuration with the read-only flag set.
    pub fn read_only() -> Self {
        let defaults = Self::default();
        Self {
            read_only: true,
            ..defaults
        }
    }

    /// Default configuration using the given isolation level.
    pub fn with_isolation(isolation_level: IsolationLevel) -> Self {
        let defaults = Self::default();
        Self {
            isolation_level,
            ..defaults
        }
    }

    /// Builder-style setter for the auto-commit flag; consumes and returns the
    /// configuration.
    pub fn auto_commit(self, enabled: bool) -> Self {
        Self {
            auto_commit: enabled,
            ..self
        }
    }

    /// Builder-style setter for the timeout, in seconds; consumes and returns
    /// the configuration.
    pub fn timeout(self, seconds: u64) -> Self {
        Self {
            timeout_seconds: Some(seconds),
            ..self
        }
    }
}
/// Tracks at most one transaction at a time and the queries queued inside it.
///
/// The manager only records queries and state; `commit` hands the queued
/// query strings back to the caller rather than executing them.
pub struct TransactionManager {
// Record of the most recently begun transaction; `None` before the first
// `begin` and after `clear`.
metadata: Option<TransactionMetadata>,
// Operations queued in the current transaction, in insertion order.
operations: VecDeque<TransactionOperation>,
// Settings applied to transactions begun by this manager.
config: TransactionConfig,
// Number of transactions begun so far; also used to build transaction ids.
transaction_counter: u64,
}
impl TransactionManager {
    /// Creates a manager that uses [`TransactionConfig::default`] and has no
    /// transaction yet.
    pub fn new() -> Self {
        Self::with_config(TransactionConfig::default())
    }

    /// Creates a manager that applies `config` to every transaction it begins.
    ///
    /// The manager starts out [`TransactionState::Idle`] with an empty queue.
    pub fn with_config(config: TransactionConfig) -> Self {
        Self {
            metadata: None,
            operations: VecDeque::new(),
            config,
            transaction_counter: 0,
        }
    }

    /// Returns `true` only while the current transaction is
    /// [`TransactionState::Active`].
    ///
    /// A transaction in [`TransactionState::Preparing`] is not reported as
    /// active, although it can still be committed or rolled back.
    pub fn is_active(&self) -> bool {
        self.state() == TransactionState::Active
    }

    /// Returns the state of the tracked transaction, or
    /// [`TransactionState::Idle`] when there is none.
    pub fn state(&self) -> TransactionState {
        self.metadata
            .as_ref()
            .map_or(TransactionState::Idle, |meta| meta.state)
    }

    /// Returns the metadata of the most recently begun transaction, if any.
    pub fn metadata(&self) -> Option<&TransactionMetadata> {
        self.metadata.as_ref()
    }

    /// Returns the number of operations currently queued.
    pub fn operation_count(&self) -> usize {
        self.operations.len()
    }

    /// Begins an unnamed transaction; see [`Self::begin_with_name`].
    pub fn begin(&mut self) -> CliResult<String> {
        self.begin_with_name(None)
    }

    /// Begins a new transaction, optionally labelled with `name`, and returns
    /// its id (`tx-<n>`).
    ///
    /// Any previously finished transaction's metadata is replaced and the
    /// operation queue is emptied.
    ///
    /// # Errors
    ///
    /// Fails when a transaction is still in flight, i.e. `Active` or
    /// `Preparing`. A prepared transaction must be rejected as well: starting
    /// over it would silently discard it together with its queued operations.
    pub fn begin_with_name(&mut self, name: Option<String>) -> CliResult<String> {
        if self.is_in_flight() {
            return Err(CliError::invalid_arguments(
                "A transaction is already active. Commit or rollback the current transaction first.",
            ));
        }

        self.transaction_counter += 1;
        let tx_id = format!("tx-{}", self.transaction_counter);
        self.metadata = Some(TransactionMetadata {
            id: tx_id.clone(),
            state: TransactionState::Active,
            isolation_level: self.config.isolation_level,
            started_at: Utc::now(),
            ended_at: None,
            operation_count: 0,
            read_only: self.config.read_only,
            name,
        });
        self.operations.clear();
        Ok(tx_id)
    }

    /// Queues `query` without a description; see
    /// [`Self::add_operation_with_description`].
    pub fn add_operation(&mut self, query: String) -> CliResult<()> {
        self.add_operation_with_description(query, None)
    }

    /// Queues `query`, optionally annotated with `description`, in the active
    /// transaction.
    ///
    /// # Errors
    ///
    /// * No transaction is `Active`.
    /// * The queue already holds `max_operations` entries.
    /// * The configured timeout has elapsed since the transaction began. In
    ///   that case the transaction is also moved to `Failed` and its queue is
    ///   emptied before the error is returned.
    pub fn add_operation_with_description(
        &mut self,
        query: String,
        description: Option<String>,
    ) -> CliResult<()> {
        if !self.is_active() {
            return Err(CliError::invalid_arguments(
                "No active transaction. Use BEGIN to start a transaction.",
            ));
        }

        let limit = self.config.max_operations;
        if self.operations.len() >= limit {
            return Err(CliError::invalid_arguments(format!(
                "Transaction operation limit reached ({}). Commit or rollback the transaction.",
                limit
            )));
        }

        if let Some(timeout) = self.expired_timeout() {
            // Mark the transaction failed, then report the specific timeout
            // error rather than the generic one produced by `fail`.
            self.finish(TransactionState::Failed);
            return Err(CliError::invalid_arguments(format!(
                "Transaction timed out after {} seconds",
                timeout
            )));
        }

        let operation = match description {
            Some(description) => TransactionOperation::with_description(query, description),
            None => TransactionOperation::new(query),
        };
        self.operations.push_back(operation);
        if let Some(meta) = self.metadata.as_mut() {
            meta.operation_count = self.operations.len();
        }
        Ok(())
    }

    /// Returns the queued operations in insertion order.
    pub fn operations(&self) -> &VecDeque<TransactionOperation> {
        &self.operations
    }

    /// Moves the active transaction to [`TransactionState::Preparing`].
    ///
    /// # Errors
    ///
    /// Fails when no transaction is `Active`.
    pub fn prepare(&mut self) -> CliResult<()> {
        if !self.is_active() {
            return Err(CliError::invalid_arguments("No active transaction"));
        }
        if let Some(meta) = self.metadata.as_mut() {
            meta.state = TransactionState::Preparing;
        }
        Ok(())
    }

    /// Commits the in-flight transaction and returns its queued query strings
    /// in insertion order.
    ///
    /// The state becomes `Committed`, the end time is recorded and the queue
    /// is left empty.
    ///
    /// # Errors
    ///
    /// Fails when the transaction is neither `Active` nor `Preparing`.
    pub fn commit(&mut self) -> CliResult<Vec<String>> {
        if !self.is_in_flight() {
            return Err(CliError::invalid_arguments(
                "No active transaction to commit",
            ));
        }

        // The queue is emptied on commit anyway, so move the query strings out
        // instead of cloning each one.
        let queries: Vec<String> = self.operations.drain(..).map(|op| op.query).collect();
        self.finish(TransactionState::Committed);
        Ok(queries)
    }

    /// Rolls back the in-flight transaction, discarding its queued operations.
    ///
    /// The state becomes `RolledBack` and the end time is recorded.
    ///
    /// # Errors
    ///
    /// Fails when the transaction is neither `Active` nor `Preparing`.
    pub fn rollback(&mut self) -> CliResult<()> {
        if !self.is_in_flight() {
            return Err(CliError::invalid_arguments(
                "No active transaction to rollback",
            ));
        }
        self.finish(TransactionState::RolledBack);
        Ok(())
    }

    /// Marks the tracked transaction (if any) as `Failed`, records the end
    /// time and discards the queued operations.
    ///
    /// # Errors
    ///
    /// Always returns `Err`, carrying `reason` in its message, so callers can
    /// write `return manager.fail("...")`.
    pub fn fail(&mut self, reason: &str) -> CliResult<()> {
        self.finish(TransactionState::Failed);
        Err(CliError::invalid_arguments(format!(
            "Transaction failed: {}",
            reason
        )))
    }

    /// Returns a snapshot of the manager's counters, state and limits.
    pub fn stats(&self) -> TransactionStats {
        TransactionStats {
            total_transactions: self.transaction_counter,
            current_operations: self.operations.len(),
            current_state: self.state(),
            is_active: self.is_active(),
            started_at: self.metadata.as_ref().map(|meta| meta.started_at),
            operation_limit: self.config.max_operations,
            timeout_seconds: self.config.timeout_seconds,
        }
    }

    /// Drops the tracked transaction and its queue, returning the manager to
    /// `Idle`. The transaction counter is kept.
    pub fn clear(&mut self) {
        self.metadata = None;
        self.operations.clear();
    }

    /// Returns the configuration applied to new transactions.
    pub fn config(&self) -> &TransactionConfig {
        &self.config
    }

    /// Replaces the configuration used for subsequently begun transactions.
    ///
    /// # Errors
    ///
    /// Fails while a transaction is `Active`.
    pub fn set_config(&mut self, config: TransactionConfig) -> CliResult<()> {
        if self.is_active() {
            return Err(CliError::invalid_arguments(
                "Cannot change configuration while transaction is active",
            ));
        }
        self.config = config;
        Ok(())
    }

    /// Returns `true` while the transaction is `Active` or `Preparing`, i.e.
    /// it can still be committed or rolled back.
    fn is_in_flight(&self) -> bool {
        matches!(
            self.state(),
            TransactionState::Active | TransactionState::Preparing
        )
    }

    /// Returns the configured timeout (in seconds) when the tracked
    /// transaction has been running for longer than that, otherwise `None`.
    fn expired_timeout(&self) -> Option<u64> {
        let timeout = self.config.timeout_seconds?;
        let meta = self.metadata.as_ref()?;
        let elapsed_secs = Utc::now()
            .signed_duration_since(meta.started_at)
            .num_seconds();
        // A clock stepping backwards yields a negative value; clamp it to zero
        // so the unsigned conversion cannot wrap into a spurious timeout.
        let elapsed = elapsed_secs.max(0) as u64;
        if elapsed > timeout {
            Some(timeout)
        } else {
            None
        }
    }

    /// Records `final_state` and the end time on the tracked transaction (if
    /// any) and empties the operation queue.
    fn finish(&mut self, final_state: TransactionState) {
        if let Some(meta) = self.metadata.as_mut() {
            meta.state = final_state;
            meta.ended_at = Some(Utc::now());
        }
        self.operations.clear();
    }
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time snapshot produced by `TransactionManager::stats`.
#[derive(Debug, Clone)]
pub struct TransactionStats {
/// Number of transactions begun by the manager so far.
pub total_transactions: u64,
/// Number of operations currently queued.
pub current_operations: usize,
/// State of the tracked transaction (`Idle` when there is none).
pub current_state: TransactionState,
/// Whether the tracked transaction is in the `Active` state.
pub is_active: bool,
/// Start time of the tracked transaction, if the manager holds one.
pub started_at: Option<DateTime<Utc>>,
/// Configured maximum number of operations per transaction.
pub operation_limit: usize,
/// Configured timeout in seconds, if any.
pub timeout_seconds: Option<u64>,
}
#[cfg(test)]
mod tests {
    use super::*;

    const INSERT_XYZ: &str = "INSERT DATA { <x> <y> <z> }";
    const DELETE_ABC: &str = "DELETE DATA { <a> <b> <c> }";

    /// Builds a manager from `config` and begins a transaction on it.
    fn begun_with(config: TransactionConfig) -> TransactionManager {
        let mut manager = TransactionManager::with_config(config);
        manager.begin().unwrap();
        manager
    }

    /// Builds a default manager and begins a transaction on it.
    fn begun() -> TransactionManager {
        let mut manager = TransactionManager::new();
        manager.begin().unwrap();
        manager
    }

    /// Queues `query` and panics if the manager refuses it.
    fn queue(manager: &mut TransactionManager, query: &str) {
        manager.add_operation(query.to_string()).unwrap();
    }

    #[test]
    fn test_transaction_lifecycle() {
        let mut manager = TransactionManager::new();
        assert_eq!(manager.state(), TransactionState::Idle);
        assert!(!manager.is_active());

        let tx_id = manager.begin().unwrap();
        assert!(!tx_id.is_empty());
        assert_eq!(manager.state(), TransactionState::Active);
        assert!(manager.is_active());

        queue(&mut manager, INSERT_XYZ);
        assert_eq!(manager.operation_count(), 1);
        queue(&mut manager, DELETE_ABC);
        assert_eq!(manager.operation_count(), 2);

        let queries = manager.commit().unwrap();
        assert_eq!(queries.len(), 2);
        assert_eq!(manager.state(), TransactionState::Committed);
        assert!(!manager.is_active());
        assert_eq!(manager.operation_count(), 0);
    }

    #[test]
    fn test_transaction_rollback() {
        let mut manager = begun();
        queue(&mut manager, INSERT_XYZ);
        queue(&mut manager, DELETE_ABC);
        assert_eq!(manager.operation_count(), 2);

        manager.rollback().unwrap();
        assert_eq!(manager.state(), TransactionState::RolledBack);
        assert_eq!(manager.operation_count(), 0);
        assert!(!manager.is_active());
    }

    #[test]
    fn test_nested_transaction_prevention() {
        let mut manager = begun();
        assert!(manager.begin().is_err());
    }

    #[test]
    fn test_operation_without_transaction() {
        let mut manager = TransactionManager::new();
        assert!(manager.add_operation(INSERT_XYZ.to_string()).is_err());
    }

    #[test]
    fn test_commit_without_transaction() {
        let mut manager = TransactionManager::new();
        assert!(manager.commit().is_err());
    }

    #[test]
    fn test_rollback_without_transaction() {
        let mut manager = TransactionManager::new();
        assert!(manager.rollback().is_err());
    }

    #[test]
    fn test_transaction_with_name() {
        let mut manager = TransactionManager::new();
        let name = "test-transaction".to_string();
        manager.begin_with_name(Some(name.clone())).unwrap();

        let meta = manager.metadata().unwrap();
        assert_eq!(meta.name, Some(name));
    }

    #[test]
    fn test_operation_with_description() {
        let mut manager = begun();
        let description = "Add test triple".to_string();
        manager
            .add_operation_with_description(INSERT_XYZ.to_string(), Some(description.clone()))
            .unwrap();

        let ops = manager.operations();
        assert_eq!(ops.len(), 1);
        assert_eq!(ops[0].description, Some(description));
    }

    #[test]
    fn test_operation_limit() {
        let mut manager = begun_with(TransactionConfig {
            max_operations: 2,
            ..Default::default()
        });
        queue(&mut manager, "INSERT DATA { <x1> <y> <z> }");
        queue(&mut manager, "INSERT DATA { <x2> <y> <z> }");

        let overflow = manager.add_operation("INSERT DATA { <x3> <y> <z> }".to_string());
        assert!(overflow.is_err());
    }

    #[test]
    fn test_transaction_stats() {
        let mut manager = TransactionManager::new();
        let before = manager.stats();
        assert_eq!(before.total_transactions, 0);
        assert_eq!(before.current_operations, 0);
        assert!(!before.is_active);

        manager.begin().unwrap();
        queue(&mut manager, INSERT_XYZ);

        let after = manager.stats();
        assert_eq!(after.total_transactions, 1);
        assert_eq!(after.current_operations, 1);
        assert!(after.is_active);
    }

    #[test]
    fn test_read_only_config() {
        let manager = begun_with(TransactionConfig::read_only());
        let meta = manager.metadata().unwrap();
        assert!(meta.read_only);
    }

    #[test]
    fn test_isolation_levels() {
        let manager = begun_with(TransactionConfig::with_isolation(
            IsolationLevel::Serializable,
        ));
        let meta = manager.metadata().unwrap();
        assert_eq!(meta.isolation_level, IsolationLevel::Serializable);
    }

    #[test]
    fn test_clear_transaction() {
        let mut manager = begun();
        queue(&mut manager, INSERT_XYZ);

        manager.clear();
        assert_eq!(manager.state(), TransactionState::Idle);
        assert_eq!(manager.operation_count(), 0);
    }

    #[test]
    fn test_config_update_while_active() {
        let mut manager = begun();
        let outcome = manager.set_config(TransactionConfig::default());
        assert!(outcome.is_err());
    }

    #[test]
    fn test_multiple_transactions() {
        let mut manager = TransactionManager::new();
        let rounds = [
            "INSERT DATA { <x1> <y> <z> }",
            "INSERT DATA { <x2> <y> <z> }",
        ];
        for query in rounds.iter() {
            manager.begin().unwrap();
            queue(&mut manager, query);
            manager.commit().unwrap();
        }

        let stats = manager.stats();
        assert_eq!(stats.total_transactions, 2);
    }
}