use std::sync::{Arc, Mutex, RwLock};
use ahash::AHashMap;
use uuid::Uuid;
use crate::error::{LaurusError, Result};
use crate::lexical::core::document::Document;
use crate::lexical::index::inverted::segment::manager::SegmentManager;
use crate::lexical::index::inverted::segment::merge_engine::MergeEngine;
use crate::maintenance::deletion::{DeletionManager, GlobalDeletionState};
use crate::storage::Storage;
/// Isolation level requested for a [`Transaction`].
///
/// Within this file the level is only recorded on the transaction when it is
/// created; no code here branches on it.
///
/// NOTE(review): the variant names match the standard SQL isolation levels,
/// but the guarantees actually provided must be confirmed against whatever
/// code consumes this value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
/// Presumably "read committed" semantics — confirm with the consumer.
ReadCommitted,
/// Presumably "repeatable read" semantics — confirm with the consumer.
RepeatableRead,
/// Presumably "serializable" semantics — confirm with the consumer.
Serializable,
}
/// Lifecycle state of a [`Transaction`].
///
/// The transitions implemented by `Transaction` are
/// `Active -> Preparing -> Committed`, plus a move to `Aborted` from any
/// state other than `Committed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
/// Initial state set by `Transaction::new`; the only state in which
/// operations can be queued or `prepare` can succeed.
Active,
/// Set by `Transaction::prepare`; the only state from which `commit`
/// succeeds.
Preparing,
/// Set by `Transaction::commit`; a committed transaction can no longer be
/// aborted.
Committed,
/// Set by `Transaction::abort`.
Aborted,
}
/// A single operation queued on a [`Transaction`] and processed when the
/// transaction is committed through `TransactionManager`.
#[derive(Debug, Clone)]
pub enum TransactionOperation {
/// Adds one document. The manager counts it in `docs_added` and records the
/// target segment id in the transaction's `modified_segments`.
AddDocument {
/// The document to add.
document: Document,
/// Target segment id. When `None`, the manager falls back to the id
/// produced by `format!("segment_{:06}", 0)`.
segment_id: Option<String>,
},
/// Replaces documents. The manager counts it as one deletion, one addition
/// and one update.
UpdateDocument {
/// Presumably the field used to locate the documents to replace — confirm
/// with the code that applies the operation.
field: String,
/// Presumably the value matched against `field` — confirm.
value: String,
/// The replacement document.
new_document: Document,
},
/// Deletes documents. The manager counts each such operation as a single
/// deletion. `field` and `value` presumably identify the documents to
/// delete — confirm with the code that applies the operation.
DeleteDocuments { field: String, value: String },
/// Merges segments. The manager adds the number of listed ids to
/// `segments_merged` and increments `merge_operations`.
MergeSegments {
/// Ids of the segments to merge.
segment_ids: Vec<String>,
/// Strategy passed along with the merge request.
strategy: crate::lexical::index::inverted::segment::manager::MergeStrategy,
},
}
/// A unit of work made of queued [`TransactionOperation`]s plus the
/// bookkeeping needed to commit it or roll it back.
#[derive(Debug)]
pub struct Transaction {
/// Unique identifier; `Transaction::new` fills it with a freshly generated
/// version-4 UUID rendered as a string.
pub id: String,
/// Isolation level requested when the transaction was created.
pub isolation_level: IsolationLevel,
/// Current lifecycle state.
pub state: TransactionState,
/// Value of `crate::util::time::now_millis()` at creation time
/// (presumably milliseconds since the Unix epoch — confirm).
pub start_time: u64,
/// Operations queued through `add_operation`, in insertion order.
pub operations: Vec<TransactionOperation>,
/// Ids of segments created on behalf of this transaction; each one is
/// removed by `TransactionManager::rollback_transaction`. Nothing in the
/// visible code appends to this list.
pub created_segments: Vec<String>,
/// Ids of segments touched while the operations were executed; the manager
/// pushes one entry per `AddDocument` operation.
pub modified_segments: Vec<String>,
/// Snapshot of the global deletion state captured by the manager before it
/// executes the operations; restored on rollback when present.
pub pre_deletion_state: Option<GlobalDeletionState>,
/// Id of the thread that created the transaction (`Transaction::new` always
/// stores `Some`).
pub thread_id: Option<std::thread::ThreadId>,
}
impl Transaction {
    /// Creates a transaction in the [`TransactionState::Active`] state.
    ///
    /// The transaction gets a freshly generated version-4 UUID as its id, is
    /// stamped with `crate::util::time::now_millis()`, and remembers the thread
    /// it was created on. All operation and segment lists start out empty.
    pub fn new(isolation_level: IsolationLevel) -> Self {
        Self {
            id: Uuid::new_v4().to_string(),
            isolation_level,
            state: TransactionState::Active,
            start_time: crate::util::time::now_millis(),
            operations: Vec::new(),
            created_segments: Vec::new(),
            modified_segments: Vec::new(),
            pre_deletion_state: None,
            thread_id: Some(std::thread::current().id()),
        }
    }

    /// Queues `operation` at the end of the operation list.
    ///
    /// # Errors
    ///
    /// Returns an index error when the transaction is not `Active`.
    pub fn add_operation(&mut self, operation: TransactionOperation) -> Result<()> {
        if !self.is_active() {
            return Err(LaurusError::index(
                "Cannot add operations to inactive transaction",
            ));
        }

        self.operations.push(operation);
        Ok(())
    }

    /// Moves the transaction from `Active` to `Preparing`.
    ///
    /// # Errors
    ///
    /// Returns an index error when the transaction is in any other state.
    pub fn prepare(&mut self) -> Result<()> {
        match self.state {
            TransactionState::Active => {
                self.state = TransactionState::Preparing;
                Ok(())
            }
            TransactionState::Preparing
            | TransactionState::Committed
            | TransactionState::Aborted => {
                Err(LaurusError::index("Cannot prepare inactive transaction"))
            }
        }
    }

    /// Moves the transaction from `Preparing` to `Committed`.
    ///
    /// # Errors
    ///
    /// Returns an index error when `prepare` has not put the transaction into
    /// the `Preparing` state.
    pub fn commit(&mut self) -> Result<()> {
        match self.state {
            TransactionState::Preparing => {
                self.state = TransactionState::Committed;
                Ok(())
            }
            TransactionState::Active
            | TransactionState::Committed
            | TransactionState::Aborted => {
                Err(LaurusError::index("Cannot commit unprepared transaction"))
            }
        }
    }

    /// Marks the transaction as `Aborted`.
    ///
    /// Aborting is allowed from every state except `Committed`, including a
    /// transaction that is already `Aborted`.
    ///
    /// # Errors
    ///
    /// Returns an index error when the transaction is already committed.
    pub fn abort(&mut self) -> Result<()> {
        match self.state {
            TransactionState::Committed => {
                Err(LaurusError::index("Cannot abort committed transaction"))
            }
            TransactionState::Active
            | TransactionState::Preparing
            | TransactionState::Aborted => {
                self.state = TransactionState::Aborted;
                Ok(())
            }
        }
    }

    /// Returns `true` while the transaction is in the `Active` state.
    pub fn is_active(&self) -> bool {
        matches!(self.state, TransactionState::Active)
    }

    /// Returns the time elapsed since creation, in the unit of
    /// `crate::util::time::now_millis()`.
    ///
    /// The subtraction saturates, so the result is `0` rather than an underflow
    /// if the clock reports a value earlier than `start_time`.
    pub fn duration_ms(&self) -> u64 {
        crate::util::time::now_millis().saturating_sub(self.start_time)
    }
}
/// Registry and coordinator for [`Transaction`]s.
///
/// Hands out shared transaction handles, tracks the ones that have been begun
/// but not yet committed or rolled back, and drives commit and rollback.
#[derive(Debug)]
pub struct TransactionManager {
/// Transactions begun through this manager and not yet committed or rolled
/// back, keyed by transaction id.
active_transactions: Arc<RwLock<AHashMap<String, Arc<Mutex<Transaction>>>>>,
/// Number of transactions begun through this manager so far.
transaction_counter: Arc<Mutex<u64>>,
// Stored by the constructor but not read anywhere in the visible code, hence
// the lint suppression.
#[allow(dead_code)]
storage: Arc<dyn Storage>,
}
impl TransactionManager {
    /// Creates a manager with no registered transactions and a zeroed counter.
    ///
    /// `storage` is stored on the manager but is not read by any method in this
    /// block.
    pub fn new(storage: Arc<dyn Storage>) -> Self {
        TransactionManager {
            active_transactions: Arc::new(RwLock::new(AHashMap::new())),
            transaction_counter: Arc::new(Mutex::new(0)),
            storage,
        }
    }

    /// Deprecated constructor kept for backward compatibility.
    ///
    /// `schema` is ignored; the call is equivalent to [`TransactionManager::new`].
    #[deprecated(
        since = "0.2.0",
        note = "Use `new()` instead. Schema is no longer required."
    )]
    pub fn with_schema(
        storage: Arc<dyn Storage>,
        schema: Arc<crate::lexical::core::field::FieldValue>,
    ) -> Self {
        let _ = schema;
        Self::new(storage)
    }

    /// Starts a new transaction and registers it as active.
    ///
    /// Increments the total transaction counter and returns a shared handle to
    /// the new transaction.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex or the registry lock is poisoned.
    pub fn begin_transaction(
        &self,
        isolation_level: IsolationLevel,
    ) -> Result<Arc<Mutex<Transaction>>> {
        let transaction = Transaction::new(isolation_level);
        // Read the id before the transaction is wrapped, so that no lock has to
        // be taken on a mutex nobody else can see yet.
        let transaction_id = transaction.id.clone();
        let transaction = Arc::new(Mutex::new(transaction));

        *self.transaction_counter.lock().unwrap() += 1;

        self.active_transactions
            .write()
            .unwrap()
            .insert(transaction_id, Arc::clone(&transaction));

        Ok(transaction)
    }

    /// Prepares, executes and commits `transaction`.
    ///
    /// On success the transaction is marked `Committed`, removed from the active
    /// registry, and the returned result carries the wall-clock duration of this
    /// call in `commit_time_ms`. If executing the operations fails, the
    /// transaction is rolled back via [`TransactionManager::rollback_transaction`].
    ///
    /// # Errors
    ///
    /// * The transaction is not `Active` (it cannot be prepared).
    /// * Executing the operations fails; the execution error is returned after a
    ///   successful rollback. If the rollback itself fails, the rollback error is
    ///   returned instead.
    /// * The transaction left the `Preparing` state before it could be committed.
    ///
    /// # Panics
    ///
    /// Panics if the transaction mutex or the registry lock is poisoned.
    pub fn commit_transaction(
        &self,
        transaction: Arc<Mutex<Transaction>>,
        segment_manager: &mut SegmentManager,
        deletion_manager: &mut DeletionManager,
        merge_engine: &MergeEngine,
    ) -> Result<TransactionResult> {
        // Monotonic clock: immune to wall-clock adjustments during the commit.
        let started = std::time::Instant::now();

        let transaction_id = {
            let mut txn = transaction.lock().unwrap();
            txn.prepare()?;
            txn.id.clone()
        };

        let outcome = self.execute_transaction_operations(
            &transaction,
            segment_manager,
            deletion_manager,
            merge_engine,
        );

        match outcome {
            Ok(mut commit_result) => {
                transaction.lock().unwrap().commit()?;
                self.active_transactions
                    .write()
                    .unwrap()
                    .remove(&transaction_id);

                // Clamp before narrowing so the cast can never truncate.
                let elapsed_ms = started.elapsed().as_millis();
                commit_result.commit_time_ms = elapsed_ms.min(u128::from(u64::MAX)) as u64;
                Ok(commit_result)
            }
            Err(e) => {
                // NOTE(review): a rollback failure takes precedence over `e`
                // here; this ordering is kept from the previous implementation.
                self.rollback_transaction(transaction, segment_manager, deletion_manager)?;
                Err(e)
            }
        }
    }

    /// Aborts `transaction` and undoes its recorded side effects.
    ///
    /// Every segment listed in the transaction's `created_segments` is removed
    /// and, when a deletion-state snapshot was captured, it is restored. Cleanup
    /// is best-effort: a failing step does not prevent the remaining steps from
    /// running, and the transaction is always removed from the active registry
    /// once it has been marked `Aborted`, so a failed cleanup cannot leave an
    /// aborted transaction registered as active.
    ///
    /// # Errors
    ///
    /// * The transaction is already committed (nothing is changed in that case).
    /// * A cleanup step failed; the first failure encountered is returned.
    ///
    /// # Panics
    ///
    /// Panics if the transaction mutex or the registry lock is poisoned.
    pub fn rollback_transaction(
        &self,
        transaction: Arc<Mutex<Transaction>>,
        segment_manager: &mut SegmentManager,
        deletion_manager: &mut DeletionManager,
    ) -> Result<()> {
        let (transaction_id, created_segments, pre_deletion_state) = {
            let mut txn = transaction.lock().unwrap();
            txn.abort()?;
            (
                txn.id.clone(),
                txn.created_segments.clone(),
                txn.pre_deletion_state.clone(),
            )
        };

        // Attempt to remove every created segment, remembering only the first
        // failure. `fold` drains the whole iterator, so one bad segment does not
        // stop the others from being cleaned up.
        let segments_removed = created_segments
            .iter()
            .map(|segment_id| -> Result<()> {
                segment_manager.remove_segment(segment_id)?;
                Ok(())
            })
            .fold(Ok(()), |first: Result<()>, current| first.and(current));

        // Restore the deletion snapshot even if segment removal failed.
        let deletions_restored = pre_deletion_state.map_or(Ok(()), |state| -> Result<()> {
            deletion_manager.restore_global_state(state)?;
            Ok(())
        });

        // The transaction is already `Aborted`; deregister it unconditionally.
        self.active_transactions
            .write()
            .unwrap()
            .remove(&transaction_id);

        segments_removed.and(deletions_restored)
    }

    /// Walks the queued operations and tallies their effect.
    ///
    /// Before the walk, a snapshot of the global deletion state is stored on the
    /// transaction so that a later rollback can restore it. The operations are
    /// only counted here: the segment manager and merge engine are not used, and
    /// the documents themselves are not inspected.
    fn execute_transaction_operations(
        &self,
        transaction: &Arc<Mutex<Transaction>>,
        _segment_manager: &mut SegmentManager,
        deletion_manager: &mut DeletionManager,
        _merge_engine: &MergeEngine,
    ) -> Result<TransactionResult> {
        // Take the snapshot and copy the queue under a single lock acquisition.
        let operations = {
            let mut txn = transaction.lock().unwrap();
            txn.pre_deletion_state = Some(deletion_manager.get_global_state().clone());
            txn.operations.clone()
        };

        let mut result = TransactionResult::new();
        // Collected locally and published once, instead of re-locking the
        // transaction for every added document.
        let mut touched_segments = Vec::new();

        for operation in operations {
            match operation {
                TransactionOperation::AddDocument { segment_id, .. } => {
                    // Without an explicit target the fixed id "segment_000000"
                    // is used.
                    let actual_segment_id =
                        segment_id.unwrap_or_else(|| format!("segment_{:06}", 0));
                    touched_segments.push(actual_segment_id);
                    result.docs_added += 1;
                }
                TransactionOperation::UpdateDocument { .. } => {
                    // An update is accounted for as a delete followed by an add.
                    result.docs_deleted += 1;
                    result.docs_added += 1;
                    result.docs_updated += 1;
                }
                TransactionOperation::DeleteDocuments { .. } => {
                    result.docs_deleted += 1;
                }
                TransactionOperation::MergeSegments { segment_ids, .. } => {
                    result.segments_merged += segment_ids.len();
                    result.merge_operations += 1;
                }
            }
        }

        transaction
            .lock()
            .unwrap()
            .modified_segments
            .extend(touched_segments);

        Ok(result)
    }

    /// Returns the number of transactions currently registered as active.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn active_transaction_count(&self) -> usize {
        self.active_transactions.read().unwrap().len()
    }

    /// Returns how many transactions have been begun through this manager.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex is poisoned.
    pub fn total_transaction_count(&self) -> u64 {
        *self.transaction_counter.lock().unwrap()
    }

    /// Returns the ids of deadlocked transactions.
    ///
    /// Deadlock detection is not implemented: the result is always empty.
    pub fn detect_deadlocks(&self) -> Vec<String> {
        Vec::new()
    }

    /// Looks up an active transaction by id.
    ///
    /// Returns `None` when no transaction with that id is registered, which
    /// includes transactions that were already committed or rolled back.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn get_transaction(&self, transaction_id: &str) -> Option<Arc<Mutex<Transaction>>> {
        self.active_transactions
            .read()
            .unwrap()
            .get(transaction_id)
            .cloned()
    }
}
/// Counters describing what a committed transaction did.
#[derive(Debug, Default)]
pub struct TransactionResult {
    /// Number of documents added.
    pub docs_added: u64,
    /// Number of documents deleted.
    pub docs_deleted: u64,
    /// Number of documents updated.
    pub docs_updated: u64,
    /// Number of segments that took part in merges.
    pub segments_merged: usize,
    /// Number of merge operations performed.
    pub merge_operations: u64,
    /// Commit time in milliseconds (presumably the duration of the commit —
    /// confirm against the code that fills it in).
    pub commit_time_ms: u64,
}

impl TransactionResult {
    /// Creates a result with every counter set to zero.
    pub fn new() -> Self {
        Self {
            docs_added: 0,
            docs_deleted: 0,
            docs_updated: 0,
            segments_merged: 0,
            merge_operations: 0,
            commit_time_ms: 0,
        }
    }
}
/// Interface for types that can group several operations into one atomic
/// unit.
///
/// No implementor is visible in this file, so the notes below describe only
/// what the signatures state; the exact guarantees are up to the implementor.
pub trait AtomicOperations {
/// Runs `operations` with mutable access to `self` and returns its result.
///
/// NOTE(review): presumably the changes made by `operations` are rolled back
/// when it returns `Err` — confirm against the implementors.
fn execute_atomically<F, R>(&mut self, operations: F) -> Result<R>
where
F: FnOnce(&mut Self) -> Result<R>;
/// Starts an atomic session and returns a `String` for it, presumably the
/// session id expected by the commit/rollback methods below — confirm.
fn begin_atomic_session(&mut self) -> Result<String>;
/// Commits the session identified by `session_id` and reports what it did.
fn commit_atomic_session(&mut self, session_id: &str) -> Result<TransactionResult>;
/// Rolls back the session identified by `session_id`.
fn rollback_atomic_session(&mut self, session_id: &str) -> Result<()>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::MemoryStorage;
    use crate::storage::memory::MemoryStorageConfig;

    /// Builds an `AddDocument` operation for a document whose only field is a
    /// `title` text field, with no explicit target segment.
    fn add_document_operation(title: &'static str) -> TransactionOperation {
        let document = crate::lexical::core::document::Document::builder()
            .add_text("title", title)
            .build();
        TransactionOperation::AddDocument {
            document,
            segment_id: None,
        }
    }

    /// A new transaction is active, has an id, and has no queued operations.
    #[test]
    fn test_transaction_creation() {
        let transaction = Transaction::new(IsolationLevel::ReadCommitted);

        assert_eq!(transaction.isolation_level, IsolationLevel::ReadCommitted);
        assert_eq!(transaction.state, TransactionState::Active);
        assert!(transaction.is_active());
        assert!(!transaction.id.is_empty());
        assert!(transaction.operations.is_empty());
    }

    /// Active -> Preparing -> Committed works, and a committed transaction
    /// refuses to be aborted.
    #[test]
    fn test_transaction_state_machine() {
        let mut transaction = Transaction::new(IsolationLevel::ReadCommitted);

        assert!(transaction.prepare().is_ok());
        assert_eq!(transaction.state, TransactionState::Preparing);

        assert!(transaction.commit().is_ok());
        assert_eq!(transaction.state, TransactionState::Committed);
        assert!(!transaction.is_active());

        assert!(transaction.abort().is_err());
    }

    /// Operations can be queued only while the transaction is active.
    #[test]
    fn test_transaction_operations() {
        let mut transaction = Transaction::new(IsolationLevel::ReadCommitted);

        let accepted = transaction.add_operation(add_document_operation("Test"));
        assert!(accepted.is_ok());
        assert_eq!(transaction.operations.len(), 1);

        // Force the transaction out of the active state.
        transaction.state = TransactionState::Committed;

        let rejected = transaction.add_operation(add_document_operation("Test2"));
        assert!(rejected.is_err());
    }

    /// Beginning a transaction registers it and bumps both counters.
    #[test]
    fn test_transaction_manager() {
        let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default()));
        let manager = TransactionManager::new(storage);
        assert_eq!(manager.active_transaction_count(), 0);
        assert_eq!(manager.total_transaction_count(), 0);

        let handle = manager
            .begin_transaction(IsolationLevel::ReadCommitted)
            .unwrap();
        assert_eq!(manager.active_transaction_count(), 1);
        assert_eq!(manager.total_transaction_count(), 1);

        let transaction_id = handle.lock().unwrap().id.clone();
        assert!(manager.get_transaction(&transaction_id).is_some());
    }

    /// Every isolation level is stored unchanged on the transaction.
    #[test]
    fn test_isolation_levels() {
        for level in [
            IsolationLevel::ReadCommitted,
            IsolationLevel::RepeatableRead,
            IsolationLevel::Serializable,
        ] {
            let transaction = Transaction::new(level);
            assert_eq!(transaction.isolation_level, level);
        }
    }

    /// The reported duration covers at least the time slept since creation.
    #[test]
    fn test_transaction_duration() {
        let transaction = Transaction::new(IsolationLevel::ReadCommitted);
        std::thread::sleep(std::time::Duration::from_millis(10));

        let elapsed = transaction.duration_ms();
        assert!(elapsed >= 10);
    }
}