ruvector_graph/
transaction.rs

1//! Transaction support for ACID guarantees with MVCC
2//!
3//! Provides multi-version concurrency control for high-throughput concurrent access
4
5use crate::edge::Edge;
6use crate::error::Result;
7use crate::hyperedge::{Hyperedge, HyperedgeId};
8use crate::node::Node;
9use crate::types::{EdgeId, NodeId};
10use dashmap::DashMap;
11use parking_lot::RwLock;
12use std::collections::{HashMap, HashSet};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{SystemTime, UNIX_EPOCH};
16use uuid::Uuid;
17
18/// Transaction isolation level
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum IsolationLevel {
21    /// Dirty reads allowed
22    ReadUncommitted,
23    /// Only committed data visible
24    ReadCommitted,
25    /// Repeatable reads (default)
26    RepeatableRead,
27    /// Full isolation
28    Serializable,
29}
30
31/// Transaction ID type
32pub type TxnId = u64;
33
34/// Timestamp for MVCC
35pub type Timestamp = u64;
36
37/// Get current timestamp
38fn now() -> Timestamp {
39    SystemTime::now()
40        .duration_since(UNIX_EPOCH)
41        .unwrap()
42        .as_micros() as u64
43}
44
45/// Versioned value for MVCC
46#[derive(Debug, Clone)]
47struct Version<T> {
48    /// Creation timestamp
49    created_at: Timestamp,
50    /// Deletion timestamp (None if not deleted)
51    deleted_at: Option<Timestamp>,
52    /// Transaction ID that created this version
53    created_by: TxnId,
54    /// Transaction ID that deleted this version
55    deleted_by: Option<TxnId>,
56    /// The actual value
57    value: T,
58}
59
60/// Transaction state
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62enum TxnState {
63    Active,
64    Committed,
65    Aborted,
66}
67
68/// Transaction metadata
69struct TxnMetadata {
70    id: TxnId,
71    state: TxnState,
72    isolation_level: IsolationLevel,
73    start_time: Timestamp,
74    commit_time: Option<Timestamp>,
75}
76
77/// Transaction manager for MVCC
78pub struct TransactionManager {
79    /// Next transaction ID
80    next_txn_id: AtomicU64,
81    /// Active transactions
82    active_txns: Arc<DashMap<TxnId, TxnMetadata>>,
83    /// Committed transactions (for cleanup)
84    committed_txns: Arc<DashMap<TxnId, Timestamp>>,
85    /// Node versions (key -> list of versions)
86    node_versions: Arc<DashMap<NodeId, Vec<Version<Node>>>>,
87    /// Edge versions
88    edge_versions: Arc<DashMap<EdgeId, Vec<Version<Edge>>>>,
89    /// Hyperedge versions
90    hyperedge_versions: Arc<DashMap<HyperedgeId, Vec<Version<Hyperedge>>>>,
91}
92
93impl TransactionManager {
94    /// Create a new transaction manager
95    pub fn new() -> Self {
96        Self {
97            next_txn_id: AtomicU64::new(1),
98            active_txns: Arc::new(DashMap::new()),
99            committed_txns: Arc::new(DashMap::new()),
100            node_versions: Arc::new(DashMap::new()),
101            edge_versions: Arc::new(DashMap::new()),
102            hyperedge_versions: Arc::new(DashMap::new()),
103        }
104    }
105
106    /// Begin a new transaction
107    pub fn begin(&self, isolation_level: IsolationLevel) -> Transaction {
108        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
109        let start_time = now();
110
111        let metadata = TxnMetadata {
112            id: txn_id,
113            state: TxnState::Active,
114            isolation_level,
115            start_time,
116            commit_time: None,
117        };
118
119        self.active_txns.insert(txn_id, metadata);
120
121        Transaction {
122            id: txn_id,
123            manager: Arc::new(self.clone()),
124            isolation_level,
125            start_time,
126            writes: Arc::new(RwLock::new(WriteSet::new())),
127        }
128    }
129
130    /// Commit a transaction
131    fn commit(&self, txn_id: TxnId, writes: &WriteSet) -> Result<()> {
132        let commit_time = now();
133
134        // Apply all writes
135        for (node_id, node) in &writes.nodes {
136            self.node_versions
137                .entry(node_id.clone())
138                .or_insert_with(Vec::new)
139                .push(Version {
140                    created_at: commit_time,
141                    deleted_at: None,
142                    created_by: txn_id,
143                    deleted_by: None,
144                    value: node.clone(),
145                });
146        }
147
148        for (edge_id, edge) in &writes.edges {
149            self.edge_versions
150                .entry(edge_id.clone())
151                .or_insert_with(Vec::new)
152                .push(Version {
153                    created_at: commit_time,
154                    deleted_at: None,
155                    created_by: txn_id,
156                    deleted_by: None,
157                    value: edge.clone(),
158                });
159        }
160
161        for (hyperedge_id, hyperedge) in &writes.hyperedges {
162            self.hyperedge_versions
163                .entry(hyperedge_id.clone())
164                .or_insert_with(Vec::new)
165                .push(Version {
166                    created_at: commit_time,
167                    deleted_at: None,
168                    created_by: txn_id,
169                    deleted_by: None,
170                    value: hyperedge.clone(),
171                });
172        }
173
174        // Mark deletes
175        for node_id in &writes.deleted_nodes {
176            if let Some(mut versions) = self.node_versions.get_mut(node_id) {
177                if let Some(last) = versions.last_mut() {
178                    last.deleted_at = Some(commit_time);
179                    last.deleted_by = Some(txn_id);
180                }
181            }
182        }
183
184        for edge_id in &writes.deleted_edges {
185            if let Some(mut versions) = self.edge_versions.get_mut(edge_id) {
186                if let Some(last) = versions.last_mut() {
187                    last.deleted_at = Some(commit_time);
188                    last.deleted_by = Some(txn_id);
189                }
190            }
191        }
192
193        // Update transaction state
194        if let Some(mut metadata) = self.active_txns.get_mut(&txn_id) {
195            metadata.state = TxnState::Committed;
196            metadata.commit_time = Some(commit_time);
197        }
198
199        self.active_txns.remove(&txn_id);
200        self.committed_txns.insert(txn_id, commit_time);
201
202        Ok(())
203    }
204
205    /// Abort a transaction
206    fn abort(&self, txn_id: TxnId) -> Result<()> {
207        if let Some(mut metadata) = self.active_txns.get_mut(&txn_id) {
208            metadata.state = TxnState::Aborted;
209        }
210        self.active_txns.remove(&txn_id);
211        Ok(())
212    }
213
214    /// Read a node with MVCC
215    fn read_node(&self, node_id: &NodeId, txn_id: TxnId, start_time: Timestamp) -> Option<Node> {
216        self.node_versions.get(node_id).and_then(|versions| {
217            versions
218                .iter()
219                .rev()
220                .find(|v| {
221                    v.created_at <= start_time
222                        && v.deleted_at.map_or(true, |d| d > start_time)
223                        && v.created_by != txn_id
224                })
225                .map(|v| v.value.clone())
226        })
227    }
228
229    /// Read an edge with MVCC
230    fn read_edge(&self, edge_id: &EdgeId, txn_id: TxnId, start_time: Timestamp) -> Option<Edge> {
231        self.edge_versions.get(edge_id).and_then(|versions| {
232            versions
233                .iter()
234                .rev()
235                .find(|v| {
236                    v.created_at <= start_time
237                        && v.deleted_at.map_or(true, |d| d > start_time)
238                        && v.created_by != txn_id
239                })
240                .map(|v| v.value.clone())
241        })
242    }
243}
244
245impl Clone for TransactionManager {
246    fn clone(&self) -> Self {
247        Self {
248            next_txn_id: AtomicU64::new(self.next_txn_id.load(Ordering::SeqCst)),
249            active_txns: Arc::clone(&self.active_txns),
250            committed_txns: Arc::clone(&self.committed_txns),
251            node_versions: Arc::clone(&self.node_versions),
252            edge_versions: Arc::clone(&self.edge_versions),
253            hyperedge_versions: Arc::clone(&self.hyperedge_versions),
254        }
255    }
256}
257
258impl Default for TransactionManager {
259    fn default() -> Self {
260        Self::new()
261    }
262}
263
264/// Write set for a transaction
265#[derive(Debug, Clone, Default)]
266struct WriteSet {
267    nodes: HashMap<NodeId, Node>,
268    edges: HashMap<EdgeId, Edge>,
269    hyperedges: HashMap<HyperedgeId, Hyperedge>,
270    deleted_nodes: HashSet<NodeId>,
271    deleted_edges: HashSet<EdgeId>,
272    deleted_hyperedges: HashSet<HyperedgeId>,
273}
274
275impl WriteSet {
276    fn new() -> Self {
277        Self::default()
278    }
279}
280
281/// Transaction handle
282pub struct Transaction {
283    id: TxnId,
284    manager: Arc<TransactionManager>,
285    /// The isolation level for this transaction
286    pub isolation_level: IsolationLevel,
287    start_time: Timestamp,
288    writes: Arc<RwLock<WriteSet>>,
289}
290
291impl Transaction {
292    /// Begin a new standalone transaction
293    ///
294    /// This creates an internal TransactionManager for simple use cases.
295    /// For production use, prefer using a shared TransactionManager.
296    pub fn begin(isolation_level: IsolationLevel) -> Result<Self> {
297        let manager = TransactionManager::new();
298        Ok(manager.begin(isolation_level))
299    }
300
301    /// Get transaction ID
302    pub fn id(&self) -> TxnId {
303        self.id
304    }
305
306    /// Write a node (buffered until commit)
307    pub fn write_node(&self, node: Node) {
308        let mut writes = self.writes.write();
309        writes.nodes.insert(node.id.clone(), node);
310    }
311
312    /// Write an edge (buffered until commit)
313    pub fn write_edge(&self, edge: Edge) {
314        let mut writes = self.writes.write();
315        writes.edges.insert(edge.id.clone(), edge);
316    }
317
318    /// Write a hyperedge (buffered until commit)
319    pub fn write_hyperedge(&self, hyperedge: Hyperedge) {
320        let mut writes = self.writes.write();
321        writes.hyperedges.insert(hyperedge.id.clone(), hyperedge);
322    }
323
324    /// Delete a node (buffered until commit)
325    pub fn delete_node(&self, node_id: NodeId) {
326        let mut writes = self.writes.write();
327        writes.deleted_nodes.insert(node_id);
328    }
329
330    /// Delete an edge (buffered until commit)
331    pub fn delete_edge(&self, edge_id: EdgeId) {
332        let mut writes = self.writes.write();
333        writes.deleted_edges.insert(edge_id);
334    }
335
336    /// Read a node (with MVCC visibility)
337    pub fn read_node(&self, node_id: &NodeId) -> Option<Node> {
338        // Check write set first
339        {
340            let writes = self.writes.read();
341            if writes.deleted_nodes.contains(node_id) {
342                return None;
343            }
344            if let Some(node) = writes.nodes.get(node_id) {
345                return Some(node.clone());
346            }
347        }
348
349        // Read from MVCC store
350        self.manager.read_node(node_id, self.id, self.start_time)
351    }
352
353    /// Read an edge (with MVCC visibility)
354    pub fn read_edge(&self, edge_id: &EdgeId) -> Option<Edge> {
355        // Check write set first
356        {
357            let writes = self.writes.read();
358            if writes.deleted_edges.contains(edge_id) {
359                return None;
360            }
361            if let Some(edge) = writes.edges.get(edge_id) {
362                return Some(edge.clone());
363            }
364        }
365
366        // Read from MVCC store
367        self.manager.read_edge(edge_id, self.id, self.start_time)
368    }
369
370    /// Commit the transaction
371    pub fn commit(self) -> Result<()> {
372        let writes = self.writes.read();
373        self.manager.commit(self.id, &writes)
374    }
375
376    /// Rollback the transaction
377    pub fn rollback(self) -> Result<()> {
378        self.manager.abort(self.id)
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385    use crate::node::NodeBuilder;
386
387    #[test]
388    fn test_transaction_basic() {
389        let manager = TransactionManager::new();
390        let txn = manager.begin(IsolationLevel::ReadCommitted);
391
392        assert_eq!(txn.isolation_level, IsolationLevel::ReadCommitted);
393        assert!(txn.id() > 0);
394    }
395
396    #[test]
397    fn test_mvcc_read_write() {
398        let manager = TransactionManager::new();
399
400        // Transaction 1: Write a node
401        let txn1 = manager.begin(IsolationLevel::ReadCommitted);
402        let node = NodeBuilder::new()
403            .label("Person")
404            .property("name", "Alice")
405            .build();
406        let node_id = node.id.clone();
407        txn1.write_node(node.clone());
408        txn1.commit().unwrap();
409
410        // Transaction 2: Read the node
411        let txn2 = manager.begin(IsolationLevel::ReadCommitted);
412        let read_node = txn2.read_node(&node_id);
413        assert!(read_node.is_some());
414        assert_eq!(read_node.unwrap().id, node_id);
415    }
416
417    #[test]
418    fn test_transaction_isolation() {
419        let manager = TransactionManager::new();
420
421        let node = NodeBuilder::new().build();
422        let node_id = node.id.clone();
423
424        // Txn1: Write but don't commit
425        let txn1 = manager.begin(IsolationLevel::ReadCommitted);
426        txn1.write_node(node.clone());
427
428        // Txn2: Should not see uncommitted write
429        let txn2 = manager.begin(IsolationLevel::ReadCommitted);
430        assert!(txn2.read_node(&node_id).is_none());
431
432        // Commit txn1
433        txn1.commit().unwrap();
434
435        // Txn3: Should see committed write
436        let txn3 = manager.begin(IsolationLevel::ReadCommitted);
437        assert!(txn3.read_node(&node_id).is_some());
438    }
439}