oxirs_stream/
delta.rs

1//! # SPARQL Update Delta Support
2//!
3//! Delta computation and streaming for SPARQL Updates.
4//!
5//! This module provides sophisticated delta computation for SPARQL Updates,
6//! converting update operations into fine-grained change events and RDF Patches.
7//! It supports tracking changes at the triple level and provides efficient
8//! streaming of update operations.
9
10use crate::{PatchOperation, RdfPatch, SparqlOperationType, StreamEvent};
11use anyhow::{anyhow, Result};
12use chrono::{DateTime, Utc};
13use regex::Regex;
14use serde::{Deserialize, Serialize};
15use std::collections::HashSet;
16use tracing::{debug, warn};
17
/// Computes fine-grained deltas for SPARQL Update requests.
///
/// Create one with [`DeltaComputer::new`] and tune it with the `with_*`
/// builder methods.
pub struct DeltaComputer {
    /// When `true`, `compute_delta` passes its events through `optimize_events`.
    enable_optimization: bool,
    /// Provenance-tracking switch, set by `with_provenance`.
    track_provenance: bool,
    /// Optional context string, set by `set_context`.
    current_context: Option<String>,
    /// Incremented once for every update operation that is processed.
    operation_counter: u64,
}
25
26impl DeltaComputer {
27    pub fn new() -> Self {
28        Self {
29            enable_optimization: true,
30            track_provenance: false,
31            current_context: None,
32            operation_counter: 0,
33        }
34    }
35
36    pub fn with_optimization(mut self, enabled: bool) -> Self {
37        self.enable_optimization = enabled;
38        self
39    }
40
41    pub fn with_provenance(mut self, enabled: bool) -> Self {
42        self.track_provenance = enabled;
43        self
44    }
45
46    pub fn set_context(&mut self, context: Option<String>) {
47        self.current_context = context;
48    }
49
50    /// Compute delta from SPARQL Update
51    pub fn compute_delta(&mut self, update: &str) -> Result<Vec<StreamEvent>> {
52        let mut events = Vec::new();
53
54        // Parse the SPARQL update to identify operations
55        let operations = self.parse_sparql_update(update)?;
56
57        for operation in operations {
58            let mut operation_events = self.process_update_operation(&operation)?;
59            events.append(&mut operation_events);
60        }
61
62        if self.enable_optimization {
63            events = self.optimize_events(events);
64        }
65
66        debug!("Computed {} delta events from SPARQL update", events.len());
67        Ok(events)
68    }
69
70    /// Convert SPARQL Update directly to RDF Patch
71    pub fn sparql_to_patch(&mut self, update: &str) -> Result<RdfPatch> {
72        let events = self.compute_delta(update)?;
73        self.delta_to_patch(&events)
74    }
75
76    /// Convert delta to RDF Patch
77    pub fn delta_to_patch(&self, events: &[StreamEvent]) -> Result<RdfPatch> {
78        let mut patch = RdfPatch::new();
79
80        // Track if we're in a transaction
81        let mut _in_transaction = false;
82
83        for event in events {
84            let operation = match event {
85                StreamEvent::TripleAdded {
86                    subject,
87                    predicate,
88                    object,
89                    ..
90                } => PatchOperation::Add {
91                    subject: subject.clone(),
92                    predicate: predicate.clone(),
93                    object: object.clone(),
94                },
95                StreamEvent::TripleRemoved {
96                    subject,
97                    predicate,
98                    object,
99                    ..
100                } => PatchOperation::Delete {
101                    subject: subject.clone(),
102                    predicate: predicate.clone(),
103                    object: object.clone(),
104                },
105                StreamEvent::QuadAdded {
106                    subject,
107                    predicate,
108                    object,
109                    graph,
110                    ..
111                } => {
112                    // Add graph to patch prefixes if needed
113                    if !patch.prefixes.contains_key("g") {
114                        patch.add_operation(PatchOperation::AddPrefix {
115                            prefix: "g".to_string(),
116                            namespace: graph.clone(),
117                        });
118                        patch.prefixes.insert("g".to_string(), graph.clone());
119                    }
120                    PatchOperation::Add {
121                        subject: subject.clone(),
122                        predicate: predicate.clone(),
123                        object: object.clone(),
124                    }
125                }
126                StreamEvent::QuadRemoved {
127                    subject,
128                    predicate,
129                    object,
130                    graph,
131                    ..
132                } => {
133                    // Add graph to patch prefixes if needed
134                    if !patch.prefixes.contains_key("g") {
135                        patch.add_operation(PatchOperation::AddPrefix {
136                            prefix: "g".to_string(),
137                            namespace: graph.clone(),
138                        });
139                        patch.prefixes.insert("g".to_string(), graph.clone());
140                    }
141                    PatchOperation::Delete {
142                        subject: subject.clone(),
143                        predicate: predicate.clone(),
144                        object: object.clone(),
145                    }
146                }
147                StreamEvent::GraphCreated { graph, .. } => PatchOperation::AddGraph {
148                    graph: graph.clone(),
149                },
150                StreamEvent::GraphDeleted { graph, .. } => PatchOperation::DeleteGraph {
151                    graph: graph.clone(),
152                },
153                StreamEvent::GraphCleared { graph, .. } => {
154                    if let Some(graph_uri) = graph {
155                        PatchOperation::DeleteGraph {
156                            graph: graph_uri.clone(),
157                        }
158                    } else {
159                        // Default graph clear - we'll represent this as a special operation
160                        continue;
161                    }
162                }
163                StreamEvent::TransactionBegin { transaction_id, .. } => {
164                    _in_transaction = true;
165                    patch.transaction_id = Some(transaction_id.clone());
166                    PatchOperation::TransactionBegin {
167                        transaction_id: Some(transaction_id.clone()),
168                    }
169                }
170                StreamEvent::TransactionCommit { .. } => {
171                    _in_transaction = false;
172                    PatchOperation::TransactionCommit
173                }
174                StreamEvent::TransactionAbort { .. } => {
175                    _in_transaction = false;
176                    PatchOperation::TransactionAbort
177                }
178                StreamEvent::SparqlUpdate { query, .. } => {
179                    // Add SPARQL query as a header for provenance
180                    patch.add_operation(PatchOperation::Header {
181                        key: "sparql-source".to_string(),
182                        value: query.clone(),
183                    });
184                    patch
185                        .headers
186                        .insert("sparql-source".to_string(), query.clone());
187                    continue;
188                }
189                StreamEvent::SchemaChanged { .. } | StreamEvent::Heartbeat { .. } => {
190                    // These events don't translate to patch operations
191                    continue;
192                }
193                // Catch-all for remaining variants that don't translate to patch operations
194                _ => {
195                    continue;
196                }
197            };
198
199            patch.add_operation(operation);
200        }
201
202        debug!(
203            "Converted {} events to RDF patch with {} operations",
204            events.len(),
205            patch.operations.len()
206        );
207        Ok(patch)
208    }
209
210    fn parse_sparql_update(&mut self, update: &str) -> Result<Vec<UpdateOperation>> {
211        let mut operations = Vec::new();
212        let normalized = self.normalize_sparql(update);
213
214        // Split into individual operations (simplified)
215        let statements = self.split_statements(&normalized);
216
217        for statement in statements {
218            if let Some(operation) = self.parse_statement(&statement)? {
219                operations.push(operation);
220            }
221        }
222
223        Ok(operations)
224    }
225
226    fn normalize_sparql(&self, update: &str) -> String {
227        // Basic normalization - remove extra whitespace, normalize line endings
228        let re = Regex::new(r"\s+").unwrap();
229        re.replace_all(update.trim(), " ").to_string()
230    }
231
232    fn split_statements(&self, update: &str) -> Vec<String> {
233        // Split on semicolons that aren't inside quotes or braces
234        let mut statements = Vec::new();
235        let mut current = String::new();
236        let mut in_quotes = false;
237        let mut brace_depth = 0;
238        let chars = update.chars().peekable();
239
240        for c in chars {
241            match c {
242                '"' => {
243                    in_quotes = !in_quotes;
244                    current.push(c);
245                }
246                '{' if !in_quotes => {
247                    brace_depth += 1;
248                    current.push(c);
249                }
250                '}' if !in_quotes => {
251                    brace_depth -= 1;
252                    current.push(c);
253                }
254                ';' if !in_quotes && brace_depth == 0 => {
255                    if !current.trim().is_empty() {
256                        statements.push(current.trim().to_string());
257                        current.clear();
258                    }
259                }
260                _ => {
261                    current.push(c);
262                }
263            }
264        }
265
266        if !current.trim().is_empty() {
267            statements.push(current.trim().to_string());
268        }
269
270        statements
271    }
272
273    fn parse_statement(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
274        let upper = statement.to_uppercase();
275        debug!("Parsing statement: '{}'", statement);
276        debug!("Upper case: '{}'", upper);
277
278        if upper.contains("INSERT DATA") {
279            debug!("Matched INSERT DATA");
280            self.parse_insert_data(statement)
281        } else if upper.contains("DELETE DATA") {
282            debug!("Matched DELETE DATA");
283            self.parse_delete_data(statement)
284        } else if upper.contains("DELETE") && upper.contains("INSERT") {
285            debug!("Matched DELETE/INSERT");
286            self.parse_delete_insert(statement)
287        } else if upper.contains("INSERT") && upper.contains("WHERE") {
288            debug!("Matched INSERT WHERE");
289            self.parse_insert_where(statement)
290        } else if upper.contains("DELETE") && upper.contains("WHERE") {
291            debug!("Matched DELETE WHERE");
292            self.parse_delete_where(statement)
293        } else if upper.contains("CLEAR") {
294            debug!("Matched CLEAR");
295            self.parse_clear(statement)
296        } else if upper.contains("DROP") {
297            debug!("Matched DROP");
298            self.parse_drop(statement)
299        } else if upper.contains("CREATE") {
300            debug!("Matched CREATE");
301            self.parse_create(statement)
302        } else if upper.contains("LOAD") {
303            debug!("Matched LOAD");
304            self.parse_load(statement)
305        } else {
306            warn!("Unknown SPARQL update operation: {}", statement);
307            Ok(None)
308        }
309    }
310
311    fn parse_insert_data(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
312        if let Some(data_block) = self.extract_data_block(statement, "INSERT DATA")? {
313            let triples = self.parse_triples(&data_block)?;
314            Ok(Some(UpdateOperation::InsertData { triples }))
315        } else {
316            Ok(None)
317        }
318    }
319
320    fn parse_delete_data(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
321        if let Some(data_block) = self.extract_data_block(statement, "DELETE DATA")? {
322            let triples = self.parse_triples(&data_block)?;
323            Ok(Some(UpdateOperation::DeleteData { triples }))
324        } else {
325            Ok(None)
326        }
327    }
328
329    fn parse_insert_where(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
330        // Simplified parsing for INSERT ... WHERE
331        let insert_block = self.extract_data_block(statement, "INSERT")?;
332        let where_block = self.extract_data_block(statement, "WHERE")?;
333
334        if let (Some(insert), Some(where_clause)) = (insert_block, where_block) {
335            let insert_triples = self.parse_triples(&insert)?;
336            Ok(Some(UpdateOperation::InsertWhere {
337                insert: insert_triples,
338                where_clause,
339            }))
340        } else {
341            Ok(None)
342        }
343    }
344
345    fn parse_delete_where(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
346        if let Some(where_block) = self.extract_data_block(statement, "WHERE")? {
347            let delete_patterns = self.parse_triples(&where_block)?;
348            Ok(Some(UpdateOperation::DeleteWhere {
349                patterns: delete_patterns,
350            }))
351        } else {
352            Ok(None)
353        }
354    }
355
356    fn parse_delete_insert(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
357        // Use more specific parsing for DELETE/INSERT WHERE statements
358        let delete_block = self.extract_specific_block(statement, "DELETE")?;
359        let insert_block = self.extract_specific_block(statement, "INSERT")?;
360        let where_block = self.extract_specific_block(statement, "WHERE")?;
361
362        let delete_triples = if let Some(delete) = delete_block {
363            self.parse_triples(&delete)?
364        } else {
365            Vec::new()
366        };
367
368        let insert_triples = if let Some(insert) = insert_block {
369            self.parse_triples(&insert)?
370        } else {
371            Vec::new()
372        };
373
374        debug!(
375            "Parsed DELETE/INSERT: delete={} triples, insert={} triples",
376            delete_triples.len(),
377            insert_triples.len()
378        );
379
380        Ok(Some(UpdateOperation::DeleteInsert {
381            delete: delete_triples,
382            insert: insert_triples,
383            where_clause: where_block,
384        }))
385    }
386
387    fn parse_clear(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
388        let upper = statement.to_uppercase();
389        if upper.contains("CLEAR ALL") {
390            Ok(Some(UpdateOperation::ClearAll))
391        } else if upper.contains("CLEAR DEFAULT") {
392            Ok(Some(UpdateOperation::ClearDefault))
393        } else {
394            // Extract graph URI
395            let graph = self.extract_graph_uri(statement)?;
396            Ok(Some(UpdateOperation::ClearGraph { graph }))
397        }
398    }
399
400    fn parse_drop(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
401        let graph = self.extract_graph_uri(statement)?;
402        Ok(Some(UpdateOperation::DropGraph { graph }))
403    }
404
405    fn parse_create(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
406        let graph = self.extract_graph_uri(statement)?;
407        Ok(Some(UpdateOperation::CreateGraph { graph }))
408    }
409
410    fn parse_load(&mut self, statement: &str) -> Result<Option<UpdateOperation>> {
411        // Extract URL and optional graph
412        let parts: Vec<&str> = statement.split_whitespace().collect();
413        if parts.len() >= 2 {
414            let url = parts[1].trim_matches('<').trim_matches('>').to_string();
415            let graph = if parts.len() > 3 && parts[2].to_uppercase() == "INTO" {
416                Some(parts[3].trim_matches('<').trim_matches('>').to_string())
417            } else {
418                None
419            };
420            Ok(Some(UpdateOperation::Load { url, graph }))
421        } else {
422            Err(anyhow!("Invalid LOAD statement: {}", statement))
423        }
424    }
425
426    fn extract_specific_block(&self, statement: &str, keyword: &str) -> Result<Option<String>> {
427        // More precise parsing for DELETE/INSERT WHERE statements
428        let upper = statement.to_uppercase();
429        let keyword_upper = keyword.to_uppercase();
430
431        // Find the keyword followed by whitespace and then a '{'
432        let pattern = format!(r"{}\s*\{{", regex::escape(&keyword_upper));
433        let re = regex::Regex::new(&pattern)?;
434
435        if let Some(m) = re.find(&upper) {
436            let start_pos = m.start();
437            if let Some(brace_pos_relative) = statement[start_pos..].find('{') {
438                let brace_pos = start_pos + brace_pos_relative;
439
440                // Find matching closing brace with proper quote handling
441                let mut brace_count = 1;
442                let mut end_pos = brace_pos + 1;
443                let chars: Vec<char> = statement.chars().collect();
444                let mut in_quotes = false;
445                let mut escape_next = false;
446
447                while end_pos < chars.len() && brace_count > 0 {
448                    let c = chars[end_pos];
449
450                    if escape_next {
451                        escape_next = false;
452                    } else {
453                        match c {
454                            '\\' if in_quotes => escape_next = true,
455                            '"' => in_quotes = !in_quotes,
456                            '{' if !in_quotes => brace_count += 1,
457                            '}' if !in_quotes => {
458                                brace_count -= 1;
459                                if brace_count == 0 {
460                                    break;
461                                }
462                            }
463                            _ => {}
464                        }
465                    }
466                    end_pos += 1;
467                }
468
469                if brace_count == 0 {
470                    let content = statement[brace_pos + 1..end_pos].trim();
471                    return Ok(Some(content.to_string()));
472                }
473            }
474        }
475
476        Ok(None)
477    }
478
479    fn extract_data_block(&self, statement: &str, keyword: &str) -> Result<Option<String>> {
480        let upper = statement.to_uppercase();
481        let keyword_upper = keyword.to_uppercase();
482
483        if let Some(start) = upper.find(&keyword_upper) {
484            let after_keyword = statement[start + keyword.len()..].trim();
485
486            if let Some(open_brace) = after_keyword.find('{') {
487                let from_brace = &after_keyword[open_brace + 1..];
488
489                // Find matching closing brace with proper quote handling
490                let mut brace_count = 1;
491                let mut end_pos = 0;
492                let mut in_quotes = false;
493                let mut escape_next = false;
494
495                for (i, c) in from_brace.char_indices() {
496                    if escape_next {
497                        escape_next = false;
498                    } else {
499                        match c {
500                            '\\' if in_quotes => escape_next = true,
501                            '"' => in_quotes = !in_quotes,
502                            '{' if !in_quotes => brace_count += 1,
503                            '}' if !in_quotes => {
504                                brace_count -= 1;
505                                if brace_count == 0 {
506                                    end_pos = i;
507                                    break;
508                                }
509                            }
510                            _ => {}
511                        }
512                    }
513                }
514
515                if brace_count == 0 {
516                    Ok(Some(from_brace[..end_pos].trim().to_string()))
517                } else {
518                    Err(anyhow!("Unmatched braces in {}", keyword))
519                }
520            } else {
521                Ok(None)
522            }
523        } else {
524            Ok(None)
525        }
526    }
527
528    fn extract_graph_uri(&self, statement: &str) -> Result<Option<String>> {
529        // Simple graph URI extraction - look for GRAPH <uri> pattern
530        let re = Regex::new(r"(?i)GRAPH\s+<([^>]+)>").unwrap();
531        if let Some(captures) = re.captures(statement) {
532            if let Some(uri) = captures.get(1) {
533                // Normalize the URI
534                let normalized_uri = Self::normalize_uri(uri.as_str());
535                return Ok(Some(normalized_uri));
536            }
537        }
538
539        // Look for URI immediately after graph-related keywords (CLEAR, DROP, CREATE)
540        let upper = statement.to_uppercase();
541        let keywords = ["CLEAR", "DROP", "CREATE"];
542
543        for keyword in &keywords {
544            if let Some(keyword_pos) = upper.find(keyword) {
545                let after_keyword = &statement[keyword_pos + keyword.len()..];
546
547                // Find the first URI in angle brackets after the keyword
548                if let Some(start) = after_keyword.find('<') {
549                    if let Some(end) = after_keyword[start..].find('>') {
550                        let uri = &after_keyword[start + 1..start + end];
551                        let normalized_uri = Self::normalize_uri(uri);
552                        return Ok(Some(normalized_uri));
553                    }
554                }
555            }
556        }
557
558        Ok(None)
559    }
560
561    fn parse_triples(&self, data: &str) -> Result<Vec<Triple>> {
562        let mut triples = Vec::new();
563
564        // First split the data into individual triple statements
565        let triple_statements = self.split_triple_statements(data);
566
567        for statement in triple_statements {
568            let statement = statement.trim();
569            if statement.is_empty() || statement.starts_with('#') {
570                continue;
571            }
572
573            // Parse subject, predicate, object while respecting quotes
574            if let Some(triple) = self.parse_triple_line(statement)? {
575                triples.push(triple);
576            }
577        }
578        Ok(triples)
579    }
580
581    fn split_triple_statements(&self, data: &str) -> Vec<String> {
582        let mut statements = Vec::new();
583        let mut current = String::new();
584        let mut in_quotes = false;
585        let mut in_uri = false;
586
587        for c in data.chars() {
588            match c {
589                '"' => {
590                    in_quotes = !in_quotes;
591                    current.push(c);
592                }
593                '<' if !in_quotes => {
594                    in_uri = true;
595                    current.push(c);
596                }
597                '>' if !in_quotes && in_uri => {
598                    in_uri = false;
599                    current.push(c);
600                }
601                '.' if !in_quotes && !in_uri => {
602                    // End of triple statement
603                    let stmt = current.trim().to_string();
604                    if !stmt.is_empty() {
605                        statements.push(stmt);
606                    }
607                    current.clear();
608                }
609                _ => {
610                    current.push(c);
611                }
612            }
613        }
614
615        // Add any remaining content
616        let stmt = current.trim().to_string();
617        if !stmt.is_empty() {
618            statements.push(stmt);
619        }
620
621        statements
622    }
623
624    fn parse_triple_line(&self, line: &str) -> Result<Option<Triple>> {
625        let mut parts = Vec::new();
626        let mut current_part = String::new();
627        let mut in_quotes = false;
628        let mut in_uri = false;
629        let mut escape_next = false;
630        let mut chars = line.chars().peekable();
631
632        while let Some(c) = chars.next() {
633            if escape_next {
634                current_part.push(c);
635                escape_next = false;
636                continue;
637            }
638
639            match c {
640                '\\' if in_quotes => {
641                    escape_next = true;
642                    current_part.push(c);
643                }
644                '"' => {
645                    in_quotes = !in_quotes;
646                    current_part.push(c);
647                }
648                '<' if !in_quotes => {
649                    in_uri = true;
650                    current_part.push(c);
651                }
652                '>' if !in_quotes && in_uri => {
653                    in_uri = false;
654                    current_part.push(c);
655                }
656                ' ' | '\t' if !in_quotes && !in_uri => {
657                    // Only split on whitespace outside quotes and URIs
658                    if !current_part.is_empty() {
659                        parts.push(current_part.trim().to_string());
660                        current_part.clear();
661                    }
662                    // Skip consecutive whitespace
663                    while let Some(&next_c) = chars.peek() {
664                        if next_c == ' ' || next_c == '\t' {
665                            chars.next();
666                        } else {
667                            break;
668                        }
669                    }
670                }
671                _ => {
672                    current_part.push(c);
673                }
674            }
675        }
676
677        // Add the last part
678        if !current_part.is_empty() {
679            parts.push(current_part.trim().to_string());
680        }
681
682        if parts.len() >= 3 {
683            let subject = Self::expand_term(&parts[0]);
684            let predicate = Self::expand_term(&parts[1]);
685            let object = if parts.len() > 3 {
686                // Join remaining parts as object (for complex literals with multiple parts)
687                let joined = parts[2..].join(" ");
688                Self::expand_term(&joined)
689            } else {
690                Self::expand_term(&parts[2])
691            };
692
693            return Ok(Some(Triple {
694                subject,
695                predicate,
696                object,
697            }));
698        }
699
700        Ok(None)
701    }
702
703    /// Expand and normalize terms (URIs, literals, etc.)
704    fn expand_term(term: &str) -> String {
705        if term.starts_with('<') && term.ends_with('>') {
706            // Full URI - strip angle brackets and normalize
707            let uri = &term[1..term.len() - 1];
708            Self::normalize_uri(uri)
709        } else if term.starts_with('"') {
710            // Literal - return as-is
711            term.to_string()
712        } else if term.starts_with('_') {
713            // Blank node - return as-is
714            term.to_string()
715        } else if term.contains("://") {
716            // Bare URI without brackets - normalize
717            Self::normalize_uri(term)
718        } else {
719            // Return as-is for other terms (prefixed names, etc.)
720            term.to_string()
721        }
722    }
723
724    /// Normalize URI by converting to lowercase (for scheme and domain)
725    fn normalize_uri(uri: &str) -> String {
726        // Convert HTTP(S) schemes and domains to lowercase, preserve path case
727        if uri.starts_with("http://")
728            || uri.starts_with("https://")
729            || uri.starts_with("HTTP://")
730            || uri.starts_with("HTTPS://")
731        {
732            // Find the path start position
733            let scheme_end = if uri.len() >= 8 && uri[..8].to_lowercase() == "https://" {
734                8
735            } else {
736                7
737            };
738
739            if let Some(path_start) = uri[scheme_end..].find('/') {
740                let scheme_and_domain = &uri[..scheme_end + path_start];
741                let path = &uri[scheme_end + path_start..];
742                format!("{}{}", scheme_and_domain.to_lowercase(), path)
743            } else {
744                // No path, just normalize scheme and domain
745                // Don't remove trailing slashes as they might be significant
746                uri.to_lowercase()
747            }
748        } else {
749            // For non-HTTP URIs, return as-is to preserve case sensitivity
750            uri.to_string()
751        }
752    }
753
754    fn process_update_operation(
755        &mut self,
756        operation: &UpdateOperation,
757    ) -> Result<Vec<StreamEvent>> {
758        let mut events = Vec::new();
759        self.operation_counter += 1;
760
761        match operation {
762            UpdateOperation::InsertData { triples } => {
763                for triple in triples {
764                    events.push(StreamEvent::TripleAdded {
765                        subject: triple.subject.clone(),
766                        predicate: triple.predicate.clone(),
767                        object: triple.object.clone(),
768                        graph: None,
769                        metadata: Default::default(),
770                    });
771                }
772            }
773            UpdateOperation::DeleteData { triples } => {
774                for triple in triples {
775                    events.push(StreamEvent::TripleRemoved {
776                        subject: triple.subject.clone(),
777                        predicate: triple.predicate.clone(),
778                        object: triple.object.clone(),
779                        graph: None,
780                        metadata: Default::default(),
781                    });
782                }
783            }
784            UpdateOperation::InsertWhere { insert, .. } => {
785                // For INSERT WHERE, we generate add events for the insert patterns
786                // In a real implementation, we'd need to evaluate the WHERE clause
787                for triple in insert {
788                    events.push(StreamEvent::TripleAdded {
789                        subject: triple.subject.clone(),
790                        predicate: triple.predicate.clone(),
791                        object: triple.object.clone(),
792                        graph: None,
793                        metadata: Default::default(),
794                    });
795                }
796            }
797            UpdateOperation::DeleteWhere { patterns } => {
798                // For DELETE WHERE, we generate remove events for matching patterns
799                for pattern in patterns {
800                    events.push(StreamEvent::TripleRemoved {
801                        subject: pattern.subject.clone(),
802                        predicate: pattern.predicate.clone(),
803                        object: pattern.object.clone(),
804                        graph: None,
805                        metadata: Default::default(),
806                    });
807                }
808            }
809            UpdateOperation::DeleteInsert { delete, insert, .. } => {
810                // Process deletes first, then inserts
811                for triple in delete {
812                    events.push(StreamEvent::TripleRemoved {
813                        subject: triple.subject.clone(),
814                        predicate: triple.predicate.clone(),
815                        object: triple.object.clone(),
816                        graph: None,
817                        metadata: Default::default(),
818                    });
819                }
820                for triple in insert {
821                    events.push(StreamEvent::TripleAdded {
822                        subject: triple.subject.clone(),
823                        predicate: triple.predicate.clone(),
824                        object: triple.object.clone(),
825                        graph: None,
826                        metadata: Default::default(),
827                    });
828                }
829            }
830            UpdateOperation::ClearAll => {
831                events.push(StreamEvent::GraphCleared {
832                    graph: None,
833                    metadata: Default::default(),
834                });
835            }
836            UpdateOperation::ClearDefault => {
837                events.push(StreamEvent::GraphCleared {
838                    graph: None,
839                    metadata: Default::default(),
840                });
841            }
842            UpdateOperation::ClearGraph { graph } => {
843                events.push(StreamEvent::GraphCleared {
844                    graph: graph.clone(),
845                    metadata: Default::default(),
846                });
847            }
848            UpdateOperation::DropGraph { graph } => {
849                events.push(StreamEvent::GraphCleared {
850                    graph: graph.clone(),
851                    metadata: Default::default(),
852                });
853            }
854            UpdateOperation::CreateGraph { .. } => {
855                // Graph creation doesn't generate data events
856            }
857            UpdateOperation::Load { .. } => {
858                // LOAD operations would generate events based on loaded data
859                // For now, we just record the operation
860                events.push(StreamEvent::SparqlUpdate {
861                    query: format!("Operation #{}: {:?}", self.operation_counter, operation),
862                    operation_type: SparqlOperationType::Load,
863                    metadata: Default::default(),
864                });
865            }
866        }
867
868        Ok(events)
869    }
870
871    fn optimize_events(&self, events: Vec<StreamEvent>) -> Vec<StreamEvent> {
872        // Remove redundant add/remove pairs
873        let mut optimized = Vec::new();
874        let mut seen_operations = HashSet::new();
875        let original_count = events.len();
876
877        for event in events {
878            let event_key = match &event {
879                StreamEvent::TripleAdded {
880                    subject,
881                    predicate,
882                    object,
883                    ..
884                }
885                | StreamEvent::TripleRemoved {
886                    subject,
887                    predicate,
888                    object,
889                    ..
890                } => {
891                    format!("{subject}|{predicate}|{object}")
892                }
893                StreamEvent::GraphCleared { graph, .. } => {
894                    format!("graph_clear|{graph:?}")
895                }
896                StreamEvent::SparqlUpdate { query, .. } => {
897                    format!("sparql|{query}")
898                }
899                _ => {
900                    // Other events get unique keys to avoid deduplication
901                    format!("other|{}", uuid::Uuid::new_v4())
902                }
903            };
904
905            if !seen_operations.contains(&event_key) {
906                seen_operations.insert(event_key);
907                optimized.push(event);
908            }
909        }
910
911        debug!("Optimized {} events to {}", original_count, optimized.len());
912        optimized
913    }
914}
915
916impl Default for DeltaComputer {
917    fn default() -> Self {
918        Self::new()
919    }
920}
921
/// Parsed SPARQL Update operations
///
/// Each variant is the intermediate form that `process_update_operation`
/// turns into stream events.
#[derive(Debug, Clone)]
enum UpdateOperation {
    /// `INSERT DATA` with the triples to add.
    InsertData {
        triples: Vec<Triple>,
    },
    /// `DELETE DATA`; each triple becomes a `TripleRemoved` event.
    DeleteData {
        triples: Vec<Triple>,
    },
    /// `INSERT ... WHERE`; the insert patterns become `TripleAdded` events.
    /// The `where_clause` text is carried along but is not evaluated when
    /// events are generated.
    InsertWhere {
        insert: Vec<Triple>,
        where_clause: String,
    },
    /// `DELETE WHERE`; each pattern becomes a `TripleRemoved` event.
    DeleteWhere {
        patterns: Vec<Triple>,
    },
    /// Combined `DELETE`/`INSERT`; removal events are emitted before
    /// addition events. The `where_clause` is not evaluated when events are
    /// generated.
    DeleteInsert {
        delete: Vec<Triple>,
        insert: Vec<Triple>,
        where_clause: Option<String>,
    },
    /// Emits a `GraphCleared` event with `graph: None`.
    ClearAll,
    /// Emits a `GraphCleared` event with `graph: None` (indistinguishable
    /// from [`UpdateOperation::ClearAll`] at the event level).
    ClearDefault,
    /// Emits a `GraphCleared` event carrying `graph`.
    ClearGraph {
        graph: Option<String>,
    },
    /// Also reported as a `GraphCleared` event carrying `graph`.
    DropGraph {
        graph: Option<String>,
    },
    /// Produces no events.
    CreateGraph {
        graph: Option<String>,
    },
    /// Recorded as a single `SparqlUpdate` event of type `Load`; the loaded
    /// data itself is not expanded into triple events.
    Load {
        url: String,
        graph: Option<String>,
    },
}
959
/// Simple triple representation
///
/// All three terms are plain strings. As the tests in this file show, parsed
/// IRIs are stored without their angle brackets while literals keep their
/// surrounding quotes (e.g. `"\"John Doe\""`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Triple {
    /// Subject term.
    subject: String,
    /// Predicate term.
    predicate: String,
    /// Object term (IRI or quoted literal).
    object: String,
}
967
/// Delta stream processor with batching and buffering
///
/// Wraps a [`DeltaComputer`] and keeps a copy of every generated event in an
/// internal buffer that is emptied by `flush`.
pub struct DeltaProcessor {
    /// Computes the events for each update.
    computer: DeltaComputer,
    /// Events accumulated since the last flush.
    buffer: Vec<StreamEvent>,
    /// Buffer length at which `process_update` triggers a flush.
    batch_size: usize,
    /// Time since `last_flush` after which `process_update` triggers a flush.
    max_buffer_age: chrono::Duration,
    /// When the buffer was last flushed (initialised at construction).
    last_flush: DateTime<Utc>,
    /// Running counters, exposed through `get_stats`.
    stats: ProcessorStats,
}
977
/// Counters maintained by [`DeltaProcessor`].
///
/// Only `process_update` and `flush` touch these; `updates_to_patch` does not
/// update any of them.
#[derive(Debug, Default)]
pub struct ProcessorStats {
    /// Number of successful `process_update` calls.
    updates_processed: u64,
    /// Total number of events returned by `process_update`.
    events_generated: u64,
    /// Number of flushes that found a non-empty buffer.
    batches_sent: u64,
    /// Time of the most recent successful `process_update`, if any.
    last_activity: Option<DateTime<Utc>>,
}
985
986impl DeltaProcessor {
987    pub fn new() -> Self {
988        Self {
989            computer: DeltaComputer::new(),
990            buffer: Vec::new(),
991            batch_size: 100,
992            max_buffer_age: chrono::Duration::seconds(30),
993            last_flush: Utc::now(),
994            stats: ProcessorStats::default(),
995        }
996    }
997
998    pub fn with_optimization(mut self, enabled: bool) -> Self {
999        self.computer = self.computer.with_optimization(enabled);
1000        self
1001    }
1002
1003    pub fn with_batch_size(mut self, size: usize) -> Self {
1004        self.batch_size = size;
1005        self
1006    }
1007
1008    pub fn with_max_buffer_age(mut self, duration: chrono::Duration) -> Self {
1009        self.max_buffer_age = duration;
1010        self
1011    }
1012
1013    /// Process SPARQL Update and generate stream events
1014    pub async fn process_update(&mut self, update: &str) -> Result<Vec<StreamEvent>> {
1015        let events = self.computer.compute_delta(update)?;
1016
1017        self.stats.updates_processed += 1;
1018        self.stats.events_generated += events.len() as u64;
1019        self.stats.last_activity = Some(Utc::now());
1020
1021        // Add to buffer
1022        for event in &events {
1023            self.buffer.push(event.clone());
1024        }
1025
1026        // Check if we should flush (but don't return flushed events, just trigger the flush)
1027        let should_flush = self.buffer.len() >= self.batch_size
1028            || Utc::now() - self.last_flush > self.max_buffer_age;
1029
1030        if should_flush {
1031            // Trigger flush but don't return all buffered events
1032            self.flush();
1033        }
1034
1035        // Always return only the events from this specific update
1036        Ok(events)
1037    }
1038
1039    /// Force flush buffered events
1040    pub fn flush(&mut self) -> Vec<StreamEvent> {
1041        let events = self.buffer.clone();
1042        self.buffer.clear();
1043        self.last_flush = Utc::now();
1044
1045        if !events.is_empty() {
1046            self.stats.batches_sent += 1;
1047            debug!("Flushed {} buffered events", events.len());
1048        }
1049
1050        events
1051    }
1052
1053    /// Get processor statistics
1054    pub fn get_stats(&self) -> &ProcessorStats {
1055        &self.stats
1056    }
1057
1058    /// Convert multiple updates to a single patch
1059    pub async fn updates_to_patch(&mut self, updates: &[String]) -> Result<RdfPatch> {
1060        let mut all_events = Vec::new();
1061
1062        for update in updates {
1063            let events = self.computer.compute_delta(update)?;
1064            all_events.extend(events);
1065        }
1066
1067        self.computer.delta_to_patch(&all_events)
1068    }
1069}
1070
1071impl Default for DeltaProcessor {
1072    fn default() -> Self {
1073        Self::new()
1074    }
1075}
1076
/// Batch processor for handling multiple updates efficiently
///
/// Owns a pool of independent [`DeltaProcessor`]s and hands each incoming
/// update to one of them.
pub struct BatchDeltaProcessor {
    /// The processor pool.
    processors: Vec<DeltaProcessor>,
    /// Index of the processor that receives the next update in round-robin mode.
    current_processor: usize,
    /// When `true`, updates rotate through the pool; when `false`, every
    /// update goes to the first processor. `new` always sets this to `true`.
    round_robin: bool,
}
1083
1084impl BatchDeltaProcessor {
1085    pub fn new(num_processors: usize) -> Self {
1086        let mut processors = Vec::new();
1087        for _ in 0..num_processors {
1088            processors.push(DeltaProcessor::new());
1089        }
1090
1091        Self {
1092            processors,
1093            current_processor: 0,
1094            round_robin: true,
1095        }
1096    }
1097
1098    pub async fn process_updates(&mut self, updates: &[String]) -> Result<Vec<StreamEvent>> {
1099        let mut all_events = Vec::new();
1100
1101        for update in updates {
1102            let processor_idx = if self.round_robin {
1103                let idx = self.current_processor;
1104                self.current_processor = (self.current_processor + 1) % self.processors.len();
1105                idx
1106            } else {
1107                0 // Use first processor for sequential processing
1108            };
1109
1110            let events = self.processors[processor_idx]
1111                .process_update(update)
1112                .await?;
1113            all_events.extend(events);
1114        }
1115
1116        Ok(all_events)
1117    }
1118
1119    pub fn flush_all(&mut self) -> Vec<StreamEvent> {
1120        let mut all_events = Vec::new();
1121        for processor in &mut self.processors {
1122            let events = processor.flush();
1123            all_events.extend(events);
1124        }
1125        all_events
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use super::*;
1132    use crate::{EventMetadata, StreamEvent};
1133    use std::collections::HashMap;
1134
1135    #[test]
1136    fn test_sparql_parsing() {
1137        let mut computer = DeltaComputer::new().with_optimization(false);
1138
1139        let update = r#"
1140            INSERT DATA {
1141                <http://example.org/person1> <http://example.org/name> "John Doe" .
1142                <http://example.org/person1> <http://example.org/age> "30" .
1143            }
1144        "#;
1145
1146        let events = computer.compute_delta(update).unwrap();
1147        assert_eq!(events.len(), 2);
1148
1149        match &events[0] {
1150            StreamEvent::TripleAdded {
1151                subject,
1152                predicate,
1153                object,
1154                ..
1155            } => {
1156                assert_eq!(subject, "http://example.org/person1");
1157                assert_eq!(predicate, "http://example.org/name");
1158                assert_eq!(object, "\"John Doe\"");
1159            }
1160            _ => panic!("Expected TripleAdded event"),
1161        }
1162    }
1163
1164    #[test]
1165    fn test_delete_data_parsing() {
1166        let mut computer = DeltaComputer::new();
1167
1168        let update = r#"
1169            DELETE DATA {
1170                <http://example.org/person1> <http://example.org/name> "John Doe" .
1171            }
1172        "#;
1173
1174        let events = computer.compute_delta(update).unwrap();
1175        assert_eq!(events.len(), 1);
1176
1177        match &events[0] {
1178            StreamEvent::TripleRemoved {
1179                subject,
1180                predicate,
1181                object,
1182                ..
1183            } => {
1184                assert_eq!(subject, "http://example.org/person1");
1185                assert_eq!(predicate, "http://example.org/name");
1186                assert_eq!(object, "\"John Doe\"");
1187            }
1188            _ => panic!("Expected TripleRemoved event"),
1189        }
1190    }
1191
1192    #[test]
1193    fn test_clear_graph() {
1194        let mut computer = DeltaComputer::new();
1195
1196        let update = "CLEAR GRAPH <http://example.org/graph>";
1197        let events = computer.compute_delta(update).unwrap();
1198        assert_eq!(events.len(), 1);
1199
1200        match &events[0] {
1201            StreamEvent::GraphCleared { graph, .. } => {
1202                assert_eq!(graph, &Some("http://example.org/graph".to_string()));
1203            }
1204            _ => panic!("Expected GraphCleared event"),
1205        }
1206    }
1207
1208    #[test]
1209    fn test_delete_insert() {
1210        let mut computer = DeltaComputer::new().with_optimization(false);
1211
1212        let update = r#"
1213            DELETE {
1214                <http://example.org/person1> <http://example.org/age> "30" .
1215            }
1216            INSERT {
1217                <http://example.org/person1> <http://example.org/age> "31" .
1218            }
1219            WHERE {
1220                <http://example.org/person1> <http://example.org/age> "30" .
1221            }
1222        "#;
1223
1224        let events = computer.compute_delta(update).unwrap();
1225        assert_eq!(events.len(), 2);
1226
1227        // First event should be a delete
1228        match &events[0] {
1229            StreamEvent::TripleRemoved {
1230                subject,
1231                predicate,
1232                object,
1233                ..
1234            } => {
1235                assert_eq!(subject, "http://example.org/person1");
1236                assert_eq!(predicate, "http://example.org/age");
1237                assert_eq!(object, "\"30\"");
1238            }
1239            _ => panic!("Expected TripleRemoved event"),
1240        }
1241
1242        // Second event should be an insert
1243        match &events[1] {
1244            StreamEvent::TripleAdded {
1245                subject,
1246                predicate,
1247                object,
1248                ..
1249            } => {
1250                assert_eq!(subject, "http://example.org/person1");
1251                assert_eq!(predicate, "http://example.org/age");
1252                assert_eq!(object, "\"31\"");
1253            }
1254            _ => panic!("Expected TripleAdded event"),
1255        }
1256    }
1257
1258    #[test]
1259    fn test_delta_to_patch() {
1260        let computer = DeltaComputer::new();
1261
1262        let events = vec![
1263            StreamEvent::TripleAdded {
1264                subject: "http://example.org/s".to_string(),
1265                predicate: "http://example.org/p".to_string(),
1266                object: "http://example.org/o".to_string(),
1267                graph: None,
1268                metadata: EventMetadata {
1269                    event_id: "test".to_string(),
1270                    timestamp: Utc::now(),
1271                    source: "test".to_string(),
1272                    user: None,
1273                    context: None,
1274                    caused_by: None,
1275                    version: "1.0".to_string(),
1276                    properties: HashMap::new(),
1277                    checksum: None,
1278                },
1279            },
1280            StreamEvent::TripleRemoved {
1281                subject: "http://example.org/s2".to_string(),
1282                predicate: "http://example.org/p2".to_string(),
1283                object: "http://example.org/o2".to_string(),
1284                graph: None,
1285                metadata: EventMetadata {
1286                    event_id: "test2".to_string(),
1287                    timestamp: Utc::now(),
1288                    source: "test".to_string(),
1289                    user: None,
1290                    context: None,
1291                    caused_by: None,
1292                    version: "1.0".to_string(),
1293                    properties: HashMap::new(),
1294                    checksum: None,
1295                },
1296            },
1297        ];
1298
1299        let patch = computer.delta_to_patch(&events).unwrap();
1300        assert_eq!(patch.operations.len(), 2);
1301
1302        match &patch.operations[0] {
1303            PatchOperation::Add {
1304                subject,
1305                predicate,
1306                object,
1307            } => {
1308                assert_eq!(subject, "http://example.org/s");
1309                assert_eq!(predicate, "http://example.org/p");
1310                assert_eq!(object, "http://example.org/o");
1311            }
1312            _ => panic!("Expected Add operation"),
1313        }
1314
1315        match &patch.operations[1] {
1316            PatchOperation::Delete {
1317                subject,
1318                predicate,
1319                object,
1320            } => {
1321                assert_eq!(subject, "http://example.org/s2");
1322                assert_eq!(predicate, "http://example.org/p2");
1323                assert_eq!(object, "http://example.org/o2");
1324            }
1325            _ => panic!("Expected Delete operation"),
1326        }
1327    }
1328
1329    #[tokio::test]
1330    async fn test_delta_processor() {
1331        let mut processor = DeltaProcessor::new().with_batch_size(2);
1332
1333        let update1 = r#"
1334            INSERT DATA {
1335                <http://example.org/person1> <http://example.org/name> "John" .
1336            }
1337        "#;
1338
1339        let events1 = processor.process_update(update1).await.unwrap();
1340        // Always returns events from the current update
1341        assert_eq!(events1.len(), 1);
1342
1343        let update2 = r#"
1344            INSERT DATA {
1345                <http://example.org/person2> <http://example.org/name> "Jane" .
1346            }
1347        "#;
1348
1349        let events2 = processor.process_update(update2).await.unwrap();
1350        // Returns events from this update (internal flush is triggered but doesn't affect return value)
1351        assert_eq!(events2.len(), 1);
1352
1353        let stats = processor.get_stats();
1354        assert_eq!(stats.updates_processed, 2);
1355        assert_eq!(stats.events_generated, 2);
1356        assert!(stats.last_activity.is_some());
1357    }
1358
1359    #[tokio::test]
1360    async fn test_batch_processor() {
1361        let mut batch_processor = BatchDeltaProcessor::new(2);
1362
1363        let updates = vec![
1364            r#"INSERT DATA { <http://example.org/p1> <http://example.org/name> "Person1" . }"#
1365                .to_string(),
1366            r#"INSERT DATA { <http://example.org/p2> <http://example.org/name> "Person2" . }"#
1367                .to_string(),
1368            r#"DELETE DATA { <http://example.org/p1> <http://example.org/old> "value" . }"#
1369                .to_string(),
1370        ];
1371
1372        let events = batch_processor.process_updates(&updates).await.unwrap();
1373        assert_eq!(events.len(), 3);
1374
1375        // Check that we got the right types of events
1376        let add_count = events
1377            .iter()
1378            .filter(|e| matches!(e, StreamEvent::TripleAdded { .. }))
1379            .count();
1380        let remove_count = events
1381            .iter()
1382            .filter(|e| matches!(e, StreamEvent::TripleRemoved { .. }))
1383            .count();
1384
1385        assert_eq!(add_count, 2);
1386        assert_eq!(remove_count, 1);
1387    }
1388
1389    #[tokio::test]
1390    async fn test_updates_to_patch() {
1391        let mut processor = DeltaProcessor::new();
1392
1393        let updates = vec![
1394            r#"INSERT DATA { <http://example.org/s> <http://example.org/p> "value1" . }"#
1395                .to_string(),
1396            r#"DELETE DATA { <http://example.org/s> <http://example.org/p> "value1" . }"#
1397                .to_string(),
1398            r#"INSERT DATA { <http://example.org/s> <http://example.org/p> "value2" . }"#
1399                .to_string(),
1400        ];
1401
1402        let patch = processor.updates_to_patch(&updates).await.unwrap();
1403        assert_eq!(patch.operations.len(), 3);
1404
1405        // Should have Add, Delete, Add operations in order
1406        assert!(matches!(patch.operations[0], PatchOperation::Add { .. }));
1407        assert!(matches!(patch.operations[1], PatchOperation::Delete { .. }));
1408        assert!(matches!(patch.operations[2], PatchOperation::Add { .. }));
1409    }
1410
1411    #[test]
1412    fn test_statement_splitting() {
1413        let computer = DeltaComputer::new();
1414
1415        let input = r#"
1416            INSERT DATA { <s1> <p1> "o1" . };
1417            DELETE DATA { <s2> <p2> "o2" . };
1418            CLEAR GRAPH <g1>
1419        "#;
1420
1421        let statements = computer.split_statements(input);
1422        assert_eq!(statements.len(), 3);
1423        assert!(statements[0].contains("INSERT DATA"));
1424        assert!(statements[1].contains("DELETE DATA"));
1425        assert!(statements[2].contains("CLEAR GRAPH"));
1426    }
1427
1428    #[test]
1429    fn test_triple_parsing() {
1430        let computer = DeltaComputer::new();
1431
1432        let data = r#"
1433            <http://example.org/subject> <http://example.org/predicate> "Object literal" .
1434            <http://example.org/s2> <http://example.org/p2> <http://example.org/o2> .
1435        "#;
1436
1437        let triples = computer.parse_triples(data).unwrap();
1438        assert_eq!(triples.len(), 2);
1439
1440        assert_eq!(triples[0].subject, "http://example.org/subject");
1441        assert_eq!(triples[0].predicate, "http://example.org/predicate");
1442        assert_eq!(triples[0].object, "\"Object literal\"");
1443
1444        assert_eq!(triples[1].subject, "http://example.org/s2");
1445        assert_eq!(triples[1].predicate, "http://example.org/p2");
1446        assert_eq!(triples[1].object, "http://example.org/o2");
1447    }
1448
1449    #[test]
1450    fn test_optimization() {
1451        let computer = DeltaComputer::new().with_optimization(true);
1452
1453        let events = vec![
1454            StreamEvent::TripleAdded {
1455                subject: "s".to_string(),
1456                predicate: "p".to_string(),
1457                object: "o".to_string(),
1458                graph: None,
1459                metadata: EventMetadata {
1460                    event_id: "1".to_string(),
1461                    timestamp: Utc::now(),
1462                    source: "test".to_string(),
1463                    user: None,
1464                    context: None,
1465                    caused_by: None,
1466                    version: "1.0".to_string(),
1467                    properties: HashMap::new(),
1468                    checksum: None,
1469                },
1470            },
1471            StreamEvent::TripleAdded {
1472                subject: "s".to_string(),
1473                predicate: "p".to_string(),
1474                object: "o".to_string(),
1475                graph: None,
1476                metadata: EventMetadata {
1477                    event_id: "2".to_string(),
1478                    timestamp: Utc::now(),
1479                    source: "test".to_string(),
1480                    user: None,
1481                    context: None,
1482                    caused_by: None,
1483                    version: "1.0".to_string(),
1484                    properties: HashMap::new(),
1485                    checksum: None,
1486                },
1487            },
1488        ];
1489
1490        let optimized = computer.optimize_events(events);
1491        // Should remove the duplicate
1492        assert_eq!(optimized.len(), 1);
1493    }
1494}