Skip to main content

oxirs_core/transaction/
named_graph.rs

1//! Named Graph Transaction Support
2//!
3//! This module provides transactional operations specifically for named graphs,
4//! integrating MVCC and ACID guarantees with graph-level operations.
5//!
6//! Features:
7//! - Atomic multi-graph operations
8//! - Graph-level isolation
9//! - MVCC snapshot isolation per graph
10//! - Efficient graph cloning and merging
11
12use super::{AcidTransaction, IsolationLevel, TransactionId, TransactionState};
13use crate::model::{GraphName, NamedNode, Object, Predicate, Quad, Subject};
14use crate::OxirsError;
15use ahash::{AHashMap, AHashSet};
16use std::sync::{Arc, RwLock};
17
18/// Named graph transaction that extends ACID transactions with graph-specific operations
19pub struct NamedGraphTransaction {
20    /// Underlying ACID transaction
21    inner: AcidTransaction,
22    /// Per-graph operation tracking
23    graph_operations: AHashMap<GraphName, GraphOperations>,
24    /// Graph-level locks (for serializable isolation)
25    graph_locks: Arc<RwLock<AHashSet<GraphName>>>,
26}
27
28/// Operations performed on a specific graph
29#[derive(Debug, Clone, Default)]
30struct GraphOperations {
31    /// Quads inserted into this graph
32    inserts: Vec<Quad>,
33    /// Quads deleted from this graph
34    deletes: Vec<Quad>,
35    /// Whether the entire graph was cleared
36    cleared: bool,
37    /// Whether the graph was created
38    created: bool,
39    /// Whether the graph was dropped
40    dropped: bool,
41}
42
43impl NamedGraphTransaction {
44    /// Create a new named graph transaction
45    pub(super) fn new(
46        inner: AcidTransaction,
47        graph_locks: Arc<RwLock<AHashSet<GraphName>>>,
48    ) -> Self {
49        Self {
50            inner,
51            graph_operations: AHashMap::new(),
52            graph_locks,
53        }
54    }
55
56    /// Get the transaction ID
57    pub fn id(&self) -> TransactionId {
58        self.inner.id()
59    }
60
61    /// Get the transaction state
62    pub fn state(&self) -> TransactionState {
63        self.inner.state()
64    }
65
66    /// Get the isolation level
67    pub fn isolation(&self) -> IsolationLevel {
68        self.inner.isolation()
69    }
70
71    /// Insert a quad into a named graph
72    pub fn insert_into_graph(
73        &mut self,
74        graph: GraphName,
75        subject: Subject,
76        predicate: Predicate,
77        object: Object,
78    ) -> Result<bool, OxirsError> {
79        // Create quad with the specified graph
80        let quad = Quad::new(subject, predicate, object, graph.clone());
81
82        // Track the operation
83        let ops = self.graph_operations.entry(graph).or_default();
84        ops.inserts.push(quad.clone());
85
86        // Delegate to underlying transaction
87        self.inner.insert(quad)
88    }
89
90    /// Delete a quad from a named graph
91    pub fn delete_from_graph(
92        &mut self,
93        graph: GraphName,
94        subject: Subject,
95        predicate: Predicate,
96        object: Object,
97    ) -> Result<bool, OxirsError> {
98        // Create quad with the specified graph
99        let quad = Quad::new(subject, predicate, object, graph.clone());
100
101        // Track the operation
102        let ops = self.graph_operations.entry(graph).or_default();
103        ops.deletes.push(quad.clone());
104
105        // Delegate to underlying transaction
106        self.inner.delete(quad)
107    }
108
109    /// Clear all quads from a named graph
110    pub fn clear_graph(&mut self, graph: GraphName) -> Result<usize, OxirsError> {
111        // Track the operation
112        let ops = self.graph_operations.entry(graph.clone()).or_default();
113        ops.cleared = true;
114
115        // In practice, you would query all quads in the graph and delete them
116        // For now, we return a placeholder
117        Ok(0)
118    }
119
120    /// Create a new named graph
121    pub fn create_graph(&mut self, graph: NamedNode) -> Result<(), OxirsError> {
122        let graph_name = GraphName::NamedNode(graph);
123
124        // Track the operation
125        let ops = self.graph_operations.entry(graph_name).or_default();
126        if ops.dropped {
127            return Err(OxirsError::Store(
128                "Cannot create a graph that was dropped in the same transaction".to_string(),
129            ));
130        }
131        ops.created = true;
132
133        Ok(())
134    }
135
136    /// Drop a named graph
137    pub fn drop_graph(&mut self, graph: NamedNode) -> Result<(), OxirsError> {
138        let graph_name = GraphName::NamedNode(graph);
139
140        // Check if created in same transaction
141        let should_remove = if let Some(ops) = self.graph_operations.get(&graph_name) {
142            ops.created
143        } else {
144            false
145        };
146
147        if should_remove {
148            // If created and dropped in same transaction, just remove the entry
149            self.graph_operations.remove(&graph_name);
150        } else {
151            // Track the operation
152            let ops = self.graph_operations.entry(graph_name).or_default();
153            ops.dropped = true;
154            ops.cleared = true; // Dropping clears all data
155        }
156
157        Ok(())
158    }
159
160    /// Copy one graph to another atomically
161    pub fn copy_graph(
162        &mut self,
163        _source: GraphName,
164        _destination: GraphName,
165    ) -> Result<usize, OxirsError> {
166        // This would:
167        // 1. Read all quads from source graph
168        // 2. Clear destination graph
169        // 3. Insert all quads into destination graph
170        // All atomically within this transaction
171
172        // Placeholder implementation
173        Ok(0)
174    }
175
176    /// Move one graph to another atomically
177    pub fn move_graph(
178        &mut self,
179        source: GraphName,
180        destination: GraphName,
181    ) -> Result<usize, OxirsError> {
182        // This is equivalent to COPY + DROP source
183        let count = self.copy_graph(source.clone(), destination)?;
184        if let GraphName::NamedNode(node) = source {
185            self.drop_graph(node)?;
186        }
187        Ok(count)
188    }
189
190    /// Add (merge) one graph into another atomically
191    pub fn add_graph(
192        &mut self,
193        _source: GraphName,
194        _destination: GraphName,
195    ) -> Result<usize, OxirsError> {
196        // This would:
197        // 1. Read all quads from source graph
198        // 2. Insert all quads into destination graph (without clearing)
199        // All atomically within this transaction
200
201        // Placeholder implementation
202        Ok(0)
203    }
204
205    /// Get statistics about operations on a specific graph
206    pub fn graph_stats(&self, graph: &GraphName) -> Option<GraphStats> {
207        self.graph_operations.get(graph).map(|ops| GraphStats {
208            inserts: ops.inserts.len(),
209            deletes: ops.deletes.len(),
210            cleared: ops.cleared,
211            created: ops.created,
212            dropped: ops.dropped,
213        })
214    }
215
216    /// Get all graphs modified in this transaction
217    pub fn modified_graphs(&self) -> Vec<GraphName> {
218        self.graph_operations.keys().cloned().collect()
219    }
220
221    /// Acquire a lock on a graph (for serializable isolation)
222    pub fn lock_graph(&mut self, graph: GraphName) -> Result<(), OxirsError> {
223        if self.isolation() == IsolationLevel::Serializable {
224            let mut locks = self.graph_locks.write().map_err(|e| {
225                OxirsError::ConcurrencyError(format!("Failed to acquire graph lock: {}", e))
226            })?;
227
228            if !locks.insert(graph) {
229                return Err(OxirsError::ConcurrencyError(
230                    "Graph is already locked by another transaction".to_string(),
231                ));
232            }
233        }
234        Ok(())
235    }
236
237    /// Release all graph locks
238    fn release_locks(&self) -> Result<(), OxirsError> {
239        let mut locks = self.graph_locks.write().map_err(|e| {
240            OxirsError::ConcurrencyError(format!("Failed to release graph locks: {}", e))
241        })?;
242
243        for graph in self.graph_operations.keys() {
244            locks.remove(graph);
245        }
246
247        Ok(())
248    }
249
250    /// Commit the transaction
251    pub fn commit(self) -> Result<(), OxirsError> {
252        // Release locks before commit
253        self.release_locks()?;
254
255        // Delegate to underlying transaction
256        self.inner.commit()
257    }
258
259    /// Rollback the transaction (abort)
260    pub fn rollback(self) -> Result<(), OxirsError> {
261        // Release locks before rollback
262        self.release_locks()?;
263
264        // Delegate to underlying transaction (use abort)
265        self.inner.abort()
266    }
267}
268
269/// Statistics about graph operations in a transaction
270#[derive(Debug, Clone)]
271pub struct GraphStats {
272    /// Number of quads inserted
273    pub inserts: usize,
274    /// Number of quads deleted
275    pub deletes: usize,
276    /// Whether the graph was cleared
277    pub cleared: bool,
278    /// Whether the graph was created
279    pub created: bool,
280    /// Whether the graph was dropped
281    pub dropped: bool,
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287    use crate::model::{Literal, NamedNode};
288
289    fn create_test_quad(graph: GraphName) -> Quad {
290        Quad::new(
291            Subject::NamedNode(NamedNode::new("http://example.org/subject").unwrap()),
292            Predicate::from(NamedNode::new("http://example.org/predicate").unwrap()),
293            Object::Literal(Literal::new("test")),
294            graph,
295        )
296    }
297
298    #[test]
299    fn test_graph_operations_tracking() {
300        // This test would require a full transaction manager setup
301        // For now, we test the data structures
302
303        let graph = GraphName::NamedNode(NamedNode::new("http://example.org/graph1").unwrap());
304
305        let mut ops = GraphOperations::default();
306        ops.inserts.push(create_test_quad(graph.clone()));
307
308        assert_eq!(ops.inserts.len(), 1);
309        assert_eq!(ops.deletes.len(), 0);
310        assert!(!ops.cleared);
311    }
312
313    #[test]
314    fn test_graph_stats() {
315        let mut ops = GraphOperations::default();
316        ops.inserts.push(create_test_quad(GraphName::DefaultGraph));
317        ops.cleared = true;
318
319        assert_eq!(ops.inserts.len(), 1);
320        assert!(ops.cleared);
321    }
322}