use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use manifoldb_core::TransactionError;
use manifoldb_storage::{StorageEngine, StorageError};
use super::batch_writer::{BatchWriter, BatchWriterConfig, BatchedTransaction};
use super::handle::DatabaseTransaction;
/// Selects how vector data is kept in sync for write transactions.
///
/// The strategy is only carried as configuration in this module (it is handed
/// to every write transaction when it is created); the behavior of each
/// variant is implemented by the code that consumes it.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum VectorSyncStrategy {
    /// The default strategy.
    ///
    /// NOTE(review): presumably vector updates are applied as part of the
    /// transaction itself — confirm against the consumer.
    #[default]
    Synchronous,

    /// NOTE(review): presumably vector updates are deferred and applied
    /// outside the transaction — confirm against the consumer.
    Async,

    /// NOTE(review): presumably switches between the two other strategies
    /// depending on `async_threshold` — confirm against the consumer.
    Hybrid {
        /// Threshold consulted by the hybrid strategy.
        ///
        /// NOTE(review): the unit (e.g. number of pending vector operations)
        /// is not visible from this file — confirm.
        async_threshold: usize,
    },
}
/// Settings used to build a `TransactionManager`.
#[derive(Clone, Debug)]
pub struct TransactionManagerConfig {
    /// Strategy handed to every write transaction the manager begins.
    pub vector_sync_strategy: VectorSyncStrategy,

    /// Configuration cloned into the manager's `BatchWriter` at construction.
    pub batch_writer_config: BatchWriterConfig,
}
impl Default for TransactionManagerConfig {
    /// Builds the default configuration: the `Synchronous` vector sync
    /// strategy combined with `BatchWriterConfig::default()`.
    fn default() -> Self {
        let vector_sync_strategy = VectorSyncStrategy::Synchronous;
        let batch_writer_config = BatchWriterConfig::default();

        Self { vector_sync_strategy, batch_writer_config }
    }
}
/// Creates read, write, and batched transactions on top of a storage engine.
///
/// The manager owns the engine behind an `Arc`, assigns an id to every read
/// and write transaction it begins, and owns a `BatchWriter` that operates on
/// the same engine.
pub struct TransactionManager<E: StorageEngine> {
/// The storage engine; the same `Arc` is cloned into `batch_writer`.
engine: Arc<E>,
/// Configuration supplied at construction time.
config: TransactionManagerConfig,
/// Source of transaction ids: initialised to 1 and incremented once for
/// every `begin_read` / `begin_write` call.
next_tx_id: AtomicU64,
/// Batch writer built from `config.batch_writer_config`; backs the
/// `*_batched*` methods.
batch_writer: BatchWriter<E>,
}
impl<E: StorageEngine> TransactionManager<E> {
pub fn new(engine: E) -> Self {
Self::with_config(engine, TransactionManagerConfig::default())
}
pub fn with_config(engine: E, config: TransactionManagerConfig) -> Self {
let engine = Arc::new(engine);
let batch_writer =
BatchWriter::new(Arc::clone(&engine), config.batch_writer_config.clone());
Self { engine, config, next_tx_id: AtomicU64::new(1), batch_writer }
}
#[must_use]
pub const fn vector_sync_strategy(&self) -> VectorSyncStrategy {
self.config.vector_sync_strategy
}
pub fn begin_read(&self) -> Result<DatabaseTransaction<E::Transaction<'_>>, TransactionError> {
let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
let storage_tx =
self.engine.begin_read().map_err(|e| storage_error_to_transaction_error(&e))?;
Ok(DatabaseTransaction::new_read(tx_id, storage_tx))
}
pub fn begin_write(&self) -> Result<DatabaseTransaction<E::Transaction<'_>>, TransactionError> {
let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
let storage_tx =
self.engine.begin_write().map_err(|e| storage_error_to_transaction_error(&e))?;
Ok(DatabaseTransaction::new_write(tx_id, storage_tx, self.config.vector_sync_strategy))
}
pub fn flush(&self) -> Result<(), TransactionError> {
self.engine.flush().map_err(|e| storage_error_to_transaction_error(&e))
}
#[must_use]
pub fn engine(&self) -> &E {
&self.engine
}
#[must_use]
pub fn engine_arc(&self) -> Arc<E> {
Arc::clone(&self.engine)
}
#[must_use]
pub fn begin_batched_write(&self) -> BatchedTransaction<E> {
self.batch_writer.begin()
}
#[must_use]
pub fn batch_writer(&self) -> &BatchWriter<E> {
&self.batch_writer
}
pub fn flush_batched(&self) -> Result<(), TransactionError> {
self.batch_writer.flush()
}
#[must_use]
pub fn pending_batched_count(&self) -> usize {
self.batch_writer.pending_count()
}
#[must_use]
pub fn batch_writer_config(&self) -> &BatchWriterConfig {
&self.config.batch_writer_config
}
}
/// Translates a storage-layer error into the transaction-layer error type.
///
/// `ReadOnly`, `Conflict` and `Serialization` map to the variants of the same
/// name (payloads are cloned). Both `NotFound` and `KeyNotFound` become
/// `EntityNotFound`; the latter carries the fixed message `"key not found"`.
/// Every other storage error is wrapped in `TransactionError::Storage` using
/// its `to_string()` rendering.
fn storage_error_to_transaction_error(err: &StorageError) -> TransactionError {
    match err {
        // Variants with a same-named counterpart.
        StorageError::ReadOnly => TransactionError::ReadOnly,
        StorageError::Conflict(detail) => TransactionError::Conflict(detail.clone()),
        StorageError::Serialization(detail) => TransactionError::Serialization(detail.clone()),

        // Both "missing" flavours collapse into `EntityNotFound`.
        StorageError::NotFound(what) => TransactionError::EntityNotFound(what.clone()),
        StorageError::KeyNotFound => {
            TransactionError::EntityNotFound(String::from("key not found"))
        }

        // Anything else is carried over as text.
        other => TransactionError::Storage(other.to_string()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `VectorSyncStrategy::default()` must be `Synchronous`.
    #[test]
    fn test_vector_sync_strategy_default() {
        assert_eq!(VectorSyncStrategy::default(), VectorSyncStrategy::Synchronous);
    }

    /// The default manager configuration must select the `Synchronous` strategy.
    #[test]
    fn test_config_default() {
        let TransactionManagerConfig { vector_sync_strategy, .. } =
            TransactionManagerConfig::default();
        assert_eq!(vector_sync_strategy, VectorSyncStrategy::Synchronous);
    }
}