1use crate::edge::Edge;
6use crate::error::Result;
7use crate::hyperedge::{Hyperedge, HyperedgeId};
8use crate::node::Node;
9use crate::types::{EdgeId, NodeId};
10use dashmap::DashMap;
11use parking_lot::RwLock;
12use std::collections::{HashMap, HashSet};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{SystemTime, UNIX_EPOCH};
16use uuid::Uuid;
17
/// Isolation level requested when a transaction is started.
///
/// NOTE(review): within this file the level is only stored (on the transaction
/// handle and in its bookkeeping record); no read or commit path branches on it.
/// Confirm whether enforcement is expected to live elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read-uncommitted level.
    ReadUncommitted,
    /// Read-committed level.
    ReadCommitted,
    /// Repeatable-read level.
    RepeatableRead,
    /// Serializable level.
    Serializable,
}
30
/// Identifier of a transaction, allocated from the manager's atomic counter.
pub type TxnId = u64;
33
/// Point in time used to order transaction starts and commits (microsecond scale).
pub type Timestamp = u64;

/// Returns a fresh timestamp that is strictly greater than every timestamp this
/// function has returned before in the current process.
///
/// The value tracks the wall clock (microseconds since the Unix epoch) whenever
/// the clock is ahead of the last issued timestamp. If the clock stalls, ties
/// within one microsecond, or steps backwards, the last issued value plus one is
/// returned instead, so callers never observe equal or decreasing timestamps.
/// Version visibility is decided purely by comparing these values, so ordering
/// matters more than wall-clock accuracy.
///
/// A clock reading before the Unix epoch is treated as 0 rather than panicking,
/// and a microsecond count that does not fit in 64 bits saturates.
fn now() -> Timestamp {
    // Process-wide high-water mark of issued timestamps.
    static LAST_ISSUED: AtomicU64 = AtomicU64::new(0);

    let wall_clock_micros = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // Saturate instead of silently truncating the 128-bit count.
            elapsed.as_micros().min(u128::from(u64::MAX)) as u64
        });

    // Next timestamp given the previously issued one.
    let advance = |last: u64| wall_clock_micros.max(last.saturating_add(1));

    // The closure always returns `Some`, so the update cannot fail; either arm
    // carries the value that was stored before this call.
    let previous = LAST_ISSUED
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last| {
            Some(advance(last))
        })
        .unwrap_or_else(|unchanged| unchanged);

    advance(previous)
}
44
/// One committed version of a stored value, together with the time window and
/// the transactions that bound its lifetime.
#[derive(Debug, Clone)]
struct Version<T> {
    /// Commit timestamp of the transaction that produced this version.
    created_at: Timestamp,
    /// Commit timestamp at which this version was marked deleted; `None` while
    /// it has not been marked.
    deleted_at: Option<Timestamp>,
    /// Transaction that produced this version.
    created_by: TxnId,
    /// Transaction that marked this version deleted, if any.
    deleted_by: Option<TxnId>,
    /// The stored payload.
    value: T,
}
59
/// Lifecycle state recorded in a transaction's bookkeeping entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TxnState {
    /// Set when the transaction is started.
    Active,
    /// Set by the manager's commit path.
    Committed,
    /// Set by the manager's abort path.
    Aborted,
}
67
/// Bookkeeping record the manager keeps for a transaction from `begin` until it
/// is committed or aborted (at which point the record is removed).
struct TxnMetadata {
    /// Identifier of the transaction this record describes.
    id: TxnId,
    /// Current lifecycle state.
    state: TxnState,
    /// Isolation level the transaction was started with.
    isolation_level: IsolationLevel,
    /// Timestamp captured when the transaction was started.
    start_time: Timestamp,
    /// Commit timestamp; `None` until the commit path fills it in.
    commit_time: Option<Timestamp>,
}
76
77pub struct TransactionManager {
79 next_txn_id: AtomicU64,
81 active_txns: Arc<DashMap<TxnId, TxnMetadata>>,
83 committed_txns: Arc<DashMap<TxnId, Timestamp>>,
85 node_versions: Arc<DashMap<NodeId, Vec<Version<Node>>>>,
87 edge_versions: Arc<DashMap<EdgeId, Vec<Version<Edge>>>>,
89 hyperedge_versions: Arc<DashMap<HyperedgeId, Vec<Version<Hyperedge>>>>,
91}
92
93impl TransactionManager {
94 pub fn new() -> Self {
96 Self {
97 next_txn_id: AtomicU64::new(1),
98 active_txns: Arc::new(DashMap::new()),
99 committed_txns: Arc::new(DashMap::new()),
100 node_versions: Arc::new(DashMap::new()),
101 edge_versions: Arc::new(DashMap::new()),
102 hyperedge_versions: Arc::new(DashMap::new()),
103 }
104 }
105
106 pub fn begin(&self, isolation_level: IsolationLevel) -> Transaction {
108 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
109 let start_time = now();
110
111 let metadata = TxnMetadata {
112 id: txn_id,
113 state: TxnState::Active,
114 isolation_level,
115 start_time,
116 commit_time: None,
117 };
118
119 self.active_txns.insert(txn_id, metadata);
120
121 Transaction {
122 id: txn_id,
123 manager: Arc::new(self.clone()),
124 isolation_level,
125 start_time,
126 writes: Arc::new(RwLock::new(WriteSet::new())),
127 }
128 }
129
130 fn commit(&self, txn_id: TxnId, writes: &WriteSet) -> Result<()> {
132 let commit_time = now();
133
134 for (node_id, node) in &writes.nodes {
136 self.node_versions
137 .entry(node_id.clone())
138 .or_insert_with(Vec::new)
139 .push(Version {
140 created_at: commit_time,
141 deleted_at: None,
142 created_by: txn_id,
143 deleted_by: None,
144 value: node.clone(),
145 });
146 }
147
148 for (edge_id, edge) in &writes.edges {
149 self.edge_versions
150 .entry(edge_id.clone())
151 .or_insert_with(Vec::new)
152 .push(Version {
153 created_at: commit_time,
154 deleted_at: None,
155 created_by: txn_id,
156 deleted_by: None,
157 value: edge.clone(),
158 });
159 }
160
161 for (hyperedge_id, hyperedge) in &writes.hyperedges {
162 self.hyperedge_versions
163 .entry(hyperedge_id.clone())
164 .or_insert_with(Vec::new)
165 .push(Version {
166 created_at: commit_time,
167 deleted_at: None,
168 created_by: txn_id,
169 deleted_by: None,
170 value: hyperedge.clone(),
171 });
172 }
173
174 for node_id in &writes.deleted_nodes {
176 if let Some(mut versions) = self.node_versions.get_mut(node_id) {
177 if let Some(last) = versions.last_mut() {
178 last.deleted_at = Some(commit_time);
179 last.deleted_by = Some(txn_id);
180 }
181 }
182 }
183
184 for edge_id in &writes.deleted_edges {
185 if let Some(mut versions) = self.edge_versions.get_mut(edge_id) {
186 if let Some(last) = versions.last_mut() {
187 last.deleted_at = Some(commit_time);
188 last.deleted_by = Some(txn_id);
189 }
190 }
191 }
192
193 if let Some(mut metadata) = self.active_txns.get_mut(&txn_id) {
195 metadata.state = TxnState::Committed;
196 metadata.commit_time = Some(commit_time);
197 }
198
199 self.active_txns.remove(&txn_id);
200 self.committed_txns.insert(txn_id, commit_time);
201
202 Ok(())
203 }
204
205 fn abort(&self, txn_id: TxnId) -> Result<()> {
207 if let Some(mut metadata) = self.active_txns.get_mut(&txn_id) {
208 metadata.state = TxnState::Aborted;
209 }
210 self.active_txns.remove(&txn_id);
211 Ok(())
212 }
213
214 fn read_node(&self, node_id: &NodeId, txn_id: TxnId, start_time: Timestamp) -> Option<Node> {
216 self.node_versions.get(node_id).and_then(|versions| {
217 versions
218 .iter()
219 .rev()
220 .find(|v| {
221 v.created_at <= start_time
222 && v.deleted_at.map_or(true, |d| d > start_time)
223 && v.created_by != txn_id
224 })
225 .map(|v| v.value.clone())
226 })
227 }
228
229 fn read_edge(&self, edge_id: &EdgeId, txn_id: TxnId, start_time: Timestamp) -> Option<Edge> {
231 self.edge_versions.get(edge_id).and_then(|versions| {
232 versions
233 .iter()
234 .rev()
235 .find(|v| {
236 v.created_at <= start_time
237 && v.deleted_at.map_or(true, |d| d > start_time)
238 && v.created_by != txn_id
239 })
240 .map(|v| v.value.clone())
241 })
242 }
243}
244
245impl Clone for TransactionManager {
246 fn clone(&self) -> Self {
247 Self {
248 next_txn_id: AtomicU64::new(self.next_txn_id.load(Ordering::SeqCst)),
249 active_txns: Arc::clone(&self.active_txns),
250 committed_txns: Arc::clone(&self.committed_txns),
251 node_versions: Arc::clone(&self.node_versions),
252 edge_versions: Arc::clone(&self.edge_versions),
253 hyperedge_versions: Arc::clone(&self.hyperedge_versions),
254 }
255 }
256}
257
258impl Default for TransactionManager {
259 fn default() -> Self {
260 Self::new()
261 }
262}
263
264#[derive(Debug, Clone, Default)]
266struct WriteSet {
267 nodes: HashMap<NodeId, Node>,
268 edges: HashMap<EdgeId, Edge>,
269 hyperedges: HashMap<HyperedgeId, Hyperedge>,
270 deleted_nodes: HashSet<NodeId>,
271 deleted_edges: HashSet<EdgeId>,
272 deleted_hyperedges: HashSet<HyperedgeId>,
273}
274
275impl WriteSet {
276 fn new() -> Self {
277 Self::default()
278 }
279}
280
/// Handle to a single transaction: it buffers writes locally and reads through
/// to the manager's committed versions.
pub struct Transaction {
    /// Identifier allocated by the manager when the transaction began.
    id: TxnId,
    /// Handle to the manager that started this transaction; it shares the
    /// manager's version stores.
    manager: Arc<TransactionManager>,
    /// Isolation level the transaction was started with.
    pub isolation_level: IsolationLevel,
    /// Timestamp captured at begin; committed versions are compared against it
    /// when reading.
    start_time: Timestamp,
    /// Writes and deletions buffered until commit.
    writes: Arc<RwLock<WriteSet>>,
}
290
291impl Transaction {
292 pub fn begin(isolation_level: IsolationLevel) -> Result<Self> {
297 let manager = TransactionManager::new();
298 Ok(manager.begin(isolation_level))
299 }
300
301 pub fn id(&self) -> TxnId {
303 self.id
304 }
305
306 pub fn write_node(&self, node: Node) {
308 let mut writes = self.writes.write();
309 writes.nodes.insert(node.id.clone(), node);
310 }
311
312 pub fn write_edge(&self, edge: Edge) {
314 let mut writes = self.writes.write();
315 writes.edges.insert(edge.id.clone(), edge);
316 }
317
318 pub fn write_hyperedge(&self, hyperedge: Hyperedge) {
320 let mut writes = self.writes.write();
321 writes.hyperedges.insert(hyperedge.id.clone(), hyperedge);
322 }
323
324 pub fn delete_node(&self, node_id: NodeId) {
326 let mut writes = self.writes.write();
327 writes.deleted_nodes.insert(node_id);
328 }
329
330 pub fn delete_edge(&self, edge_id: EdgeId) {
332 let mut writes = self.writes.write();
333 writes.deleted_edges.insert(edge_id);
334 }
335
336 pub fn read_node(&self, node_id: &NodeId) -> Option<Node> {
338 {
340 let writes = self.writes.read();
341 if writes.deleted_nodes.contains(node_id) {
342 return None;
343 }
344 if let Some(node) = writes.nodes.get(node_id) {
345 return Some(node.clone());
346 }
347 }
348
349 self.manager.read_node(node_id, self.id, self.start_time)
351 }
352
353 pub fn read_edge(&self, edge_id: &EdgeId) -> Option<Edge> {
355 {
357 let writes = self.writes.read();
358 if writes.deleted_edges.contains(edge_id) {
359 return None;
360 }
361 if let Some(edge) = writes.edges.get(edge_id) {
362 return Some(edge.clone());
363 }
364 }
365
366 self.manager.read_edge(edge_id, self.id, self.start_time)
368 }
369
370 pub fn commit(self) -> Result<()> {
372 let writes = self.writes.read();
373 self.manager.commit(self.id, &writes)
374 }
375
376 pub fn rollback(self) -> Result<()> {
378 self.manager.abort(self.id)
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::NodeBuilder;

    /// A new transaction reports the requested isolation level and a non-zero id.
    #[test]
    fn test_transaction_basic() {
        let manager = TransactionManager::new();
        let level = IsolationLevel::ReadCommitted;

        let txn = manager.begin(level);

        assert_eq!(txn.isolation_level, level);
        assert!(txn.id() > 0);
    }

    /// A node committed by one transaction is readable by a later one.
    #[test]
    fn test_mvcc_read_write() {
        let manager = TransactionManager::new();

        // Writer: create the node and commit it.
        let writer = manager.begin(IsolationLevel::ReadCommitted);
        let alice = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();
        let alice_id = alice.id.clone();
        writer.write_node(alice);
        writer.commit().unwrap();

        // Reader: started after the commit, so the node must be visible.
        let reader = manager.begin(IsolationLevel::ReadCommitted);
        let fetched = reader.read_node(&alice_id);
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().id, alice_id);
    }

    /// Uncommitted writes are invisible to other transactions; committed writes
    /// are visible to transactions started afterwards.
    #[test]
    fn test_transaction_isolation() {
        let manager = TransactionManager::new();

        let pending = NodeBuilder::new().build();
        let pending_id = pending.id.clone();

        // The writer buffers the node but has not committed yet.
        let writer = manager.begin(IsolationLevel::ReadCommitted);
        writer.write_node(pending);

        // A concurrent transaction must not observe the buffered write.
        let concurrent = manager.begin(IsolationLevel::ReadCommitted);
        assert!(concurrent.read_node(&pending_id).is_none());

        writer.commit().unwrap();

        // A transaction started after the commit sees the node.
        let later = manager.begin(IsolationLevel::ReadCommitted);
        assert!(later.read_node(&pending_id).is_some());
    }
}