manifoldb/transaction/
manager.rs

1//! Transaction manager implementation.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6use manifoldb_core::TransactionError;
7use manifoldb_storage::{StorageEngine, StorageError};
8
9use super::batch_writer::{BatchWriter, BatchWriterConfig, BatchedTransaction};
10use super::handle::DatabaseTransaction;
11
12/// Strategy for synchronizing vector index updates with transactions.
13///
14/// This enum controls how vector index updates are handled relative to
15/// the storage transaction lifecycle.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
17pub enum VectorSyncStrategy {
18    /// Update vector indexes synchronously within the same transaction.
19    ///
20    /// This provides strong consistency at the cost of slower writes.
21    /// All vector index updates are committed atomically with data changes.
22    ///
23    /// This is the default and recommended for correctness.
24    #[default]
25    Synchronous,
26
27    /// Queue vector index updates for asynchronous processing.
28    ///
29    /// This provides faster writes but eventual consistency for vector searches.
30    /// Vector search results may be slightly stale until updates are processed.
31    ///
32    /// Use this for high-throughput scenarios where temporary staleness is acceptable.
33    Async,
34
35    /// Use synchronous updates for small batches, async for large bulk operations.
36    ///
37    /// This provides a balance between write performance and consistency.
38    /// The threshold for switching modes is configurable.
39    Hybrid {
40        /// Number of vector updates before switching to async mode.
41        async_threshold: usize,
42    },
43}
44
45/// Configuration for the transaction manager.
46#[derive(Debug, Clone)]
47pub struct TransactionManagerConfig {
48    /// Strategy for vector index synchronization.
49    pub vector_sync_strategy: VectorSyncStrategy,
50
51    /// Configuration for write batching.
52    pub batch_writer_config: BatchWriterConfig,
53}
54
55impl Default for TransactionManagerConfig {
56    fn default() -> Self {
57        Self {
58            vector_sync_strategy: VectorSyncStrategy::Synchronous,
59            batch_writer_config: BatchWriterConfig::default(),
60        }
61    }
62}
63
64/// Coordinates transactions across storage, graph indexes, and vector indexes.
65///
66/// The `TransactionManager` is the central coordinator for all database transactions.
67/// It manages the lifecycle of transactions and ensures ACID guarantees across
68/// all subsystems.
69///
70/// # Transaction Semantics
71///
72/// - **Read transactions**: Provide snapshot isolation with multiple concurrent readers
73/// - **Write transactions**: Currently serialized (single writer at a time)
74/// - All index updates occur within the same transaction as data changes
75///
76/// # Thread Safety
77///
78/// `TransactionManager` is `Send + Sync` and can be safely shared across threads
79/// using `Arc<TransactionManager>`.
80///
81/// # Example
82///
83/// ```ignore
84/// use manifoldb::transaction::{TransactionManager, VectorSyncStrategy};
85/// use manifoldb_storage::backends::RedbEngine;
86///
87/// let engine = RedbEngine::open("db.redb")?;
88/// let manager = TransactionManager::new(engine);
89///
90/// // Concurrent reads are allowed
91/// let tx1 = manager.begin_read()?;
92/// let tx2 = manager.begin_read()?;
93///
94/// // Write transactions
95/// let mut tx = manager.begin_write()?;
96/// tx.put_entity(&entity)?;
97/// tx.commit()?;
98/// ```
99pub struct TransactionManager<E: StorageEngine> {
100    /// The underlying storage engine.
101    engine: Arc<E>,
102
103    /// Configuration for the transaction manager.
104    config: TransactionManagerConfig,
105
106    /// Counter for generating unique transaction IDs.
107    next_tx_id: AtomicU64,
108
109    /// Batch writer for concurrent write optimization.
110    batch_writer: BatchWriter<E>,
111}
112
113impl<E: StorageEngine> TransactionManager<E> {
114    /// Create a new transaction manager with the given storage engine.
115    ///
116    /// Uses the default configuration with synchronous vector index updates.
117    pub fn new(engine: E) -> Self {
118        Self::with_config(engine, TransactionManagerConfig::default())
119    }
120
121    /// Create a new transaction manager with custom configuration.
122    pub fn with_config(engine: E, config: TransactionManagerConfig) -> Self {
123        let engine = Arc::new(engine);
124        let batch_writer =
125            BatchWriter::new(Arc::clone(&engine), config.batch_writer_config.clone());
126        Self { engine, config, next_tx_id: AtomicU64::new(1), batch_writer }
127    }
128
129    /// Get the vector synchronization strategy.
130    #[must_use]
131    pub const fn vector_sync_strategy(&self) -> VectorSyncStrategy {
132        self.config.vector_sync_strategy
133    }
134
135    /// Begin a read-only transaction.
136    ///
137    /// Read transactions provide a consistent snapshot of the database.
138    /// Multiple read transactions can run concurrently.
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if the transaction cannot be started.
143    pub fn begin_read(&self) -> Result<DatabaseTransaction<E::Transaction<'_>>, TransactionError> {
144        let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
145        let storage_tx =
146            self.engine.begin_read().map_err(|e| storage_error_to_transaction_error(&e))?;
147
148        Ok(DatabaseTransaction::new_read(tx_id, storage_tx))
149    }
150
151    /// Begin a read-write transaction.
152    ///
153    /// Write transactions allow modifying the database. Currently, write
154    /// transactions are serialized (only one at a time).
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the transaction cannot be started.
159    pub fn begin_write(&self) -> Result<DatabaseTransaction<E::Transaction<'_>>, TransactionError> {
160        let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
161        let storage_tx =
162            self.engine.begin_write().map_err(|e| storage_error_to_transaction_error(&e))?;
163
164        Ok(DatabaseTransaction::new_write(tx_id, storage_tx, self.config.vector_sync_strategy))
165    }
166
167    /// Flush any buffered data to durable storage.
168    ///
169    /// This is typically called after committing important transactions.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the flush fails.
174    pub fn flush(&self) -> Result<(), TransactionError> {
175        self.engine.flush().map_err(|e| storage_error_to_transaction_error(&e))
176    }
177
178    /// Get a reference to the underlying storage engine.
179    ///
180    /// This is primarily useful for testing or advanced use cases.
181    #[must_use]
182    pub fn engine(&self) -> &E {
183        &self.engine
184    }
185
186    /// Get an Arc to the underlying storage engine.
187    ///
188    /// Returns a cloned Arc reference to the engine, useful for creating
189    /// components that need shared ownership of the engine.
190    #[must_use]
191    pub fn engine_arc(&self) -> Arc<E> {
192        Arc::clone(&self.engine)
193    }
194
195    /// Begin a batched write transaction.
196    ///
197    /// Batched transactions buffer writes locally and commit them through
198    /// the batch writer for group commit optimization. This can significantly
199    /// improve throughput under concurrent load.
200    ///
201    /// Unlike regular write transactions, batched transactions:
202    /// - Buffer all writes in memory until commit
203    /// - Are committed together with other concurrent transactions
204    /// - Provide read-your-own-writes semantics within the transaction
205    ///
206    /// # Example
207    ///
208    /// ```ignore
209    /// let mut tx = manager.begin_batched_write();
210    /// tx.put("table", b"key", b"value")?;
211    /// tx.commit()?;  // Batched with other concurrent commits
212    /// ```
213    #[must_use]
214    pub fn begin_batched_write(&self) -> BatchedTransaction<E> {
215        self.batch_writer.begin()
216    }
217
218    /// Get a reference to the batch writer.
219    ///
220    /// This provides access to the batch writer for manual control over
221    /// batched transactions, including flushing pending writes.
222    #[must_use]
223    pub fn batch_writer(&self) -> &BatchWriter<E> {
224        &self.batch_writer
225    }
226
227    /// Flush any pending batched writes immediately.
228    ///
229    /// This forces all pending batched transactions to be committed,
230    /// even if the batch size or flush interval hasn't been reached.
231    pub fn flush_batched(&self) -> Result<(), TransactionError> {
232        self.batch_writer.flush()
233    }
234
235    /// Get the number of pending batched transactions.
236    #[must_use]
237    pub fn pending_batched_count(&self) -> usize {
238        self.batch_writer.pending_count()
239    }
240
241    /// Get the batch writer configuration.
242    #[must_use]
243    pub fn batch_writer_config(&self) -> &BatchWriterConfig {
244        &self.config.batch_writer_config
245    }
246}
247
248/// Convert a storage error to a transaction error.
249fn storage_error_to_transaction_error(err: &StorageError) -> TransactionError {
250    match err {
251        StorageError::ReadOnly => TransactionError::ReadOnly,
252        StorageError::Conflict(msg) => TransactionError::Conflict(msg.clone()),
253        StorageError::Serialization(msg) => TransactionError::Serialization(msg.clone()),
254        StorageError::NotFound(msg) => TransactionError::EntityNotFound(msg.clone()),
255        StorageError::KeyNotFound => TransactionError::EntityNotFound("key not found".to_string()),
256        _ => TransactionError::Storage(err.to_string()),
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263
264    #[test]
265    fn test_vector_sync_strategy_default() {
266        let strategy = VectorSyncStrategy::default();
267        assert_eq!(strategy, VectorSyncStrategy::Synchronous);
268    }
269
270    #[test]
271    fn test_config_default() {
272        let config = TransactionManagerConfig::default();
273        assert_eq!(config.vector_sync_strategy, VectorSyncStrategy::Synchronous);
274    }
275}