oxirs_core/distributed/bft/
state_machine.rs1use super::types::*;
4use crate::model::{BlankNode, Literal, NamedNode, Triple};
5use anyhow::Result;
6use sha2::{Digest, Sha256};
7use std::collections::HashSet;
8
9pub struct RdfStateMachine {
11 triples: HashSet<Triple>,
13
14 operation_count: u64,
16
17 digest_cache: Option<(u64, Vec<u8>)>,
19}
20
21impl RdfStateMachine {
22 pub fn new() -> Self {
24 Self {
25 triples: HashSet::new(),
26 operation_count: 0,
27 digest_cache: None,
28 }
29 }
30
31 pub fn execute(&mut self, operation: RdfOperation) -> Result<OperationResult> {
33 self.operation_count += 1;
34 self.digest_cache = None; match operation {
37 RdfOperation::Insert(triple) => {
38 let t = self.deserialize_triple(triple)?;
39 self.triples.insert(t);
40 Ok(OperationResult::Success)
41 }
42
43 RdfOperation::Remove(triple) => {
44 let t = self.deserialize_triple(triple)?;
45 self.triples.remove(&t);
46 Ok(OperationResult::Success)
47 }
48
49 RdfOperation::BatchInsert(triples) => {
50 for triple in triples {
51 let t = self.deserialize_triple(triple)?;
52 self.triples.insert(t);
53 }
54 Ok(OperationResult::Success)
55 }
56
57 RdfOperation::BatchRemove(triples) => {
58 for triple in triples {
59 let t = self.deserialize_triple(triple)?;
60 self.triples.remove(&t);
61 }
62 Ok(OperationResult::Success)
63 }
64
65 RdfOperation::Query(_query) => {
66 let results: Vec<SerializableTriple> = self
68 .triples
69 .iter()
70 .take(10) .map(|t| self.serialize_triple(t))
72 .collect();
73 Ok(OperationResult::QueryResult(results))
74 }
75 }
76 }
77
78 pub fn get_state_digest(&self) -> Vec<u8> {
80 self.calculate_digest_readonly()
83 }
84
85 fn calculate_digest_readonly(&self) -> Vec<u8> {
87 let mut hasher = Sha256::new();
88
89 let mut sorted_triples: Vec<_> = self.triples.iter().collect();
91 sorted_triples.sort_by_key(|t| {
92 (
93 t.subject().to_string(),
94 t.predicate().to_string(),
95 t.object().to_string(),
96 )
97 });
98
99 for triple in sorted_triples {
100 hasher.update(triple.subject().to_string().as_bytes());
101 hasher.update(triple.predicate().to_string().as_bytes());
102 hasher.update(triple.object().to_string().as_bytes());
103 }
104
105 hasher.update(self.operation_count.to_le_bytes());
106 hasher.finalize().to_vec()
107 }
108
109 pub fn calculate_digest(&mut self) -> Vec<u8> {
111 if let Some((count, digest)) = &self.digest_cache {
113 if *count == self.operation_count {
114 return digest.clone();
115 }
116 }
117
118 let digest = self.calculate_digest_readonly();
120
121 self.digest_cache = Some((self.operation_count, digest.clone()));
123
124 digest
125 }
126
127 pub fn triple_count(&self) -> usize {
129 self.triples.len()
130 }
131
132 pub fn operation_count(&self) -> u64 {
134 self.operation_count
135 }
136
137 pub fn contains_triple(&self, triple: &SerializableTriple) -> Result<bool> {
139 let t = self.deserialize_triple(triple.clone())?;
140 Ok(self.triples.contains(&t))
141 }
142
143 pub fn get_all_triples(&self) -> Vec<SerializableTriple> {
145 self.triples
146 .iter()
147 .map(|t| self.serialize_triple(t))
148 .collect()
149 }
150
151 fn deserialize_triple(&self, st: SerializableTriple) -> Result<Triple> {
153 let subject = NamedNode::new(&st.subject)?;
154 let predicate = NamedNode::new(&st.predicate)?;
155
156 let object = match st.object_type {
157 ObjectType::NamedNode => crate::model::Object::NamedNode(NamedNode::new(&st.object)?),
158 ObjectType::BlankNode => crate::model::Object::BlankNode(BlankNode::new(&st.object)?),
159 ObjectType::Literal { datatype, language } => {
160 if let Some(lang) = language {
161 crate::model::Object::Literal(Literal::new_language_tagged_literal(
162 &st.object, &lang,
163 )?)
164 } else if let Some(dt) = datatype {
165 crate::model::Object::Literal(Literal::new_typed(
166 &st.object,
167 NamedNode::new(&dt)?,
168 ))
169 } else {
170 crate::model::Object::Literal(Literal::new(&st.object))
171 }
172 }
173 };
174
175 Ok(Triple::new(subject, predicate, object))
176 }
177
178 fn serialize_triple(&self, triple: &Triple) -> SerializableTriple {
180 let object_type = match triple.object() {
181 crate::model::Object::NamedNode(_) => ObjectType::NamedNode,
182 crate::model::Object::BlankNode(_) => ObjectType::BlankNode,
183 crate::model::Object::Literal(lit) => ObjectType::Literal {
184 datatype: if lit.datatype().as_str() != "http://www.w3.org/2001/XMLSchema#string" {
185 Some(lit.datatype().as_str().to_string())
186 } else {
187 None
188 },
189 language: lit.language().map(|l| l.to_string()),
190 },
191 _ => ObjectType::NamedNode, };
193
194 SerializableTriple {
195 subject: triple.subject().to_string(),
196 predicate: triple.predicate().to_string(),
197 object: triple.object().to_string(),
198 object_type,
199 }
200 }
201
202 pub fn reset(&mut self) {
204 self.triples.clear();
205 self.operation_count = 0;
206 self.digest_cache = None;
207 }
208
209 pub fn apply_batch(&mut self, operations: Vec<RdfOperation>) -> Result<Vec<OperationResult>> {
211 let mut results = Vec::new();
212 let initial_count = self.operation_count;
213 let initial_cache = self.digest_cache.clone();
214
215 for operation in operations {
217 match self.execute(operation) {
218 Ok(result) => results.push(result),
219 Err(e) => {
220 self.operation_count = initial_count;
222 self.digest_cache = initial_cache;
223 return Err(e);
224 }
225 }
226 }
227
228 Ok(results)
229 }
230}
231
232impl Default for RdfStateMachine {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// Predicate shared by the plain-literal fixtures.
    const PREDICATE: &str = "http://example.org/p";

    /// Builds a triple whose object is a literal with no datatype and no
    /// language tag.
    fn plain_literal(subject: &str, object: &str) -> SerializableTriple {
        SerializableTriple {
            subject: subject.to_string(),
            predicate: PREDICATE.to_string(),
            object: object.to_string(),
            object_type: ObjectType::Literal {
                datatype: None,
                language: None,
            },
        }
    }

    /// Insert then remove a single triple, checking counts and membership.
    #[test]
    fn test_state_machine_basic_operations() {
        let mut machine = RdfStateMachine::new();
        let triple = plain_literal("http://example.org/s", "value");

        let inserted = machine
            .execute(RdfOperation::Insert(triple.clone()))
            .expect("operation should succeed");
        assert!(matches!(inserted, OperationResult::Success));
        assert_eq!(machine.triple_count(), 1);
        assert_eq!(machine.operation_count(), 1);

        let present = machine
            .contains_triple(&triple)
            .expect("contains_triple should succeed");
        assert!(present);

        let removed = machine
            .execute(RdfOperation::Remove(triple.clone()))
            .expect("operation should succeed");
        assert!(matches!(removed, OperationResult::Success));
        assert_eq!(machine.triple_count(), 0);

        let still_present = machine
            .contains_triple(&triple)
            .expect("contains_triple should succeed");
        assert!(!still_present);
    }

    /// Batch insert followed by batch remove of the same two triples.
    #[test]
    fn test_state_machine_batch_operations() {
        let mut machine = RdfStateMachine::new();
        let triples = vec![
            plain_literal("http://example.org/s1", "value1"),
            plain_literal("http://example.org/s2", "value2"),
        ];

        let inserted = machine
            .execute(RdfOperation::BatchInsert(triples.clone()))
            .expect("operation should succeed");
        assert!(matches!(inserted, OperationResult::Success));
        assert_eq!(machine.triple_count(), 2);

        let removed = machine
            .execute(RdfOperation::BatchRemove(triples))
            .expect("operation should succeed");
        assert!(matches!(removed, OperationResult::Success));
        assert_eq!(machine.triple_count(), 0);
    }

    /// The digest changes after an insert, is stable across repeated calls,
    /// and agrees between the caching and read-only entry points.
    #[test]
    fn test_state_machine_digest_calculation() {
        let mut machine = RdfStateMachine::new();
        let triple = plain_literal("http://example.org/s", "value");

        let empty_digest = machine.calculate_digest();

        machine
            .execute(RdfOperation::Insert(triple.clone()))
            .expect("operation should succeed");
        let after_insert = machine.calculate_digest();
        assert_ne!(empty_digest, after_insert);

        let repeated = machine.calculate_digest();
        assert_eq!(after_insert, repeated);

        let read_only = machine.get_state_digest();
        assert_eq!(after_insert, read_only);
    }

    /// A query over fifteen stored triples returns exactly ten results.
    #[test]
    fn test_query_operation() {
        let mut machine = RdfStateMachine::new();

        for i in 0..15 {
            let triple = plain_literal(
                &format!("http://example.org/s{i}"),
                &format!("value{i}"),
            );
            machine
                .execute(RdfOperation::Insert(triple))
                .expect("state machine execution should succeed");
        }

        let query = RdfOperation::Query("SELECT * WHERE { ?s ?p ?o }".to_string());
        let outcome = machine.execute(query).expect("operation should succeed");

        match outcome {
            OperationResult::QueryResult(results) => assert_eq!(results.len(), 10),
            _ => panic!("Expected QueryResult"),
        }
    }

    /// Named-node, blank-node, typed-literal and language-tagged objects are
    /// all accepted and stored as distinct triples.
    #[test]
    fn test_different_object_types() {
        let mut machine = RdfStateMachine::new();

        // (predicate, object, object kind) for each fixture; the subject is shared.
        let fixtures = vec![
            (
                "http://example.org/p",
                "http://example.org/o",
                ObjectType::NamedNode,
            ),
            ("http://example.org/p2", "_:blank1", ObjectType::BlankNode),
            (
                "http://example.org/p3",
                "42",
                ObjectType::Literal {
                    datatype: Some("http://www.w3.org/2001/XMLSchema#integer".to_string()),
                    language: None,
                },
            ),
            (
                "http://example.org/p4",
                "hello",
                ObjectType::Literal {
                    datatype: None,
                    language: Some("en".to_string()),
                },
            ),
        ];

        for (predicate, object, object_type) in fixtures {
            let triple = SerializableTriple {
                subject: "http://example.org/s".to_string(),
                predicate: predicate.to_string(),
                object: object.to_string(),
                object_type,
            };
            machine
                .execute(RdfOperation::Insert(triple))
                .expect("operation should succeed");
        }

        assert_eq!(machine.triple_count(), 4);
    }
}