Skip to main content

oxirs_core/distributed/bft/
state_machine.rs

1//! RDF state machine for Byzantine fault tolerant consensus
2
3use super::types::*;
4use crate::model::{BlankNode, Literal, NamedNode, Triple};
5use anyhow::Result;
6use sha2::{Digest, Sha256};
7use std::collections::HashSet;
8
9/// RDF state machine
10pub struct RdfStateMachine {
11    /// Triple store
12    triples: HashSet<Triple>,
13
14    /// Operation counter
15    operation_count: u64,
16
17    /// State digest cache
18    digest_cache: Option<(u64, Vec<u8>)>,
19}
20
21impl RdfStateMachine {
22    /// Create a new RDF state machine
23    pub fn new() -> Self {
24        Self {
25            triples: HashSet::new(),
26            operation_count: 0,
27            digest_cache: None,
28        }
29    }
30
31    /// Execute an RDF operation
32    pub fn execute(&mut self, operation: RdfOperation) -> Result<OperationResult> {
33        self.operation_count += 1;
34        self.digest_cache = None; // Invalidate cache
35
36        match operation {
37            RdfOperation::Insert(triple) => {
38                let t = self.deserialize_triple(triple)?;
39                self.triples.insert(t);
40                Ok(OperationResult::Success)
41            }
42
43            RdfOperation::Remove(triple) => {
44                let t = self.deserialize_triple(triple)?;
45                self.triples.remove(&t);
46                Ok(OperationResult::Success)
47            }
48
49            RdfOperation::BatchInsert(triples) => {
50                for triple in triples {
51                    let t = self.deserialize_triple(triple)?;
52                    self.triples.insert(t);
53                }
54                Ok(OperationResult::Success)
55            }
56
57            RdfOperation::BatchRemove(triples) => {
58                for triple in triples {
59                    let t = self.deserialize_triple(triple)?;
60                    self.triples.remove(&t);
61                }
62                Ok(OperationResult::Success)
63            }
64
65            RdfOperation::Query(_query) => {
66                // Simplified - would execute SPARQL query
67                let results: Vec<SerializableTriple> = self
68                    .triples
69                    .iter()
70                    .take(10) // Limit results
71                    .map(|t| self.serialize_triple(t))
72                    .collect();
73                Ok(OperationResult::QueryResult(results))
74            }
75        }
76    }
77
78    /// Get state digest
79    pub fn get_state_digest(&self) -> Vec<u8> {
80        // For the read-only version, we need to calculate without mutating
81        // In a real implementation, we might use a different caching strategy
82        self.calculate_digest_readonly()
83    }
84
85    /// Calculate state digest (read-only version)
86    fn calculate_digest_readonly(&self) -> Vec<u8> {
87        let mut hasher = Sha256::new();
88
89        // Sort triples for deterministic digest
90        let mut sorted_triples: Vec<_> = self.triples.iter().collect();
91        sorted_triples.sort_by_key(|t| {
92            (
93                t.subject().to_string(),
94                t.predicate().to_string(),
95                t.object().to_string(),
96            )
97        });
98
99        for triple in sorted_triples {
100            hasher.update(triple.subject().to_string().as_bytes());
101            hasher.update(triple.predicate().to_string().as_bytes());
102            hasher.update(triple.object().to_string().as_bytes());
103        }
104
105        hasher.update(self.operation_count.to_le_bytes());
106        hasher.finalize().to_vec()
107    }
108
109    /// Calculate state digest (mutable version with caching)
110    pub fn calculate_digest(&mut self) -> Vec<u8> {
111        // Check cache
112        if let Some((count, digest)) = &self.digest_cache {
113            if *count == self.operation_count {
114                return digest.clone();
115            }
116        }
117
118        // Calculate new digest
119        let digest = self.calculate_digest_readonly();
120
121        // Cache the digest
122        self.digest_cache = Some((self.operation_count, digest.clone()));
123
124        digest
125    }
126
127    /// Get current triple count
128    pub fn triple_count(&self) -> usize {
129        self.triples.len()
130    }
131
132    /// Get operation count
133    pub fn operation_count(&self) -> u64 {
134        self.operation_count
135    }
136
137    /// Check if a triple exists
138    pub fn contains_triple(&self, triple: &SerializableTriple) -> Result<bool> {
139        let t = self.deserialize_triple(triple.clone())?;
140        Ok(self.triples.contains(&t))
141    }
142
143    /// Get all triples (for debugging/testing)
144    pub fn get_all_triples(&self) -> Vec<SerializableTriple> {
145        self.triples
146            .iter()
147            .map(|t| self.serialize_triple(t))
148            .collect()
149    }
150
151    /// Deserialize a triple from network format
152    fn deserialize_triple(&self, st: SerializableTriple) -> Result<Triple> {
153        let subject = NamedNode::new(&st.subject)?;
154        let predicate = NamedNode::new(&st.predicate)?;
155
156        let object = match st.object_type {
157            ObjectType::NamedNode => crate::model::Object::NamedNode(NamedNode::new(&st.object)?),
158            ObjectType::BlankNode => crate::model::Object::BlankNode(BlankNode::new(&st.object)?),
159            ObjectType::Literal { datatype, language } => {
160                if let Some(lang) = language {
161                    crate::model::Object::Literal(Literal::new_language_tagged_literal(
162                        &st.object, &lang,
163                    )?)
164                } else if let Some(dt) = datatype {
165                    crate::model::Object::Literal(Literal::new_typed(
166                        &st.object,
167                        NamedNode::new(&dt)?,
168                    ))
169                } else {
170                    crate::model::Object::Literal(Literal::new(&st.object))
171                }
172            }
173        };
174
175        Ok(Triple::new(subject, predicate, object))
176    }
177
178    /// Serialize a triple for network transmission
179    fn serialize_triple(&self, triple: &Triple) -> SerializableTriple {
180        let object_type = match triple.object() {
181            crate::model::Object::NamedNode(_) => ObjectType::NamedNode,
182            crate::model::Object::BlankNode(_) => ObjectType::BlankNode,
183            crate::model::Object::Literal(lit) => ObjectType::Literal {
184                datatype: if lit.datatype().as_str() != "http://www.w3.org/2001/XMLSchema#string" {
185                    Some(lit.datatype().as_str().to_string())
186                } else {
187                    None
188                },
189                language: lit.language().map(|l| l.to_string()),
190            },
191            _ => ObjectType::NamedNode, // Fallback
192        };
193
194        SerializableTriple {
195            subject: triple.subject().to_string(),
196            predicate: triple.predicate().to_string(),
197            object: triple.object().to_string(),
198            object_type,
199        }
200    }
201
202    /// Reset state machine to initial state
203    pub fn reset(&mut self) {
204        self.triples.clear();
205        self.operation_count = 0;
206        self.digest_cache = None;
207    }
208
209    /// Apply a batch of operations atomically
210    pub fn apply_batch(&mut self, operations: Vec<RdfOperation>) -> Result<Vec<OperationResult>> {
211        let mut results = Vec::new();
212        let initial_count = self.operation_count;
213        let initial_cache = self.digest_cache.clone();
214
215        // Try to apply all operations
216        for operation in operations {
217            match self.execute(operation) {
218                Ok(result) => results.push(result),
219                Err(e) => {
220                    // Rollback on any failure
221                    self.operation_count = initial_count;
222                    self.digest_cache = initial_cache;
223                    return Err(e);
224                }
225            }
226        }
227
228        Ok(results)
229    }
230}
231
232impl Default for RdfStateMachine {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// Predicate shared by most fixtures.
    const P: &str = "http://example.org/p";

    /// Build a `SerializableTriple` with an explicit object type.
    fn typed(
        subject: &str,
        predicate: &str,
        object: &str,
        object_type: ObjectType,
    ) -> SerializableTriple {
        SerializableTriple {
            subject: subject.to_owned(),
            predicate: predicate.to_owned(),
            object: object.to_owned(),
            object_type,
        }
    }

    /// Build a `SerializableTriple` whose object is an untyped, untagged literal.
    fn plain(subject: &str, predicate: &str, value: &str) -> SerializableTriple {
        typed(
            subject,
            predicate,
            value,
            ObjectType::Literal {
                datatype: None,
                language: None,
            },
        )
    }

    /// Execute `op`, failing the test if the state machine rejects it.
    fn exec(sm: &mut RdfStateMachine, op: RdfOperation) -> OperationResult {
        sm.execute(op).expect("operation should succeed")
    }

    #[test]
    fn test_state_machine_basic_operations() {
        let mut sm = RdfStateMachine::new();
        let fixture = plain("http://example.org/s", P, "value");

        // Insert a single triple.
        let outcome = exec(&mut sm, RdfOperation::Insert(fixture.clone()));
        assert!(matches!(outcome, OperationResult::Success));
        assert_eq!(sm.triple_count(), 1);
        assert_eq!(sm.operation_count(), 1);

        // It is now visible.
        let present = sm
            .contains_triple(&fixture)
            .expect("contains_triple should succeed");
        assert!(present);

        // Remove it again.
        let outcome = exec(&mut sm, RdfOperation::Remove(fixture.clone()));
        assert!(matches!(outcome, OperationResult::Success));
        assert_eq!(sm.triple_count(), 0);
        let present = sm
            .contains_triple(&fixture)
            .expect("contains_triple should succeed");
        assert!(!present);
    }

    #[test]
    fn test_state_machine_batch_operations() {
        let mut sm = RdfStateMachine::new();
        let batch: Vec<SerializableTriple> = [
            ("http://example.org/s1", "value1"),
            ("http://example.org/s2", "value2"),
        ]
        .iter()
        .map(|&(subject, value)| plain(subject, P, value))
        .collect();

        let outcome = exec(&mut sm, RdfOperation::BatchInsert(batch.clone()));
        assert!(matches!(outcome, OperationResult::Success));
        assert_eq!(sm.triple_count(), 2);

        let outcome = exec(&mut sm, RdfOperation::BatchRemove(batch));
        assert!(matches!(outcome, OperationResult::Success));
        assert_eq!(sm.triple_count(), 0);
    }

    #[test]
    fn test_state_machine_digest_calculation() {
        let mut sm = RdfStateMachine::new();
        let empty_digest = sm.calculate_digest();

        exec(
            &mut sm,
            RdfOperation::Insert(plain("http://example.org/s", P, "value")),
        );
        let after_insert = sm.calculate_digest();

        // A state change must change the digest.
        assert_ne!(empty_digest, after_insert);

        // Recomputing on unchanged state is stable (cached path)...
        assert_eq!(after_insert, sm.calculate_digest());

        // ...and agrees with the uncached, read-only path.
        assert_eq!(after_insert, sm.get_state_digest());
    }

    #[test]
    fn test_query_operation() {
        let mut sm = RdfStateMachine::new();

        for i in 0..15 {
            let subject = format!("http://example.org/s{i}");
            let value = format!("value{i}");
            sm.execute(RdfOperation::Insert(plain(&subject, P, &value)))
                .expect("state machine execution should succeed");
        }

        // The simplified query caps its answer at 10 rows.
        let outcome = exec(
            &mut sm,
            RdfOperation::Query("SELECT * WHERE { ?s ?p ?o }".to_string()),
        );
        match outcome {
            OperationResult::QueryResult(rows) => assert_eq!(rows.len(), 10),
            _ => panic!("Expected QueryResult"),
        }
    }

    #[test]
    fn test_different_object_types() {
        let mut sm = RdfStateMachine::new();
        let s = "http://example.org/s";

        let fixtures = [
            // IRI object
            typed(
                s,
                "http://example.org/p",
                "http://example.org/o",
                ObjectType::NamedNode,
            ),
            // Blank node object
            typed(s, "http://example.org/p2", "_:blank1", ObjectType::BlankNode),
            // Typed literal
            typed(
                s,
                "http://example.org/p3",
                "42",
                ObjectType::Literal {
                    datatype: Some("http://www.w3.org/2001/XMLSchema#integer".to_string()),
                    language: None,
                },
            ),
            // Language-tagged literal
            typed(
                s,
                "http://example.org/p4",
                "hello",
                ObjectType::Literal {
                    datatype: None,
                    language: Some("en".to_string()),
                },
            ),
        ];

        for triple in fixtures {
            exec(&mut sm, RdfOperation::Insert(triple));
        }

        assert_eq!(sm.triple_count(), 4);
    }
}