manifoldb/transaction/manager.rs
1//! Transaction manager implementation.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6use manifoldb_core::TransactionError;
7use manifoldb_storage::{StorageEngine, StorageError};
8
9use super::batch_writer::{BatchWriter, BatchWriterConfig, BatchedTransaction};
10use super::handle::DatabaseTransaction;
11
/// How vector index maintenance is coordinated with a storage transaction.
///
/// The chosen variant decides whether vector index updates happen as part of
/// the transaction itself or are deferred until later.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum VectorSyncStrategy {
    /// Apply vector index updates inside the same transaction as the data.
    ///
    /// Index changes are committed atomically together with data changes,
    /// giving strong consistency in exchange for slower writes.
    ///
    /// This variant is the default and the recommended choice when
    /// correctness matters most.
    #[default]
    Synchronous,

    /// Enqueue vector index updates and process them asynchronously.
    ///
    /// Writes complete faster, but vector searches are only eventually
    /// consistent: results can be slightly stale until the queued updates
    /// have been processed.
    ///
    /// Suited to high-throughput workloads that tolerate temporary staleness.
    Async,

    /// Update synchronously for small batches and asynchronously for large
    /// bulk operations.
    ///
    /// A middle ground between write performance and consistency; the
    /// switch-over point is set by `async_threshold`.
    Hybrid {
        /// Count of vector updates at which processing switches to async mode.
        async_threshold: usize,
    },
}
44
45/// Configuration for the transaction manager.
46#[derive(Debug, Clone)]
47pub struct TransactionManagerConfig {
48 /// Strategy for vector index synchronization.
49 pub vector_sync_strategy: VectorSyncStrategy,
50
51 /// Configuration for write batching.
52 pub batch_writer_config: BatchWriterConfig,
53}
54
55impl Default for TransactionManagerConfig {
56 fn default() -> Self {
57 Self {
58 vector_sync_strategy: VectorSyncStrategy::Synchronous,
59 batch_writer_config: BatchWriterConfig::default(),
60 }
61 }
62}
63
64/// Coordinates transactions across storage, graph indexes, and vector indexes.
65///
66/// The `TransactionManager` is the central coordinator for all database transactions.
67/// It manages the lifecycle of transactions and ensures ACID guarantees across
68/// all subsystems.
69///
70/// # Transaction Semantics
71///
72/// - **Read transactions**: Provide snapshot isolation with multiple concurrent readers
73/// - **Write transactions**: Currently serialized (single writer at a time)
74/// - All index updates occur within the same transaction as data changes
75///
76/// # Thread Safety
77///
78/// `TransactionManager` is `Send + Sync` and can be safely shared across threads
79/// using `Arc<TransactionManager>`.
80///
81/// # Example
82///
83/// ```ignore
84/// use manifoldb::transaction::{TransactionManager, VectorSyncStrategy};
85/// use manifoldb_storage::backends::RedbEngine;
86///
87/// let engine = RedbEngine::open("db.redb")?;
88/// let manager = TransactionManager::new(engine);
89///
90/// // Concurrent reads are allowed
91/// let tx1 = manager.begin_read()?;
92/// let tx2 = manager.begin_read()?;
93///
94/// // Write transactions
95/// let mut tx = manager.begin_write()?;
96/// tx.put_entity(&entity)?;
97/// tx.commit()?;
98/// ```
99pub struct TransactionManager<E: StorageEngine> {
100 /// The underlying storage engine.
101 engine: Arc<E>,
102
103 /// Configuration for the transaction manager.
104 config: TransactionManagerConfig,
105
106 /// Counter for generating unique transaction IDs.
107 next_tx_id: AtomicU64,
108
109 /// Batch writer for concurrent write optimization.
110 batch_writer: BatchWriter<E>,
111}
112
113impl<E: StorageEngine> TransactionManager<E> {
114 /// Create a new transaction manager with the given storage engine.
115 ///
116 /// Uses the default configuration with synchronous vector index updates.
117 pub fn new(engine: E) -> Self {
118 Self::with_config(engine, TransactionManagerConfig::default())
119 }
120
121 /// Create a new transaction manager with custom configuration.
122 pub fn with_config(engine: E, config: TransactionManagerConfig) -> Self {
123 let engine = Arc::new(engine);
124 let batch_writer =
125 BatchWriter::new(Arc::clone(&engine), config.batch_writer_config.clone());
126 Self { engine, config, next_tx_id: AtomicU64::new(1), batch_writer }
127 }
128
129 /// Get the vector synchronization strategy.
130 #[must_use]
131 pub const fn vector_sync_strategy(&self) -> VectorSyncStrategy {
132 self.config.vector_sync_strategy
133 }
134
135 /// Begin a read-only transaction.
136 ///
137 /// Read transactions provide a consistent snapshot of the database.
138 /// Multiple read transactions can run concurrently.
139 ///
140 /// # Errors
141 ///
142 /// Returns an error if the transaction cannot be started.
143 pub fn begin_read(&self) -> Result<DatabaseTransaction<E::Transaction<'_>>, TransactionError> {
144 let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
145 let storage_tx =
146 self.engine.begin_read().map_err(|e| storage_error_to_transaction_error(&e))?;
147
148 Ok(DatabaseTransaction::new_read(tx_id, storage_tx))
149 }
150
151 /// Begin a read-write transaction.
152 ///
153 /// Write transactions allow modifying the database. Currently, write
154 /// transactions are serialized (only one at a time).
155 ///
156 /// # Errors
157 ///
158 /// Returns an error if the transaction cannot be started.
159 pub fn begin_write(&self) -> Result<DatabaseTransaction<E::Transaction<'_>>, TransactionError> {
160 let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
161 let storage_tx =
162 self.engine.begin_write().map_err(|e| storage_error_to_transaction_error(&e))?;
163
164 Ok(DatabaseTransaction::new_write(tx_id, storage_tx, self.config.vector_sync_strategy))
165 }
166
167 /// Flush any buffered data to durable storage.
168 ///
169 /// This is typically called after committing important transactions.
170 ///
171 /// # Errors
172 ///
173 /// Returns an error if the flush fails.
174 pub fn flush(&self) -> Result<(), TransactionError> {
175 self.engine.flush().map_err(|e| storage_error_to_transaction_error(&e))
176 }
177
178 /// Get a reference to the underlying storage engine.
179 ///
180 /// This is primarily useful for testing or advanced use cases.
181 #[must_use]
182 pub fn engine(&self) -> &E {
183 &self.engine
184 }
185
186 /// Get an Arc to the underlying storage engine.
187 ///
188 /// Returns a cloned Arc reference to the engine, useful for creating
189 /// components that need shared ownership of the engine.
190 #[must_use]
191 pub fn engine_arc(&self) -> Arc<E> {
192 Arc::clone(&self.engine)
193 }
194
195 /// Begin a batched write transaction.
196 ///
197 /// Batched transactions buffer writes locally and commit them through
198 /// the batch writer for group commit optimization. This can significantly
199 /// improve throughput under concurrent load.
200 ///
201 /// Unlike regular write transactions, batched transactions:
202 /// - Buffer all writes in memory until commit
203 /// - Are committed together with other concurrent transactions
204 /// - Provide read-your-own-writes semantics within the transaction
205 ///
206 /// # Example
207 ///
208 /// ```ignore
209 /// let mut tx = manager.begin_batched_write();
210 /// tx.put("table", b"key", b"value")?;
211 /// tx.commit()?; // Batched with other concurrent commits
212 /// ```
213 #[must_use]
214 pub fn begin_batched_write(&self) -> BatchedTransaction<E> {
215 self.batch_writer.begin()
216 }
217
218 /// Get a reference to the batch writer.
219 ///
220 /// This provides access to the batch writer for manual control over
221 /// batched transactions, including flushing pending writes.
222 #[must_use]
223 pub fn batch_writer(&self) -> &BatchWriter<E> {
224 &self.batch_writer
225 }
226
227 /// Flush any pending batched writes immediately.
228 ///
229 /// This forces all pending batched transactions to be committed,
230 /// even if the batch size or flush interval hasn't been reached.
231 pub fn flush_batched(&self) -> Result<(), TransactionError> {
232 self.batch_writer.flush()
233 }
234
235 /// Get the number of pending batched transactions.
236 #[must_use]
237 pub fn pending_batched_count(&self) -> usize {
238 self.batch_writer.pending_count()
239 }
240
241 /// Get the batch writer configuration.
242 #[must_use]
243 pub fn batch_writer_config(&self) -> &BatchWriterConfig {
244 &self.config.batch_writer_config
245 }
246}
247
248/// Convert a storage error to a transaction error.
249fn storage_error_to_transaction_error(err: &StorageError) -> TransactionError {
250 match err {
251 StorageError::ReadOnly => TransactionError::ReadOnly,
252 StorageError::Conflict(msg) => TransactionError::Conflict(msg.clone()),
253 StorageError::Serialization(msg) => TransactionError::Serialization(msg.clone()),
254 StorageError::NotFound(msg) => TransactionError::EntityNotFound(msg.clone()),
255 StorageError::KeyNotFound => TransactionError::EntityNotFound("key not found".to_string()),
256 _ => TransactionError::Storage(err.to_string()),
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// The strategy enum must default to the synchronous mode.
    #[test]
    fn test_vector_sync_strategy_default() {
        assert_eq!(VectorSyncStrategy::default(), VectorSyncStrategy::Synchronous);
    }

    /// The default manager configuration must select synchronous vector sync.
    #[test]
    fn test_config_default() {
        let TransactionManagerConfig { vector_sync_strategy, .. } =
            TransactionManagerConfig::default();
        assert_eq!(vector_sync_strategy, VectorSyncStrategy::Synchronous);
    }
}
275}