1use crate::{PatchOperation, RdfPatch, SparqlOperationType, StreamEvent};
11use anyhow::{anyhow, Result};
12use chrono::{DateTime, Utc};
13use regex::Regex;
14use serde::{Deserialize, Serialize};
15use std::collections::HashSet;
16use tracing::{debug, warn};
17
/// Converts SPARQL Update text into [`StreamEvent`] values and RDF patches.
///
/// Configure it with the `with_*` builder methods, then call
/// `compute_delta` or `sparql_to_patch`.
pub struct DeltaComputer {
    /// When `true`, `compute_delta` runs its result through `optimize_events`.
    enable_optimization: bool,
    /// Set by `with_provenance`.
    track_provenance: bool,
    /// Set by `set_context`.
    current_context: Option<String>,
    /// Incremented once for every update operation that is processed.
    operation_counter: u64,
}
25
26impl DeltaComputer {
27 pub fn new() -> Self {
28 Self {
29 enable_optimization: true,
30 track_provenance: false,
31 current_context: None,
32 operation_counter: 0,
33 }
34 }
35
36 pub fn with_optimization(mut self, enabled: bool) -> Self {
37 self.enable_optimization = enabled;
38 self
39 }
40
41 pub fn with_provenance(mut self, enabled: bool) -> Self {
42 self.track_provenance = enabled;
43 self
44 }
45
46 pub fn set_context(&mut self, context: Option<String>) {
47 self.current_context = context;
48 }
49
50 pub fn compute_delta(&mut self, update: &str) -> Result<Vec<StreamEvent>> {
52 let mut events = Vec::new();
53
54 let operations = self.parse_sparql_update(update)?;
56
57 for operation in operations {
58 let mut operation_events = self.process_update_operation(&operation)?;
59 events.append(&mut operation_events);
60 }
61
62 if self.enable_optimization {
63 events = self.optimize_events(events);
64 }
65
66 debug!("Computed {} delta events from SPARQL update", events.len());
67 Ok(events)
68 }
69
70 pub fn sparql_to_patch(&mut self, update: &str) -> Result<RdfPatch> {
72 let events = self.compute_delta(update)?;
73 self.delta_to_patch(&events)
74 }
75
76 pub fn delta_to_patch(&self, events: &[StreamEvent]) -> Result<RdfPatch> {
78 let mut patch = RdfPatch::new();
79
80 let mut _in_transaction = false;
82
83 for event in events {
84 let operation = match event {
85 StreamEvent::TripleAdded {
86 subject,
87 predicate,
88 object,
89 ..
90 } => PatchOperation::Add {
91 subject: subject.clone(),
92 predicate: predicate.clone(),
93 object: object.clone(),
94 },
95 StreamEvent::TripleRemoved {
96 subject,
97 predicate,
98 object,
99 ..
100 } => PatchOperation::Delete {
101 subject: subject.clone(),
102 predicate: predicate.clone(),
103 object: object.clone(),
104 },
105 StreamEvent::QuadAdded {
106 subject,
107 predicate,
108 object,
109 graph,
110 ..
111 } => {
112 if !patch.prefixes.contains_key("g") {
114 patch.add_operation(PatchOperation::AddPrefix {
115 prefix: "g".to_string(),
116 namespace: graph.clone(),
117 });
118 patch.prefixes.insert("g".to_string(), graph.clone());
119 }
120 PatchOperation::Add {
121 subject: subject.clone(),
122 predicate: predicate.clone(),
123 object: object.clone(),
124 }
125 }
126 StreamEvent::QuadRemoved {
127 subject,
128 predicate,
129 object,
130 graph,
131 ..
132 } => {
133 if !patch.prefixes.contains_key("g") {
135 patch.add_operation(PatchOperation::AddPrefix {
136 prefix: "g".to_string(),
137 namespace: graph.clone(),
138 });
139 patch.prefixes.insert("g".to_string(), graph.clone());
140 }
141 PatchOperation::Delete {
142 subject: subject.clone(),
143 predicate: predicate.clone(),
144 object: object.clone(),
145 }
146 }
147 StreamEvent::GraphCreated { graph, .. } => PatchOperation::AddGraph {
148 graph: graph.clone(),
149 },
150 StreamEvent::GraphDeleted { graph, .. } => PatchOperation::DeleteGraph {
151 graph: graph.clone(),
152 },
153 StreamEvent::GraphCleared { graph, .. } => {
154 if let Some(graph_uri) = graph {
155 PatchOperation::DeleteGraph {
156 graph: graph_uri.clone(),
157 }
158 } else {
159 continue;
161 }
162 }
163 StreamEvent::TransactionBegin { transaction_id, .. } => {
164 _in_transaction = true;
165 patch.transaction_id = Some(transaction_id.clone());
166 PatchOperation::TransactionBegin {
167 transaction_id: Some(transaction_id.clone()),
168 }
169 }
170 StreamEvent::TransactionCommit { .. } => {
171 _in_transaction = false;
172 PatchOperation::TransactionCommit
173 }
174 StreamEvent::TransactionAbort { .. } => {
175 _in_transaction = false;
176 PatchOperation::TransactionAbort
177 }
178 StreamEvent::SparqlUpdate { query, .. } => {
179 patch.add_operation(PatchOperation::Header {
181 key: "sparql-source".to_string(),
182 value: query.clone(),
183 });
184 patch
185 .headers
186 .insert("sparql-source".to_string(), query.clone());
187 continue;
188 }
189 StreamEvent::SchemaChanged { .. } | StreamEvent::Heartbeat { .. } => {
190 continue;
192 }
193 _ => {
195 continue;
196 }
197 };
198
199 patch.add_operation(operation);
200 }
201
202 debug!(
203 "Converted {} events to RDF patch with {} operations",
204 events.len(),
205 patch.operations.len()
206 );
207 Ok(patch)
208 }
209
210 fn parse_sparql_update(&mut self, update: &str) -> Result<Vec<UpdateOperation>> {
211 let mut operations = Vec::new();
212 let normalized = self.normalize_sparql(update);
213
214 let statements = self.split_statements(&normalized);
216
217 for statement in statements {
218 if let Some(operation) = self.parse_statement(&statement)? {
219 operations.push(operation);
220 }
221 }
222
223 Ok(operations)
224 }
225
226 fn normalize_sparql(&self, update: &str) -> String {
227 let re = Regex::new(r"\s+").unwrap();
229 re.replace_all(update.trim(), " ").to_string()
230 }
231
232 fn split_statements(&self, update: &str) -> Vec<String> {
233 let mut statements = Vec::new();
235 let mut current = String::new();
236 let mut in_quotes = false;
237 let mut brace_depth = 0;
238 let chars = update.chars().peekable();
239
240 for c in chars {
241 match c {
242 '"' => {
243 in_quotes = !in_quotes;
244 current.push(c);
245 }
246 '{' if !in_quotes => {
247 brace_depth += 1;
248 current.push(c);
249 }
250 '}' if !in_quotes => {
251 brace_depth -= 1;
252 current.push(c);
253 }
254 ';' if !in_quotes && brace_depth == 0 => {
255 if !current.trim().is_empty() {
256 statements.push(current.trim().to_string());
257 current.clear();
258 }
259 }
260 _ => {
261 current.push(c);
262 }
263 }
264 }
265
266 if !current.trim().is_empty() {
267 statements.push(current.trim().to_string());
268 }
269
270 statements
271 }
272
273 fn parse_statement(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
274 let upper = statement.to_uppercase();
275 debug!("Parsing statement: '{}'", statement);
276 debug!("Upper case: '{}'", upper);
277
278 if upper.contains("INSERT DATA") {
279 debug!("Matched INSERT DATA");
280 self.parse_insert_data(statement)
281 } else if upper.contains("DELETE DATA") {
282 debug!("Matched DELETE DATA");
283 self.parse_delete_data(statement)
284 } else if upper.contains("DELETE") && upper.contains("INSERT") {
285 debug!("Matched DELETE/INSERT");
286 self.parse_delete_insert(statement)
287 } else if upper.contains("INSERT") && upper.contains("WHERE") {
288 debug!("Matched INSERT WHERE");
289 self.parse_insert_where(statement)
290 } else if upper.contains("DELETE") && upper.contains("WHERE") {
291 debug!("Matched DELETE WHERE");
292 self.parse_delete_where(statement)
293 } else if upper.contains("CLEAR") {
294 debug!("Matched CLEAR");
295 self.parse_clear(statement)
296 } else if upper.contains("DROP") {
297 debug!("Matched DROP");
298 self.parse_drop(statement)
299 } else if upper.contains("CREATE") {
300 debug!("Matched CREATE");
301 self.parse_create(statement)
302 } else if upper.contains("LOAD") {
303 debug!("Matched LOAD");
304 self.parse_load(statement)
305 } else {
306 warn!("Unknown SPARQL update operation: {}", statement);
307 Ok(None)
308 }
309 }
310
311 fn parse_insert_data(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
312 if let Some(data_block) = self.extract_data_block(statement, "INSERT DATA")? {
313 let triples = self.parse_triples(&data_block)?;
314 Ok(Some(UpdateOperation::InsertData { triples }))
315 } else {
316 Ok(None)
317 }
318 }
319
320 fn parse_delete_data(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
321 if let Some(data_block) = self.extract_data_block(statement, "DELETE DATA")? {
322 let triples = self.parse_triples(&data_block)?;
323 Ok(Some(UpdateOperation::DeleteData { triples }))
324 } else {
325 Ok(None)
326 }
327 }
328
329 fn parse_insert_where(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
330 let insert_block = self.extract_data_block(statement, "INSERT")?;
332 let where_block = self.extract_data_block(statement, "WHERE")?;
333
334 if let (Some(insert), Some(where_clause)) = (insert_block, where_block) {
335 let insert_triples = self.parse_triples(&insert)?;
336 Ok(Some(UpdateOperation::InsertWhere {
337 insert: insert_triples,
338 where_clause,
339 }))
340 } else {
341 Ok(None)
342 }
343 }
344
345 fn parse_delete_where(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
346 if let Some(where_block) = self.extract_data_block(statement, "WHERE")? {
347 let delete_patterns = self.parse_triples(&where_block)?;
348 Ok(Some(UpdateOperation::DeleteWhere {
349 patterns: delete_patterns,
350 }))
351 } else {
352 Ok(None)
353 }
354 }
355
356 fn parse_delete_insert(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
357 let delete_block = self.extract_specific_block(statement, "DELETE")?;
359 let insert_block = self.extract_specific_block(statement, "INSERT")?;
360 let where_block = self.extract_specific_block(statement, "WHERE")?;
361
362 let delete_triples = if let Some(delete) = delete_block {
363 self.parse_triples(&delete)?
364 } else {
365 Vec::new()
366 };
367
368 let insert_triples = if let Some(insert) = insert_block {
369 self.parse_triples(&insert)?
370 } else {
371 Vec::new()
372 };
373
374 debug!(
375 "Parsed DELETE/INSERT: delete={} triples, insert={} triples",
376 delete_triples.len(),
377 insert_triples.len()
378 );
379
380 Ok(Some(UpdateOperation::DeleteInsert {
381 delete: delete_triples,
382 insert: insert_triples,
383 where_clause: where_block,
384 }))
385 }
386
387 fn parse_clear(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
388 let upper = statement.to_uppercase();
389 if upper.contains("CLEAR ALL") {
390 Ok(Some(UpdateOperation::ClearAll))
391 } else if upper.contains("CLEAR DEFAULT") {
392 Ok(Some(UpdateOperation::ClearDefault))
393 } else {
394 let graph = self.extract_graph_uri(statement)?;
396 Ok(Some(UpdateOperation::ClearGraph { graph }))
397 }
398 }
399
400 fn parse_drop(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
401 let graph = self.extract_graph_uri(statement)?;
402 Ok(Some(UpdateOperation::DropGraph { graph }))
403 }
404
405 fn parse_create(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
406 let graph = self.extract_graph_uri(statement)?;
407 Ok(Some(UpdateOperation::CreateGraph { graph }))
408 }
409
410 fn parse_load(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
411 let parts: Vec<&str> = statement.split_whitespace().collect();
413 if parts.len() >= 2 {
414 let url = parts[1].trim_matches('<').trim_matches('>').to_string();
415 let graph = if parts.len() > 3 && parts[2].to_uppercase() == "INTO" {
416 Some(parts[3].trim_matches('<').trim_matches('>').to_string())
417 } else {
418 None
419 };
420 Ok(Some(UpdateOperation::Load { url, graph }))
421 } else {
422 Err(anyhow!("Invalid LOAD statement: {}", statement))
423 }
424 }
425
426 fn extract_specific_block(&self, statement: &str, keyword: &str) -> Result<Option<String>> {
427 let upper = statement.to_uppercase();
429 let keyword_upper = keyword.to_uppercase();
430
431 let pattern = format!(r"{}\s*\{{", regex::escape(&keyword_upper));
433 let re = regex::Regex::new(&pattern)?;
434
435 if let Some(m) = re.find(&upper) {
436 let start_pos = m.start();
437 if let Some(brace_pos_relative) = statement[start_pos..].find('{') {
438 let brace_pos = start_pos + brace_pos_relative;
439
440 let mut brace_count = 1;
442 let mut end_pos = brace_pos + 1;
443 let chars: Vec<char> = statement.chars().collect();
444 let mut in_quotes = false;
445 let mut escape_next = false;
446
447 while end_pos < chars.len() && brace_count > 0 {
448 let c = chars[end_pos];
449
450 if escape_next {
451 escape_next = false;
452 } else {
453 match c {
454 '\\' if in_quotes => escape_next = true,
455 '"' => in_quotes = !in_quotes,
456 '{' if !in_quotes => brace_count += 1,
457 '}' if !in_quotes => {
458 brace_count -= 1;
459 if brace_count == 0 {
460 break;
461 }
462 }
463 _ => {}
464 }
465 }
466 end_pos += 1;
467 }
468
469 if brace_count == 0 {
470 let content = statement[brace_pos + 1..end_pos].trim();
471 return Ok(Some(content.to_string()));
472 }
473 }
474 }
475
476 Ok(None)
477 }
478
479 fn extract_data_block(&self, statement: &str, keyword: &str) -> Result<Option<String>> {
480 let upper = statement.to_uppercase();
481 let keyword_upper = keyword.to_uppercase();
482
483 if let Some(start) = upper.find(&keyword_upper) {
484 let after_keyword = statement[start + keyword.len()..].trim();
485
486 if let Some(open_brace) = after_keyword.find('{') {
487 let from_brace = &after_keyword[open_brace + 1..];
488
489 let mut brace_count = 1;
491 let mut end_pos = 0;
492 let mut in_quotes = false;
493 let mut escape_next = false;
494
495 for (i, c) in from_brace.char_indices() {
496 if escape_next {
497 escape_next = false;
498 } else {
499 match c {
500 '\\' if in_quotes => escape_next = true,
501 '"' => in_quotes = !in_quotes,
502 '{' if !in_quotes => brace_count += 1,
503 '}' if !in_quotes => {
504 brace_count -= 1;
505 if brace_count == 0 {
506 end_pos = i;
507 break;
508 }
509 }
510 _ => {}
511 }
512 }
513 }
514
515 if brace_count == 0 {
516 Ok(Some(from_brace[..end_pos].trim().to_string()))
517 } else {
518 Err(anyhow!("Unmatched braces in {}", keyword))
519 }
520 } else {
521 Ok(None)
522 }
523 } else {
524 Ok(None)
525 }
526 }
527
528 fn extract_graph_uri(&self, statement: &str) -> Result<Option<String>> {
529 let re = Regex::new(r"(?i)GRAPH\s+<([^>]+)>").unwrap();
531 if let Some(captures) = re.captures(statement) {
532 if let Some(uri) = captures.get(1) {
533 let normalized_uri = Self::normalize_uri(uri.as_str());
535 return Ok(Some(normalized_uri));
536 }
537 }
538
539 let upper = statement.to_uppercase();
541 let keywords = ["CLEAR", "DROP", "CREATE"];
542
543 for keyword in &keywords {
544 if let Some(keyword_pos) = upper.find(keyword) {
545 let after_keyword = &statement[keyword_pos + keyword.len()..];
546
547 if let Some(start) = after_keyword.find('<') {
549 if let Some(end) = after_keyword[start..].find('>') {
550 let uri = &after_keyword[start + 1..start + end];
551 let normalized_uri = Self::normalize_uri(uri);
552 return Ok(Some(normalized_uri));
553 }
554 }
555 }
556 }
557
558 Ok(None)
559 }
560
561 fn parse_triples(&self, data: &str) -> Result<Vec<Triple>> {
562 let mut triples = Vec::new();
563
564 let triple_statements = self.split_triple_statements(data);
566
567 for statement in triple_statements {
568 let statement = statement.trim();
569 if statement.is_empty() || statement.starts_with('#') {
570 continue;
571 }
572
573 if let Some(triple) = self.parse_triple_line(statement)? {
575 triples.push(triple);
576 }
577 }
578 Ok(triples)
579 }
580
581 fn split_triple_statements(&self, data: &str) -> Vec<String> {
582 let mut statements = Vec::new();
583 let mut current = String::new();
584 let mut in_quotes = false;
585 let mut in_uri = false;
586
587 for c in data.chars() {
588 match c {
589 '"' => {
590 in_quotes = !in_quotes;
591 current.push(c);
592 }
593 '<' if !in_quotes => {
594 in_uri = true;
595 current.push(c);
596 }
597 '>' if !in_quotes && in_uri => {
598 in_uri = false;
599 current.push(c);
600 }
601 '.' if !in_quotes && !in_uri => {
602 let stmt = current.trim().to_string();
604 if !stmt.is_empty() {
605 statements.push(stmt);
606 }
607 current.clear();
608 }
609 _ => {
610 current.push(c);
611 }
612 }
613 }
614
615 let stmt = current.trim().to_string();
617 if !stmt.is_empty() {
618 statements.push(stmt);
619 }
620
621 statements
622 }
623
624 fn parse_triple_line(&self, line: &str) -> Result<Option<Triple>> {
625 let mut parts = Vec::new();
626 let mut current_part = String::new();
627 let mut in_quotes = false;
628 let mut in_uri = false;
629 let mut escape_next = false;
630 let mut chars = line.chars().peekable();
631
632 while let Some(c) = chars.next() {
633 if escape_next {
634 current_part.push(c);
635 escape_next = false;
636 continue;
637 }
638
639 match c {
640 '\\' if in_quotes => {
641 escape_next = true;
642 current_part.push(c);
643 }
644 '"' => {
645 in_quotes = !in_quotes;
646 current_part.push(c);
647 }
648 '<' if !in_quotes => {
649 in_uri = true;
650 current_part.push(c);
651 }
652 '>' if !in_quotes && in_uri => {
653 in_uri = false;
654 current_part.push(c);
655 }
656 ' ' | '\t' if !in_quotes && !in_uri => {
657 if !current_part.is_empty() {
659 parts.push(current_part.trim().to_string());
660 current_part.clear();
661 }
662 while let Some(&next_c) = chars.peek() {
664 if next_c == ' ' || next_c == '\t' {
665 chars.next();
666 } else {
667 break;
668 }
669 }
670 }
671 _ => {
672 current_part.push(c);
673 }
674 }
675 }
676
677 if !current_part.is_empty() {
679 parts.push(current_part.trim().to_string());
680 }
681
682 if parts.len() >= 3 {
683 let subject = Self::expand_term(&parts[0]);
684 let predicate = Self::expand_term(&parts[1]);
685 let object = if parts.len() > 3 {
686 let joined = parts[2..].join(" ");
688 Self::expand_term(&joined)
689 } else {
690 Self::expand_term(&parts[2])
691 };
692
693 return Ok(Some(Triple {
694 subject,
695 predicate,
696 object,
697 }));
698 }
699
700 Ok(None)
701 }
702
703 fn expand_term(term: &str) -> String {
705 if term.starts_with('<') && term.ends_with('>') {
706 let uri = &term[1..term.len() - 1];
708 Self::normalize_uri(uri)
709 } else if term.starts_with('"') {
710 term.to_string()
712 } else if term.starts_with('_') {
713 term.to_string()
715 } else if term.contains("://") {
716 Self::normalize_uri(term)
718 } else {
719 term.to_string()
721 }
722 }
723
724 fn normalize_uri(uri: &str) -> String {
726 if uri.starts_with("http://")
728 || uri.starts_with("https://")
729 || uri.starts_with("HTTP://")
730 || uri.starts_with("HTTPS://")
731 {
732 let scheme_end = if uri.len() >= 8 && uri[..8].to_lowercase() == "https://" {
734 8
735 } else {
736 7
737 };
738
739 if let Some(path_start) = uri[scheme_end..].find('/') {
740 let scheme_and_domain = &uri[..scheme_end + path_start];
741 let path = &uri[scheme_end + path_start..];
742 format!("{}{}", scheme_and_domain.to_lowercase(), path)
743 } else {
744 uri.to_lowercase()
747 }
748 } else {
749 uri.to_string()
751 }
752 }
753
754 fn process_update_operation(
755 &mut self,
756 operation: &UpdateOperation,
757 ) -> Result<Vec<StreamEvent>> {
758 let mut events = Vec::new();
759 self.operation_counter += 1;
760
761 match operation {
762 UpdateOperation::InsertData { triples } => {
763 for triple in triples {
764 events.push(StreamEvent::TripleAdded {
765 subject: triple.subject.clone(),
766 predicate: triple.predicate.clone(),
767 object: triple.object.clone(),
768 graph: None,
769 metadata: Default::default(),
770 });
771 }
772 }
773 UpdateOperation::DeleteData { triples } => {
774 for triple in triples {
775 events.push(StreamEvent::TripleRemoved {
776 subject: triple.subject.clone(),
777 predicate: triple.predicate.clone(),
778 object: triple.object.clone(),
779 graph: None,
780 metadata: Default::default(),
781 });
782 }
783 }
784 UpdateOperation::InsertWhere { insert, .. } => {
785 for triple in insert {
788 events.push(StreamEvent::TripleAdded {
789 subject: triple.subject.clone(),
790 predicate: triple.predicate.clone(),
791 object: triple.object.clone(),
792 graph: None,
793 metadata: Default::default(),
794 });
795 }
796 }
797 UpdateOperation::DeleteWhere { patterns } => {
798 for pattern in patterns {
800 events.push(StreamEvent::TripleRemoved {
801 subject: pattern.subject.clone(),
802 predicate: pattern.predicate.clone(),
803 object: pattern.object.clone(),
804 graph: None,
805 metadata: Default::default(),
806 });
807 }
808 }
809 UpdateOperation::DeleteInsert { delete, insert, .. } => {
810 for triple in delete {
812 events.push(StreamEvent::TripleRemoved {
813 subject: triple.subject.clone(),
814 predicate: triple.predicate.clone(),
815 object: triple.object.clone(),
816 graph: None,
817 metadata: Default::default(),
818 });
819 }
820 for triple in insert {
821 events.push(StreamEvent::TripleAdded {
822 subject: triple.subject.clone(),
823 predicate: triple.predicate.clone(),
824 object: triple.object.clone(),
825 graph: None,
826 metadata: Default::default(),
827 });
828 }
829 }
830 UpdateOperation::ClearAll => {
831 events.push(StreamEvent::GraphCleared {
832 graph: None,
833 metadata: Default::default(),
834 });
835 }
836 UpdateOperation::ClearDefault => {
837 events.push(StreamEvent::GraphCleared {
838 graph: None,
839 metadata: Default::default(),
840 });
841 }
842 UpdateOperation::ClearGraph { graph } => {
843 events.push(StreamEvent::GraphCleared {
844 graph: graph.clone(),
845 metadata: Default::default(),
846 });
847 }
848 UpdateOperation::DropGraph { graph } => {
849 events.push(StreamEvent::GraphCleared {
850 graph: graph.clone(),
851 metadata: Default::default(),
852 });
853 }
854 UpdateOperation::CreateGraph { .. } => {
855 }
857 UpdateOperation::Load { .. } => {
858 events.push(StreamEvent::SparqlUpdate {
861 query: format!("Operation #{}: {:?}", self.operation_counter, operation),
862 operation_type: SparqlOperationType::Load,
863 metadata: Default::default(),
864 });
865 }
866 }
867
868 Ok(events)
869 }
870
871 fn optimize_events(&self, events: Vec<StreamEvent>) -> Vec<StreamEvent> {
872 let mut optimized = Vec::new();
874 let mut seen_operations = HashSet::new();
875 let original_count = events.len();
876
877 for event in events {
878 let event_key = match &event {
879 StreamEvent::TripleAdded {
880 subject,
881 predicate,
882 object,
883 ..
884 }
885 | StreamEvent::TripleRemoved {
886 subject,
887 predicate,
888 object,
889 ..
890 } => {
891 format!("{subject}|{predicate}|{object}")
892 }
893 StreamEvent::GraphCleared { graph, .. } => {
894 format!("graph_clear|{graph:?}")
895 }
896 StreamEvent::SparqlUpdate { query, .. } => {
897 format!("sparql|{query}")
898 }
899 _ => {
900 format!("other|{}", uuid::Uuid::new_v4())
902 }
903 };
904
905 if !seen_operations.contains(&event_key) {
906 seen_operations.insert(event_key);
907 optimized.push(event);
908 }
909 }
910
911 debug!("Optimized {} events to {}", original_count, optimized.len());
912 optimized
913 }
914}
915
916impl Default for DeltaComputer {
917 fn default() -> Self {
918 Self::new()
919 }
920}
921
922#[derive(Debug, Clone)]
924enum UpdateOperation {
925 InsertData {
926 triples: Vec<Triple>,
927 },
928 DeleteData {
929 triples: Vec<Triple>,
930 },
931 InsertWhere {
932 insert: Vec<Triple>,
933 where_clause: String,
934 },
935 DeleteWhere {
936 patterns: Vec<Triple>,
937 },
938 DeleteInsert {
939 delete: Vec<Triple>,
940 insert: Vec<Triple>,
941 where_clause: Option<String>,
942 },
943 ClearAll,
944 ClearDefault,
945 ClearGraph {
946 graph: Option<String>,
947 },
948 DropGraph {
949 graph: Option<String>,
950 },
951 CreateGraph {
952 graph: Option<String>,
953 },
954 Load {
955 url: String,
956 graph: Option<String>,
957 },
958}
959
960#[derive(Debug, Clone, Serialize, Deserialize)]
962struct Triple {
963 subject: String,
964 predicate: String,
965 object: String,
966}
967
968pub struct DeltaProcessor {
970 computer: DeltaComputer,
971 buffer: Vec<StreamEvent>,
972 batch_size: usize,
973 max_buffer_age: chrono::Duration,
974 last_flush: DateTime<Utc>,
975 stats: ProcessorStats,
976}
977
978#[derive(Debug, Default)]
979pub struct ProcessorStats {
980 updates_processed: u64,
981 events_generated: u64,
982 batches_sent: u64,
983 last_activity: Option<DateTime<Utc>>,
984}
985
986impl DeltaProcessor {
987 pub fn new() -> Self {
988 Self {
989 computer: DeltaComputer::new(),
990 buffer: Vec::new(),
991 batch_size: 100,
992 max_buffer_age: chrono::Duration::seconds(30),
993 last_flush: Utc::now(),
994 stats: ProcessorStats::default(),
995 }
996 }
997
998 pub fn with_optimization(mut self, enabled: bool) -> Self {
999 self.computer = self.computer.with_optimization(enabled);
1000 self
1001 }
1002
1003 pub fn with_batch_size(mut self, size: usize) -> Self {
1004 self.batch_size = size;
1005 self
1006 }
1007
1008 pub fn with_max_buffer_age(mut self, duration: chrono::Duration) -> Self {
1009 self.max_buffer_age = duration;
1010 self
1011 }
1012
1013 pub async fn process_update(&mut self, update: &str) -> Result<Vec<StreamEvent>> {
1015 let events = self.computer.compute_delta(update)?;
1016
1017 self.stats.updates_processed += 1;
1018 self.stats.events_generated += events.len() as u64;
1019 self.stats.last_activity = Some(Utc::now());
1020
1021 for event in &events {
1023 self.buffer.push(event.clone());
1024 }
1025
1026 let should_flush = self.buffer.len() >= self.batch_size
1028 || Utc::now() - self.last_flush > self.max_buffer_age;
1029
1030 if should_flush {
1031 self.flush();
1033 }
1034
1035 Ok(events)
1037 }
1038
1039 pub fn flush(&mut self) -> Vec<StreamEvent> {
1041 let events = self.buffer.clone();
1042 self.buffer.clear();
1043 self.last_flush = Utc::now();
1044
1045 if !events.is_empty() {
1046 self.stats.batches_sent += 1;
1047 debug!("Flushed {} buffered events", events.len());
1048 }
1049
1050 events
1051 }
1052
1053 pub fn get_stats(&self) -> &ProcessorStats {
1055 &self.stats
1056 }
1057
1058 pub async fn updates_to_patch(&mut self, updates: &[String]) -> Result<RdfPatch> {
1060 let mut all_events = Vec::new();
1061
1062 for update in updates {
1063 let events = self.computer.compute_delta(update)?;
1064 all_events.extend(events);
1065 }
1066
1067 self.computer.delta_to_patch(&all_events)
1068 }
1069}
1070
1071impl Default for DeltaProcessor {
1072 fn default() -> Self {
1073 Self::new()
1074 }
1075}
1076
1077pub struct BatchDeltaProcessor {
1079 processors: Vec<DeltaProcessor>,
1080 current_processor: usize,
1081 round_robin: bool,
1082}
1083
1084impl BatchDeltaProcessor {
1085 pub fn new(num_processors: usize) -> Self {
1086 let mut processors = Vec::new();
1087 for _ in 0..num_processors {
1088 processors.push(DeltaProcessor::new());
1089 }
1090
1091 Self {
1092 processors,
1093 current_processor: 0,
1094 round_robin: true,
1095 }
1096 }
1097
1098 pub async fn process_updates(&mut self, updates: &[String]) -> Result<Vec<StreamEvent>> {
1099 let mut all_events = Vec::new();
1100
1101 for update in updates {
1102 let processor_idx = if self.round_robin {
1103 let idx = self.current_processor;
1104 self.current_processor = (self.current_processor + 1) % self.processors.len();
1105 idx
1106 } else {
1107 0 };
1109
1110 let events = self.processors[processor_idx]
1111 .process_update(update)
1112 .await?;
1113 all_events.extend(events);
1114 }
1115
1116 Ok(all_events)
1117 }
1118
1119 pub fn flush_all(&mut self) -> Vec<StreamEvent> {
1120 let mut all_events = Vec::new();
1121 for processor in &mut self.processors {
1122 let events = processor.flush();
1123 all_events.extend(events);
1124 }
1125 all_events
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use super::*;
1132 use crate::{EventMetadata, StreamEvent};
1133 use std::collections::HashMap;
1134
1135 #[test]
1136 fn test_sparql_parsing() {
1137 let mut computer = DeltaComputer::new().with_optimization(false);
1138
1139 let update = r#"
1140 INSERT DATA {
1141 <http://example.org/person1> <http://example.org/name> "John Doe" .
1142 <http://example.org/person1> <http://example.org/age> "30" .
1143 }
1144 "#;
1145
1146 let events = computer.compute_delta(update).unwrap();
1147 assert_eq!(events.len(), 2);
1148
1149 match &events[0] {
1150 StreamEvent::TripleAdded {
1151 subject,
1152 predicate,
1153 object,
1154 ..
1155 } => {
1156 assert_eq!(subject, "http://example.org/person1");
1157 assert_eq!(predicate, "http://example.org/name");
1158 assert_eq!(object, "\"John Doe\"");
1159 }
1160 _ => panic!("Expected TripleAdded event"),
1161 }
1162 }
1163
1164 #[test]
1165 fn test_delete_data_parsing() {
1166 let mut computer = DeltaComputer::new();
1167
1168 let update = r#"
1169 DELETE DATA {
1170 <http://example.org/person1> <http://example.org/name> "John Doe" .
1171 }
1172 "#;
1173
1174 let events = computer.compute_delta(update).unwrap();
1175 assert_eq!(events.len(), 1);
1176
1177 match &events[0] {
1178 StreamEvent::TripleRemoved {
1179 subject,
1180 predicate,
1181 object,
1182 ..
1183 } => {
1184 assert_eq!(subject, "http://example.org/person1");
1185 assert_eq!(predicate, "http://example.org/name");
1186 assert_eq!(object, "\"John Doe\"");
1187 }
1188 _ => panic!("Expected TripleRemoved event"),
1189 }
1190 }
1191
1192 #[test]
1193 fn test_clear_graph() {
1194 let mut computer = DeltaComputer::new();
1195
1196 let update = "CLEAR GRAPH <http://example.org/graph>";
1197 let events = computer.compute_delta(update).unwrap();
1198 assert_eq!(events.len(), 1);
1199
1200 match &events[0] {
1201 StreamEvent::GraphCleared { graph, .. } => {
1202 assert_eq!(graph, &Some("http://example.org/graph".to_string()));
1203 }
1204 _ => panic!("Expected GraphCleared event"),
1205 }
1206 }
1207
1208 #[test]
1209 fn test_delete_insert() {
1210 let mut computer = DeltaComputer::new().with_optimization(false);
1211
1212 let update = r#"
1213 DELETE {
1214 <http://example.org/person1> <http://example.org/age> "30" .
1215 }
1216 INSERT {
1217 <http://example.org/person1> <http://example.org/age> "31" .
1218 }
1219 WHERE {
1220 <http://example.org/person1> <http://example.org/age> "30" .
1221 }
1222 "#;
1223
1224 let events = computer.compute_delta(update).unwrap();
1225 assert_eq!(events.len(), 2);
1226
1227 match &events[0] {
1229 StreamEvent::TripleRemoved {
1230 subject,
1231 predicate,
1232 object,
1233 ..
1234 } => {
1235 assert_eq!(subject, "http://example.org/person1");
1236 assert_eq!(predicate, "http://example.org/age");
1237 assert_eq!(object, "\"30\"");
1238 }
1239 _ => panic!("Expected TripleRemoved event"),
1240 }
1241
1242 match &events[1] {
1244 StreamEvent::TripleAdded {
1245 subject,
1246 predicate,
1247 object,
1248 ..
1249 } => {
1250 assert_eq!(subject, "http://example.org/person1");
1251 assert_eq!(predicate, "http://example.org/age");
1252 assert_eq!(object, "\"31\"");
1253 }
1254 _ => panic!("Expected TripleAdded event"),
1255 }
1256 }
1257
/// Checks that `delta_to_patch` turns a `TripleAdded` event into a
/// `PatchOperation::Add` and a `TripleRemoved` event into a
/// `PatchOperation::Delete`, in input order, carrying over the
/// subject/predicate/object strings unchanged.
#[test]
fn test_delta_to_patch() {
    // The two fixtures share every metadata field except `event_id`.
    let metadata_for = |id: &str| EventMetadata {
        event_id: id.to_string(),
        timestamp: Utc::now(),
        source: "test".to_string(),
        user: None,
        context: None,
        caused_by: None,
        version: "1.0".to_string(),
        properties: HashMap::new(),
        checksum: None,
    };

    let added = StreamEvent::TripleAdded {
        subject: "http://example.org/s".to_string(),
        predicate: "http://example.org/p".to_string(),
        object: "http://example.org/o".to_string(),
        graph: None,
        metadata: metadata_for("test"),
    };
    let removed = StreamEvent::TripleRemoved {
        subject: "http://example.org/s2".to_string(),
        predicate: "http://example.org/p2".to_string(),
        object: "http://example.org/o2".to_string(),
        graph: None,
        metadata: metadata_for("test2"),
    };
    let events = vec![added, removed];

    let computer = DeltaComputer::new();
    let patch = computer.delta_to_patch(&events).unwrap();
    assert_eq!(patch.operations.len(), 2);

    // First operation must mirror the added triple.
    if let PatchOperation::Add {
        subject,
        predicate,
        object,
    } = &patch.operations[0]
    {
        assert_eq!(subject, "http://example.org/s");
        assert_eq!(predicate, "http://example.org/p");
        assert_eq!(object, "http://example.org/o");
    } else {
        panic!("Expected Add operation");
    }

    // Second operation must mirror the removed triple.
    if let PatchOperation::Delete {
        subject,
        predicate,
        object,
    } = &patch.operations[1]
    {
        assert_eq!(subject, "http://example.org/s2");
        assert_eq!(predicate, "http://example.org/p2");
        assert_eq!(object, "http://example.org/o2");
    } else {
        panic!("Expected Delete operation");
    }
}
1328
1329 #[tokio::test]
1330 async fn test_delta_processor() {
1331 let mut processor = DeltaProcessor::new().with_batch_size(2);
1332
1333 let update1 = r#"
1334 INSERT DATA {
1335 <http://example.org/person1> <http://example.org/name> "John" .
1336 }
1337 "#;
1338
1339 let events1 = processor.process_update(update1).await.unwrap();
1340 assert_eq!(events1.len(), 1);
1342
1343 let update2 = r#"
1344 INSERT DATA {
1345 <http://example.org/person2> <http://example.org/name> "Jane" .
1346 }
1347 "#;
1348
1349 let events2 = processor.process_update(update2).await.unwrap();
1350 assert_eq!(events2.len(), 1);
1352
1353 let stats = processor.get_stats();
1354 assert_eq!(stats.updates_processed, 2);
1355 assert_eq!(stats.events_generated, 2);
1356 assert!(stats.last_activity.is_some());
1357 }
1358
1359 #[tokio::test]
1360 async fn test_batch_processor() {
1361 let mut batch_processor = BatchDeltaProcessor::new(2);
1362
1363 let updates = vec![
1364 r#"INSERT DATA { <http://example.org/p1> <http://example.org/name> "Person1" . }"#
1365 .to_string(),
1366 r#"INSERT DATA { <http://example.org/p2> <http://example.org/name> "Person2" . }"#
1367 .to_string(),
1368 r#"DELETE DATA { <http://example.org/p1> <http://example.org/old> "value" . }"#
1369 .to_string(),
1370 ];
1371
1372 let events = batch_processor.process_updates(&updates).await.unwrap();
1373 assert_eq!(events.len(), 3);
1374
1375 let add_count = events
1377 .iter()
1378 .filter(|e| matches!(e, StreamEvent::TripleAdded { .. }))
1379 .count();
1380 let remove_count = events
1381 .iter()
1382 .filter(|e| matches!(e, StreamEvent::TripleRemoved { .. }))
1383 .count();
1384
1385 assert_eq!(add_count, 2);
1386 assert_eq!(remove_count, 1);
1387 }
1388
1389 #[tokio::test]
1390 async fn test_updates_to_patch() {
1391 let mut processor = DeltaProcessor::new();
1392
1393 let updates = vec![
1394 r#"INSERT DATA { <http://example.org/s> <http://example.org/p> "value1" . }"#
1395 .to_string(),
1396 r#"DELETE DATA { <http://example.org/s> <http://example.org/p> "value1" . }"#
1397 .to_string(),
1398 r#"INSERT DATA { <http://example.org/s> <http://example.org/p> "value2" . }"#
1399 .to_string(),
1400 ];
1401
1402 let patch = processor.updates_to_patch(&updates).await.unwrap();
1403 assert_eq!(patch.operations.len(), 3);
1404
1405 assert!(matches!(patch.operations[0], PatchOperation::Add { .. }));
1407 assert!(matches!(patch.operations[1], PatchOperation::Delete { .. }));
1408 assert!(matches!(patch.operations[2], PatchOperation::Add { .. }));
1409 }
1410
/// Checks that `split_statements` breaks a `;`-separated update string into
/// three statements, each containing the expected operation keyword at its
/// position.
#[test]
fn test_statement_splitting() {
    let input = r#"
        INSERT DATA { <s1> <p1> "o1" . };
        DELETE DATA { <s2> <p2> "o2" . };
        CLEAR GRAPH <g1>
    "#;

    let computer = DeltaComputer::new();
    let statements = computer.split_statements(input);
    assert_eq!(statements.len(), 3);

    // Keyword expected in the statement at the same index.
    let expected_keywords = ["INSERT DATA", "DELETE DATA", "CLEAR GRAPH"];
    for (idx, keyword) in expected_keywords.iter().enumerate() {
        assert!(statements[idx].contains(*keyword));
    }
}
1427
/// Checks that `parse_triples` reads two triples from the input: IRIs come
/// back without their angle brackets, while the string literal keeps its
/// surrounding double quotes.
#[test]
fn test_triple_parsing() {
    let data = r#"
        <http://example.org/subject> <http://example.org/predicate> "Object literal" .
        <http://example.org/s2> <http://example.org/p2> <http://example.org/o2> .
    "#;

    // Expected (subject, predicate, object) for each parsed triple, in order.
    let expected = [
        (
            "http://example.org/subject",
            "http://example.org/predicate",
            "\"Object literal\"",
        ),
        (
            "http://example.org/s2",
            "http://example.org/p2",
            "http://example.org/o2",
        ),
    ];

    let computer = DeltaComputer::new();
    let triples = computer.parse_triples(data).unwrap();
    assert_eq!(triples.len(), 2);

    for (idx, (subject, predicate, object)) in expected.iter().enumerate() {
        assert_eq!(triples[idx].subject, *subject);
        assert_eq!(triples[idx].predicate, *predicate);
        assert_eq!(triples[idx].object, *object);
    }
}
1448
/// Checks that `optimize_events` reduces two `TripleAdded` events for the
/// same triple (differing only in event id and timestamp) to a single event.
#[test]
fn test_optimization() {
    // Same triple twice; only the metadata's `event_id` and timestamp vary.
    let events: Vec<StreamEvent> = ["1", "2"]
        .iter()
        .map(|id| StreamEvent::TripleAdded {
            subject: "s".to_string(),
            predicate: "p".to_string(),
            object: "o".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: id.to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        })
        .collect();

    let computer = DeltaComputer::new().with_optimization(true);
    let optimized = computer.optimize_events(events);
    assert_eq!(optimized.len(), 1);
}
1494}