Skip to main content

oxirs_stream/
csparql.rs

//! # C-SPARQL (Continuous SPARQL) Extensions
//!
//! Implementation of C-SPARQL for continuous query processing over RDF streams.
//!
//! C-SPARQL extends SPARQL 1.1 with:
//! - Stream declarations (FROM STREAM)
//! - Time-based and count-based windows
//! - Tumbling and sliding windows
//! - Stream-to-relation operators
//! - Temporal aggregations
//!
//! ## References
//! - Barbieri et al. "C-SPARQL: a Continuous Query Language for RDF Data Streams"
//! - <https://streamreasoning.org/>
15
16use anyhow::{anyhow, Result};
17use chrono::{DateTime, Duration as ChronoDuration, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::RwLock;
23use tracing::{debug, info};
24
25use crate::store_integration::{QueryResult, RdfStore, Triple};
26use crate::StreamEvent;
27
28/// C-SPARQL query engine for continuous stream processing
29pub struct CSparqlEngine {
30    /// RDF store backend
31    store: Arc<dyn RdfStore>,
32    /// Active stream windows
33    windows: Arc<RwLock<HashMap<String, StreamWindow>>>,
34    /// Registered queries
35    queries: Arc<RwLock<HashMap<String, CSparqlQuery>>>,
36    /// Configuration
37    config: CSparqlConfig,
38    /// Statistics
39    stats: Arc<RwLock<CSparqlStats>>,
40}
41
/// Tunable settings for a [`CSparqlEngine`].
#[derive(Debug, Clone)]
pub struct CSparqlConfig {
    /// Upper bound on the number of queries that may be registered at once.
    pub max_queries: usize,
    /// Window range used when a query does not specify (or mis-specifies) one.
    pub default_window_size: Duration,
    /// Window step used when a `STEP` clause carries no parsable value.
    pub default_window_step: Duration,
    /// Whether incremental evaluation is enabled.
    pub incremental_evaluation: bool,
    /// Memory budget for window buffers, in bytes.
    pub memory_limit: usize,
}

impl Default for CSparqlConfig {
    /// Defaults: 100 queries, a 60 s window advancing every 10 s, incremental
    /// evaluation on, and a 100 MiB memory budget.
    fn default() -> Self {
        let default_window_size = Duration::from_secs(60);
        let default_window_step = Duration::from_secs(10);
        let memory_limit = 1024 * 1024 * 100; // 100 MiB

        Self {
            max_queries: 100,
            default_window_size,
            default_window_step,
            incremental_evaluation: true,
            memory_limit,
        }
    }
}
68
69/// C-SPARQL query representation
70#[derive(Debug, Clone)]
71pub struct CSparqlQuery {
72    /// Query identifier
73    pub id: String,
74    /// Original query string
75    pub query_string: String,
76    /// Parsed query components
77    pub components: QueryComponents,
78    /// Execution metadata
79    pub metadata: QueryMetadata,
80    /// Query state
81    pub state: QueryState,
82}
83
84/// Parsed query components
85#[derive(Debug, Clone)]
86pub struct QueryComponents {
87    /// Stream declarations
88    pub streams: Vec<StreamDeclaration>,
89    /// Window specifications
90    pub windows: Vec<WindowSpec>,
91    /// SELECT/CONSTRUCT/ASK query part
92    pub query_type: QueryType,
93    /// WHERE clause patterns
94    pub patterns: Vec<TriplePattern>,
95    /// Aggregations
96    pub aggregations: Vec<Aggregation>,
97    /// GROUP BY variables
98    pub group_by: Vec<String>,
99    /// HAVING conditions
100    pub having: Option<String>,
101    /// ORDER BY clause
102    pub order_by: Vec<OrderByClause>,
103    /// LIMIT
104    pub limit: Option<usize>,
105}
106
107/// Stream declaration (FROM STREAM `<uri>`)
108#[derive(Debug, Clone)]
109pub struct StreamDeclaration {
110    /// Stream URI
111    pub uri: String,
112    /// Window specification for this stream
113    pub window: WindowSpec,
114}
115
116/// Window specification
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct WindowSpec {
119    /// Window type
120    pub window_type: WindowType,
121    /// Window range (time-based or count-based)
122    pub range: WindowRange,
123    /// Window step (for sliding windows)
124    pub step: Option<WindowRange>,
125}
126
127/// Window types
128#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
129pub enum WindowType {
130    /// Tumbling window (non-overlapping)
131    Tumbling,
132    /// Sliding window (overlapping)
133    Sliding,
134    /// Landmark window (from a fixed point)
135    Landmark,
136}
137
138/// Window range specification
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub enum WindowRange {
141    /// Time-based window (e.g., "PT10S" for 10 seconds)
142    Time(Duration),
143    /// Count-based window (number of events)
144    Count(usize),
145    /// Batch-based (process N batches)
146    Batch(usize),
147}
148
/// The SPARQL query form.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryType {
    /// `SELECT` query.
    Select,
    /// `CONSTRUCT` query.
    Construct,
    /// `ASK` query.
    Ask,
    /// `DESCRIBE` query.
    Describe,
}
157
158/// Triple pattern in WHERE clause
159#[derive(Debug, Clone)]
160pub struct TriplePattern {
161    pub subject: PatternElement,
162    pub predicate: PatternElement,
163    pub object: PatternElement,
164}
165
/// One position of a triple pattern: either a variable or a concrete term.
#[derive(Debug, Clone)]
pub enum PatternElement {
    /// A query variable.
    Variable(String),
    /// An IRI term.
    IRI(String),
    /// A literal term.
    Literal(String),
    /// A blank node.
    Blank(String),
}
174
175/// Aggregation function
176#[derive(Debug, Clone)]
177pub struct Aggregation {
178    pub function: AggregationFunction,
179    pub variable: String,
180    pub alias: Option<String>,
181}
182
/// The aggregate functions available to a query.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregationFunction {
    /// `COUNT`
    Count,
    /// `SUM`
    Sum,
    /// `AVG`
    Avg,
    /// `MIN`
    Min,
    /// `MAX`
    Max,
    /// `SAMPLE`
    Sample,
    /// `GROUP_CONCAT`, joining values with `separator`.
    GroupConcat { separator: String },
}
194
/// One sort key of an ORDER BY clause.
#[derive(Debug, Clone)]
pub struct OrderByClause {
    /// Variable to sort on.
    pub variable: String,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
}
201
202/// Query metadata
203#[derive(Debug, Clone)]
204pub struct QueryMetadata {
205    pub name: Option<String>,
206    pub description: Option<String>,
207    pub created_at: DateTime<Utc>,
208    pub owner: Option<String>,
209    pub tags: Vec<String>,
210}
211
/// Lifecycle state of a registered query.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryState {
    /// Registered but not yet started.
    Registered,
    /// Started and running.
    Running,
    /// Paused.
    Paused,
    /// Stopped.
    Stopped,
    /// Failed; the payload carries the error message.
    Error(String),
}
221
222/// Stream window for buffering events
223pub struct StreamWindow {
224    /// Window identifier
225    pub id: String,
226    /// Window specification
227    pub spec: WindowSpec,
228    /// Buffered triples
229    pub buffer: VecDeque<WindowedTriple>,
230    /// Window start time
231    pub start_time: DateTime<Utc>,
232    /// Last update time
233    pub last_update: DateTime<Utc>,
234    /// Total events processed
235    pub event_count: usize,
236}
237
238/// Triple with timestamp for windowing
239#[derive(Debug, Clone)]
240pub struct WindowedTriple {
241    pub triple: Triple,
242    pub timestamp: DateTime<Utc>,
243    pub event_id: String,
244}
245
/// Runtime counters exposed by the engine; all start at zero.
#[derive(Debug, Clone, Default)]
pub struct CSparqlStats {
    /// Number of queries registered so far.
    pub queries_registered: u64,
    /// Number of query executions that completed.
    pub queries_executed: u64,
    /// Number of query executions that failed.
    pub queries_failed: u64,
    /// Number of stream events processed.
    pub total_events_processed: u64,
    /// Number of result bindings produced across all executions.
    pub total_results_produced: u64,
    /// Mean query latency in milliseconds.
    pub avg_query_latency_ms: f64,
    /// Number of currently active windows.
    pub active_windows: usize,
}
257
258impl CSparqlEngine {
259    /// Create a new C-SPARQL engine
260    pub fn new(store: Arc<dyn RdfStore>, config: CSparqlConfig) -> Self {
261        Self {
262            store,
263            windows: Arc::new(RwLock::new(HashMap::new())),
264            queries: Arc::new(RwLock::new(HashMap::new())),
265            config,
266            stats: Arc::new(RwLock::new(CSparqlStats::default())),
267        }
268    }
269
270    /// Register a C-SPARQL query
271    pub async fn register_query(&self, query_string: String) -> Result<String> {
272        let query_id = uuid::Uuid::new_v4().to_string();
273
274        // Parse the C-SPARQL query
275        let components = self.parse_csparql_query(&query_string)?;
276
277        let query = CSparqlQuery {
278            id: query_id.clone(),
279            query_string,
280            components,
281            metadata: QueryMetadata {
282                name: None,
283                description: None,
284                created_at: Utc::now(),
285                owner: None,
286                tags: Vec::new(),
287            },
288            state: QueryState::Registered,
289        };
290
291        // Register the query
292        let mut queries = self.queries.write().await;
293        if queries.len() >= self.config.max_queries {
294            return Err(anyhow!("Maximum number of queries reached"));
295        }
296        queries.insert(query_id.clone(), query);
297
298        let mut stats = self.stats.write().await;
299        stats.queries_registered += 1;
300
301        info!("Registered C-SPARQL query: {}", query_id);
302        Ok(query_id)
303    }
304
305    /// Process a stream event
306    pub async fn process_event(&self, event: &StreamEvent) -> Result<()> {
307        // Extract triples from event
308        let triples = self.extract_triples_from_event(event)?;
309
310        // Update all relevant windows
311        let mut windows = self.windows.write().await;
312        for (_window_id, window) in windows.iter_mut() {
313            for triple in &triples {
314                let windowed_triple = WindowedTriple {
315                    triple: triple.clone(),
316                    timestamp: Utc::now(),
317                    event_id: uuid::Uuid::new_v4().to_string(),
318                };
319
320                window.buffer.push_back(windowed_triple);
321                window.event_count += 1;
322                window.last_update = Utc::now();
323            }
324
325            // Apply window eviction policy
326            self.evict_expired_triples(window).await?;
327        }
328
329        let mut stats = self.stats.write().await;
330        stats.total_events_processed += 1;
331
332        Ok(())
333    }
334
335    /// Execute a registered query
336    pub async fn execute_query(&self, query_id: &str) -> Result<QueryResult> {
337        let queries = self.queries.read().await;
338        let query = queries
339            .get(query_id)
340            .ok_or_else(|| anyhow!("Query not found: {}", query_id))?;
341
342        // Get relevant window data
343        let window_data = self.get_window_data_for_query(query).await?;
344
345        // Execute SPARQL query on window data
346        let result = self.execute_sparql_on_window(query, &window_data).await?;
347
348        let mut stats = self.stats.write().await;
349        stats.queries_executed += 1;
350        stats.total_results_produced += result.bindings.len() as u64;
351
352        Ok(result)
353    }
354
355    /// Parse C-SPARQL query string
356    fn parse_csparql_query(&self, query: &str) -> Result<QueryComponents> {
357        // Simplified parser - in production, use a full SPARQL parser with C-SPARQL extensions
358        let streams = self.parse_stream_declarations(query)?;
359        let windows = self.parse_window_specifications(query)?;
360        let query_type = self.parse_query_type(query)?;
361
362        Ok(QueryComponents {
363            streams,
364            windows,
365            query_type,
366            patterns: Vec::new(),
367            aggregations: Vec::new(),
368            group_by: Vec::new(),
369            having: None,
370            order_by: Vec::new(),
371            limit: None,
372        })
373    }
374
375    /// Parse stream declarations (FROM STREAM)
376    fn parse_stream_declarations(&self, query: &str) -> Result<Vec<StreamDeclaration>> {
377        let mut streams = Vec::new();
378
379        // Look for "FROM STREAM <uri> [RANGE duration] [STEP duration]"
380        if query.contains("FROM STREAM") {
381            // Simple regex-based parsing (use proper parser in production)
382            let parts: Vec<&str> = query.split("FROM STREAM").collect();
383            for part in parts.iter().skip(1) {
384                if let Some(uri_end) = part.find('[') {
385                    let uri = part[..uri_end]
386                        .trim()
387                        .trim_matches('<')
388                        .trim_matches('>')
389                        .to_string();
390
391                    // Parse window specification
392                    let window = if let Some(range_start) = part.find("RANGE") {
393                        self.parse_window_from_string(&part[range_start..])?
394                    } else {
395                        WindowSpec {
396                            window_type: WindowType::Tumbling,
397                            range: WindowRange::Time(self.config.default_window_size),
398                            step: None,
399                        }
400                    };
401
402                    streams.push(StreamDeclaration { uri, window });
403                }
404            }
405        }
406
407        Ok(streams)
408    }
409
410    /// Parse window specifications
411    fn parse_window_specifications(&self, query: &str) -> Result<Vec<WindowSpec>> {
412        let mut windows = Vec::new();
413
414        // Parse RANGE and STEP clauses
415        if query.contains("RANGE") {
416            let window = self.parse_window_from_string(query)?;
417            windows.push(window);
418        }
419
420        Ok(windows)
421    }
422
423    /// Parse window from string
424    fn parse_window_from_string(&self, s: &str) -> Result<WindowSpec> {
425        let has_step = s.contains("STEP");
426        let window_type = if has_step {
427            WindowType::Sliding
428        } else {
429            WindowType::Tumbling
430        };
431
432        // Parse RANGE value (e.g., "PT10S" for 10 seconds)
433        let range = if let Some(range_pos) = s.find("RANGE") {
434            let range_str = &s[range_pos + 5..].trim();
435            if range_str.starts_with("PT") {
436                // Parse ISO 8601 duration
437                let duration = self.parse_duration(range_str)?;
438                WindowRange::Time(duration)
439            } else if let Ok(count) = range_str.parse::<usize>() {
440                WindowRange::Count(count)
441            } else {
442                WindowRange::Time(self.config.default_window_size)
443            }
444        } else {
445            WindowRange::Time(self.config.default_window_size)
446        };
447
448        // Parse STEP value
449        let step = if let Some(step_pos) = s.find("STEP") {
450            let step_str = &s[step_pos + 4..].trim();
451            if step_str.starts_with("PT") {
452                let duration = self.parse_duration(step_str)?;
453                Some(WindowRange::Time(duration))
454            } else {
455                Some(WindowRange::Time(self.config.default_window_step))
456            }
457        } else {
458            None
459        };
460
461        Ok(WindowSpec {
462            window_type,
463            range,
464            step,
465        })
466    }
467
468    /// Parse ISO 8601 duration (simplified)
469    fn parse_duration(&self, s: &str) -> Result<Duration> {
470        // Handle PT format (e.g., PT10S, PT5M, PT1H)
471        if !s.starts_with("PT") {
472            return Err(anyhow!("Invalid duration format: {}", s));
473        }
474
475        let duration_part = &s[2..];
476
477        if let Some(seconds_pos) = duration_part.find('S') {
478            let seconds: u64 = duration_part[..seconds_pos].parse()?;
479            Ok(Duration::from_secs(seconds))
480        } else if let Some(minutes_pos) = duration_part.find('M') {
481            let minutes: u64 = duration_part[..minutes_pos].parse()?;
482            Ok(Duration::from_secs(minutes * 60))
483        } else if let Some(hours_pos) = duration_part.find('H') {
484            let hours: u64 = duration_part[..hours_pos].parse()?;
485            Ok(Duration::from_secs(hours * 3600))
486        } else {
487            Err(anyhow!("Invalid duration format: {}", s))
488        }
489    }
490
491    /// Parse query type
492    fn parse_query_type(&self, query: &str) -> Result<QueryType> {
493        let upper = query.to_uppercase();
494        if upper.contains("SELECT") {
495            Ok(QueryType::Select)
496        } else if upper.contains("CONSTRUCT") {
497            Ok(QueryType::Construct)
498        } else if upper.contains("ASK") {
499            Ok(QueryType::Ask)
500        } else if upper.contains("DESCRIBE") {
501            Ok(QueryType::Describe)
502        } else {
503            Err(anyhow!("Unknown query type"))
504        }
505    }
506
507    /// Evict expired triples from window
508    async fn evict_expired_triples(&self, window: &mut StreamWindow) -> Result<()> {
509        let now = Utc::now();
510
511        match &window.spec.range {
512            WindowRange::Time(duration) => {
513                // Remove triples older than window range
514                let cutoff = now - ChronoDuration::from_std(*duration)?;
515                window.buffer.retain(|t| t.timestamp > cutoff);
516            }
517            WindowRange::Count(max_count) => {
518                // Keep only the last N triples
519                while window.buffer.len() > *max_count {
520                    window.buffer.pop_front();
521                }
522            }
523            WindowRange::Batch(max_batches) => {
524                // Batch-based eviction (simplified)
525                if window.buffer.len() > max_batches * 1000 {
526                    window.buffer.drain(0..*max_batches * 500);
527                }
528            }
529        }
530
531        Ok(())
532    }
533
534    /// Extract triples from stream event
535    fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
536        // Extract RDF triples from the event
537        // This depends on the event type and structure
538        let mut triples = Vec::new();
539
540        match event {
541            StreamEvent::TripleAdded {
542                subject,
543                predicate,
544                object,
545                graph,
546                ..
547            } => {
548                triples.push(Triple {
549                    subject: subject.clone(),
550                    predicate: predicate.clone(),
551                    object: object.clone(),
552                    graph: graph.clone(),
553                });
554            }
555            StreamEvent::QuadAdded {
556                subject,
557                predicate,
558                object,
559                graph,
560                ..
561            } => {
562                triples.push(Triple {
563                    subject: subject.clone(),
564                    predicate: predicate.clone(),
565                    object: object.clone(),
566                    graph: Some(graph.clone()),
567                });
568            }
569            StreamEvent::SparqlUpdate { query, .. } => {
570                // Parse SPARQL update and extract triples
571                debug!("Extracting triples from SPARQL update: {}", query);
572            }
573            _ => {
574                // Other event types might not contain RDF data
575            }
576        }
577
578        Ok(triples)
579    }
580
581    /// Get window data for query execution
582    async fn get_window_data_for_query(&self, query: &CSparqlQuery) -> Result<Vec<Triple>> {
583        let mut all_triples = Vec::new();
584
585        let windows = self.windows.read().await;
586        for stream in &query.components.streams {
587            // Find matching window
588            if let Some(window) = windows.get(&stream.uri) {
589                for windowed_triple in &window.buffer {
590                    all_triples.push(windowed_triple.triple.clone());
591                }
592            }
593        }
594
595        Ok(all_triples)
596    }
597
598    /// Execute SPARQL query on window data
599    async fn execute_sparql_on_window(
600        &self,
601        query: &CSparqlQuery,
602        triples: &[Triple],
603    ) -> Result<QueryResult> {
604        // In a real implementation, insert triples into a temporary graph
605        // and execute the SPARQL query
606
607        debug!(
608            "Executing C-SPARQL query {} on {} triples",
609            query.id,
610            triples.len()
611        );
612
613        // Simplified result
614        Ok(QueryResult {
615            bindings: Vec::new(),
616        })
617    }
618
619    /// Get statistics
620    pub async fn get_stats(&self) -> CSparqlStats {
621        self.stats.read().await.clone()
622    }
623
624    /// Start a registered query
625    pub async fn start_query(&self, query_id: &str) -> Result<()> {
626        let mut queries = self.queries.write().await;
627        if let Some(query) = queries.get_mut(query_id) {
628            query.state = QueryState::Running;
629            info!("Started C-SPARQL query: {}", query_id);
630            Ok(())
631        } else {
632            Err(anyhow!("Query not found: {}", query_id))
633        }
634    }
635
636    /// Stop a running query
637    pub async fn stop_query(&self, query_id: &str) -> Result<()> {
638        let mut queries = self.queries.write().await;
639        if let Some(query) = queries.get_mut(query_id) {
640            query.state = QueryState::Stopped;
641            info!("Stopped C-SPARQL query: {}", query_id);
642            Ok(())
643        } else {
644            Err(anyhow!("Query not found: {}", query_id))
645        }
646    }
647
648    /// Unregister a query
649    pub async fn unregister_query(&self, query_id: &str) -> Result<()> {
650        let mut queries = self.queries.write().await;
651        queries
652            .remove(query_id)
653            .ok_or_else(|| anyhow!("Query not found: {}", query_id))?;
654
655        info!("Unregistered C-SPARQL query: {}", query_id);
656        Ok(())
657    }
658}
659
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the documented limits.
    #[test]
    fn test_csparql_config_defaults() {
        let config = CSparqlConfig::default();
        assert_eq!(config.max_queries, 100);
        assert!(config.incremental_evaluation);
        assert_eq!(config.default_window_size, Duration::from_secs(60));
        assert_eq!(config.default_window_step, Duration::from_secs(10));
    }

    /// A hand-built window spec keeps the values it was given.
    #[test]
    fn test_window_spec_creation() {
        let window = WindowSpec {
            window_type: WindowType::Tumbling,
            range: WindowRange::Time(Duration::from_secs(60)),
            step: None,
        };

        assert_eq!(window.window_type, WindowType::Tumbling);
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(matches!(window.range, WindowRange::Time(_)));
        assert!(window.step.is_none());
    }

    /// Query-form keywords are detectable regardless of case.
    #[test]
    fn test_query_type_parsing() {
        let query_select = "SELECT * WHERE { ?s ?p ?o }";
        let query_construct = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }";

        assert!(query_select.to_uppercase().contains("SELECT"));
        assert!(query_construct.to_uppercase().contains("CONSTRUCT"));
    }

    /// Exercises a local copy of the duration parser, since building an engine
    /// requires an RDF store.
    #[test]
    fn test_duration_parsing_standalone() {
        let parse_duration = |s: &str| -> Result<Duration> {
            if !s.starts_with("PT") {
                return Err(anyhow!("Invalid duration format: {}", s));
            }

            let duration_part = &s[2..];

            if let Some(seconds_pos) = duration_part.find('S') {
                let seconds: u64 = duration_part[..seconds_pos].parse()?;
                Ok(Duration::from_secs(seconds))
            } else if let Some(minutes_pos) = duration_part.find('M') {
                let minutes: u64 = duration_part[..minutes_pos].parse()?;
                Ok(Duration::from_secs(minutes * 60))
            } else if let Some(hours_pos) = duration_part.find('H') {
                let hours: u64 = duration_part[..hours_pos].parse()?;
                Ok(Duration::from_secs(hours * 3600))
            } else {
                Err(anyhow!("Invalid duration format: {}", s))
            }
        };

        let duration = parse_duration("PT10S").unwrap();
        assert_eq!(duration, Duration::from_secs(10));

        let duration = parse_duration("PT5M").unwrap();
        assert_eq!(duration, Duration::from_secs(300));

        let duration = parse_duration("PT1H").unwrap();
        assert_eq!(duration, Duration::from_secs(3600));

        // Malformed inputs are rejected rather than defaulted.
        assert!(parse_duration("10S").is_err());
        assert!(parse_duration("PT").is_err());
        assert!(parse_duration("PTxS").is_err());
    }

    /// Statistics are plain data: fields read back as written.
    #[test]
    fn test_csparql_stats() {
        let stats = CSparqlStats {
            queries_registered: 5,
            queries_executed: 100,
            queries_failed: 2,
            total_events_processed: 1000,
            total_results_produced: 500,
            avg_query_latency_ms: 15.5,
            active_windows: 3,
        };

        assert_eq!(stats.queries_registered, 5);
        assert_eq!(stats.total_events_processed, 1000);
        assert_eq!(stats.queries_failed, 2);
        assert_eq!(stats.active_windows, 3);
    }
}