1use anyhow::{anyhow, Result};
17use chrono::{DateTime, Duration as ChronoDuration, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::RwLock;
23use tracing::{debug, info};
24
25use crate::store_integration::{QueryResult, RdfStore, Triple};
26use crate::StreamEvent;
27
/// Continuous SPARQL (C-SPARQL) engine: evaluates registered continuous
/// queries over windows of streaming RDF data.
pub struct CSparqlEngine {
    // Backing RDF store used for query evaluation.
    store: Arc<dyn RdfStore>,
    // Active stream windows; looked up by stream URI in
    // `get_window_data_for_query`.
    windows: Arc<RwLock<HashMap<String, StreamWindow>>>,
    // Registered continuous queries, keyed by generated query id.
    queries: Arc<RwLock<HashMap<String, CSparqlQuery>>>,
    // Engine-wide configuration (limits and window defaults).
    config: CSparqlConfig,
    // Runtime counters, updated as events/queries are processed.
    stats: Arc<RwLock<CSparqlStats>>,
}
41
42#[derive(Debug, Clone)]
/// Configuration for the C-SPARQL engine.
#[derive(Debug, Clone)]
pub struct CSparqlConfig {
    /// Maximum number of queries that may be registered at once.
    pub max_queries: usize,
    /// Window size used when a query declares no RANGE clause.
    pub default_window_size: Duration,
    /// Window step used when a STEP clause is present but unparseable.
    pub default_window_step: Duration,
    /// Whether incremental evaluation is enabled.
    /// NOTE(review): not consulted anywhere in this file — confirm usage.
    pub incremental_evaluation: bool,
    /// Memory budget in bytes.
    /// NOTE(review): not enforced in this file — confirm where it applies.
    pub memory_limit: usize,
}
56
57impl Default for CSparqlConfig {
58 fn default() -> Self {
59 Self {
60 max_queries: 100,
61 default_window_size: Duration::from_secs(60),
62 default_window_step: Duration::from_secs(10),
63 incremental_evaluation: true,
64 memory_limit: 1024 * 1024 * 100, }
66 }
67}
68
69#[derive(Debug, Clone)]
/// A registered continuous query together with its parsed form and state.
#[derive(Debug, Clone)]
pub struct CSparqlQuery {
    /// Engine-generated UUID identifying this query.
    pub id: String,
    /// The original query text as submitted.
    pub query_string: String,
    /// Structural components extracted from `query_string`.
    pub components: QueryComponents,
    /// Descriptive metadata (name, owner, tags, creation time).
    pub metadata: QueryMetadata,
    /// Current lifecycle state (registered/running/stopped/…).
    pub state: QueryState,
}
83
84#[derive(Debug, Clone)]
/// Structural pieces of a parsed C-SPARQL query.
#[derive(Debug, Clone)]
pub struct QueryComponents {
    /// `FROM STREAM` declarations with their window specs.
    pub streams: Vec<StreamDeclaration>,
    /// Window specifications found in the query.
    pub windows: Vec<WindowSpec>,
    /// Query form (SELECT / CONSTRUCT / ASK / DESCRIBE).
    pub query_type: QueryType,
    /// Triple patterns of the WHERE clause.
    /// NOTE(review): pattern parsing is not implemented yet — always empty.
    pub patterns: Vec<TriplePattern>,
    /// Aggregation expressions (COUNT, SUM, …); currently always empty.
    pub aggregations: Vec<Aggregation>,
    /// GROUP BY variables; currently always empty.
    pub group_by: Vec<String>,
    /// Raw HAVING expression, if any; currently always `None`.
    pub having: Option<String>,
    /// ORDER BY clauses; currently always empty.
    pub order_by: Vec<OrderByClause>,
    /// LIMIT value, if any; currently always `None`.
    pub limit: Option<usize>,
}
106
107#[derive(Debug, Clone)]
/// A `FROM STREAM <uri> [window]` declaration.
#[derive(Debug, Clone)]
pub struct StreamDeclaration {
    /// Stream URI (angle brackets stripped).
    pub uri: String,
    /// Window applied to this stream.
    pub window: WindowSpec,
}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
/// Specification of a stream window: its kind, extent and optional step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowSpec {
    /// Tumbling, sliding or landmark.
    pub window_type: WindowType,
    /// Window extent (time-, count- or batch-based).
    pub range: WindowRange,
    /// Advance step; `Some` only for sliding windows.
    pub step: Option<WindowRange>,
}
126
127#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// Kind of stream window.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WindowType {
    /// Non-overlapping windows that advance by their full range.
    Tumbling,
    /// Overlapping windows that advance by a STEP smaller than the range.
    Sliding,
    /// Window anchored at a fixed start, growing over time.
    Landmark,
}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
140pub enum WindowRange {
141 Time(Duration),
143 Count(usize),
145 Batch(usize),
147}
148
/// SPARQL query form.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryType {
    Select,
    Construct,
    Ask,
    Describe,
}
157
158#[derive(Debug, Clone)]
/// A subject/predicate/object pattern from a WHERE clause.
#[derive(Debug, Clone)]
pub struct TriplePattern {
    pub subject: PatternElement,
    pub predicate: PatternElement,
    pub object: PatternElement,
}
165
/// One position of a triple pattern.
#[derive(Debug, Clone)]
pub enum PatternElement {
    /// SPARQL variable (without the leading `?`/`$`).
    Variable(String),
    /// Absolute IRI.
    IRI(String),
    /// Literal value (lexical form).
    Literal(String),
    /// Blank node label.
    Blank(String),
}
174
/// An aggregation expression, e.g. `(AVG(?x) AS ?avg)`.
#[derive(Debug, Clone)]
pub struct Aggregation {
    /// Aggregate function to apply.
    pub function: AggregationFunction,
    /// Variable the function is applied to.
    pub variable: String,
    /// Optional output alias (the `AS ?name` part).
    pub alias: Option<String>,
}
182
/// SPARQL aggregate functions.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregationFunction {
    Count,
    Sum,
    Avg,
    Min,
    Max,
    Sample,
    /// GROUP_CONCAT with its separator string.
    GroupConcat { separator: String },
}
194
/// A single ORDER BY term.
#[derive(Debug, Clone)]
pub struct OrderByClause {
    /// Variable to sort by.
    pub variable: String,
    /// `true` for ASC, `false` for DESC.
    pub ascending: bool,
}
201
/// Descriptive metadata attached to a registered query.
#[derive(Debug, Clone)]
pub struct QueryMetadata {
    /// Human-readable name, if provided.
    pub name: Option<String>,
    /// Free-form description.
    pub description: Option<String>,
    /// Registration timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Owner identifier, if known.
    pub owner: Option<String>,
    /// Arbitrary classification tags.
    pub tags: Vec<String>,
}
211
/// Lifecycle state of a registered query.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryState {
    /// Parsed and stored, not yet started.
    Registered,
    /// Actively evaluated.
    Running,
    /// Temporarily suspended.
    Paused,
    /// Stopped by the user.
    Stopped,
    /// Failed, with an error description.
    Error(String),
}
221
/// Mutable state of one stream window: a FIFO buffer of timestamped triples.
pub struct StreamWindow {
    /// Window identifier.
    pub id: String,
    /// The window specification this buffer enforces.
    pub spec: WindowSpec,
    /// Buffered triples, oldest at the front (evicted from the front).
    pub buffer: VecDeque<WindowedTriple>,
    /// When the window was created.
    pub start_time: DateTime<Utc>,
    /// Last time an event was appended.
    pub last_update: DateTime<Utc>,
    /// Total number of events ever appended (not decremented on eviction).
    pub event_count: usize,
}
237
/// A triple plus the bookkeeping needed for window eviction.
#[derive(Debug, Clone)]
pub struct WindowedTriple {
    /// The RDF triple itself.
    pub triple: Triple,
    /// Arrival time, used for time-based eviction.
    pub timestamp: DateTime<Utc>,
    /// Id of the originating event (a fresh UUID per insertion).
    pub event_id: String,
}
245
/// Engine-wide runtime counters.
#[derive(Debug, Clone, Default)]
pub struct CSparqlStats {
    /// Queries ever registered.
    pub queries_registered: u64,
    /// Successful query executions.
    pub queries_executed: u64,
    /// Failed query executions.
    pub queries_failed: u64,
    /// Stream events fed through `process_event`.
    pub total_events_processed: u64,
    /// Result bindings produced across all executions.
    pub total_results_produced: u64,
    /// Rolling average query latency in milliseconds.
    /// NOTE(review): never updated in this file — confirm who maintains it.
    pub avg_query_latency_ms: f64,
    /// Number of currently active windows.
    pub active_windows: usize,
}
257
258impl CSparqlEngine {
259 pub fn new(store: Arc<dyn RdfStore>, config: CSparqlConfig) -> Self {
261 Self {
262 store,
263 windows: Arc::new(RwLock::new(HashMap::new())),
264 queries: Arc::new(RwLock::new(HashMap::new())),
265 config,
266 stats: Arc::new(RwLock::new(CSparqlStats::default())),
267 }
268 }
269
270 pub async fn register_query(&self, query_string: String) -> Result<String> {
272 let query_id = uuid::Uuid::new_v4().to_string();
273
274 let components = self.parse_csparql_query(&query_string)?;
276
277 let query = CSparqlQuery {
278 id: query_id.clone(),
279 query_string,
280 components,
281 metadata: QueryMetadata {
282 name: None,
283 description: None,
284 created_at: Utc::now(),
285 owner: None,
286 tags: Vec::new(),
287 },
288 state: QueryState::Registered,
289 };
290
291 let mut queries = self.queries.write().await;
293 if queries.len() >= self.config.max_queries {
294 return Err(anyhow!("Maximum number of queries reached"));
295 }
296 queries.insert(query_id.clone(), query);
297
298 let mut stats = self.stats.write().await;
299 stats.queries_registered += 1;
300
301 info!("Registered C-SPARQL query: {}", query_id);
302 Ok(query_id)
303 }
304
305 pub async fn process_event(&self, event: &StreamEvent) -> Result<()> {
307 let triples = self.extract_triples_from_event(event)?;
309
310 let mut windows = self.windows.write().await;
312 for (_window_id, window) in windows.iter_mut() {
313 for triple in &triples {
314 let windowed_triple = WindowedTriple {
315 triple: triple.clone(),
316 timestamp: Utc::now(),
317 event_id: uuid::Uuid::new_v4().to_string(),
318 };
319
320 window.buffer.push_back(windowed_triple);
321 window.event_count += 1;
322 window.last_update = Utc::now();
323 }
324
325 self.evict_expired_triples(window).await?;
327 }
328
329 let mut stats = self.stats.write().await;
330 stats.total_events_processed += 1;
331
332 Ok(())
333 }
334
335 pub async fn execute_query(&self, query_id: &str) -> Result<QueryResult> {
337 let queries = self.queries.read().await;
338 let query = queries
339 .get(query_id)
340 .ok_or_else(|| anyhow!("Query not found: {}", query_id))?;
341
342 let window_data = self.get_window_data_for_query(query).await?;
344
345 let result = self.execute_sparql_on_window(query, &window_data).await?;
347
348 let mut stats = self.stats.write().await;
349 stats.queries_executed += 1;
350 stats.total_results_produced += result.bindings.len() as u64;
351
352 Ok(result)
353 }
354
355 fn parse_csparql_query(&self, query: &str) -> Result<QueryComponents> {
357 let streams = self.parse_stream_declarations(query)?;
359 let windows = self.parse_window_specifications(query)?;
360 let query_type = self.parse_query_type(query)?;
361
362 Ok(QueryComponents {
363 streams,
364 windows,
365 query_type,
366 patterns: Vec::new(),
367 aggregations: Vec::new(),
368 group_by: Vec::new(),
369 having: None,
370 order_by: Vec::new(),
371 limit: None,
372 })
373 }
374
375 fn parse_stream_declarations(&self, query: &str) -> Result<Vec<StreamDeclaration>> {
377 let mut streams = Vec::new();
378
379 if query.contains("FROM STREAM") {
381 let parts: Vec<&str> = query.split("FROM STREAM").collect();
383 for part in parts.iter().skip(1) {
384 if let Some(uri_end) = part.find('[') {
385 let uri = part[..uri_end]
386 .trim()
387 .trim_matches('<')
388 .trim_matches('>')
389 .to_string();
390
391 let window = if let Some(range_start) = part.find("RANGE") {
393 self.parse_window_from_string(&part[range_start..])?
394 } else {
395 WindowSpec {
396 window_type: WindowType::Tumbling,
397 range: WindowRange::Time(self.config.default_window_size),
398 step: None,
399 }
400 };
401
402 streams.push(StreamDeclaration { uri, window });
403 }
404 }
405 }
406
407 Ok(streams)
408 }
409
410 fn parse_window_specifications(&self, query: &str) -> Result<Vec<WindowSpec>> {
412 let mut windows = Vec::new();
413
414 if query.contains("RANGE") {
416 let window = self.parse_window_from_string(query)?;
417 windows.push(window);
418 }
419
420 Ok(windows)
421 }
422
423 fn parse_window_from_string(&self, s: &str) -> Result<WindowSpec> {
425 let has_step = s.contains("STEP");
426 let window_type = if has_step {
427 WindowType::Sliding
428 } else {
429 WindowType::Tumbling
430 };
431
432 let range = if let Some(range_pos) = s.find("RANGE") {
434 let range_str = &s[range_pos + 5..].trim();
435 if range_str.starts_with("PT") {
436 let duration = self.parse_duration(range_str)?;
438 WindowRange::Time(duration)
439 } else if let Ok(count) = range_str.parse::<usize>() {
440 WindowRange::Count(count)
441 } else {
442 WindowRange::Time(self.config.default_window_size)
443 }
444 } else {
445 WindowRange::Time(self.config.default_window_size)
446 };
447
448 let step = if let Some(step_pos) = s.find("STEP") {
450 let step_str = &s[step_pos + 4..].trim();
451 if step_str.starts_with("PT") {
452 let duration = self.parse_duration(step_str)?;
453 Some(WindowRange::Time(duration))
454 } else {
455 Some(WindowRange::Time(self.config.default_window_step))
456 }
457 } else {
458 None
459 };
460
461 Ok(WindowSpec {
462 window_type,
463 range,
464 step,
465 })
466 }
467
468 fn parse_duration(&self, s: &str) -> Result<Duration> {
470 if !s.starts_with("PT") {
472 return Err(anyhow!("Invalid duration format: {}", s));
473 }
474
475 let duration_part = &s[2..];
476
477 if let Some(seconds_pos) = duration_part.find('S') {
478 let seconds: u64 = duration_part[..seconds_pos].parse()?;
479 Ok(Duration::from_secs(seconds))
480 } else if let Some(minutes_pos) = duration_part.find('M') {
481 let minutes: u64 = duration_part[..minutes_pos].parse()?;
482 Ok(Duration::from_secs(minutes * 60))
483 } else if let Some(hours_pos) = duration_part.find('H') {
484 let hours: u64 = duration_part[..hours_pos].parse()?;
485 Ok(Duration::from_secs(hours * 3600))
486 } else {
487 Err(anyhow!("Invalid duration format: {}", s))
488 }
489 }
490
491 fn parse_query_type(&self, query: &str) -> Result<QueryType> {
493 let upper = query.to_uppercase();
494 if upper.contains("SELECT") {
495 Ok(QueryType::Select)
496 } else if upper.contains("CONSTRUCT") {
497 Ok(QueryType::Construct)
498 } else if upper.contains("ASK") {
499 Ok(QueryType::Ask)
500 } else if upper.contains("DESCRIBE") {
501 Ok(QueryType::Describe)
502 } else {
503 Err(anyhow!("Unknown query type"))
504 }
505 }
506
507 async fn evict_expired_triples(&self, window: &mut StreamWindow) -> Result<()> {
509 let now = Utc::now();
510
511 match &window.spec.range {
512 WindowRange::Time(duration) => {
513 let cutoff = now - ChronoDuration::from_std(*duration)?;
515 window.buffer.retain(|t| t.timestamp > cutoff);
516 }
517 WindowRange::Count(max_count) => {
518 while window.buffer.len() > *max_count {
520 window.buffer.pop_front();
521 }
522 }
523 WindowRange::Batch(max_batches) => {
524 if window.buffer.len() > max_batches * 1000 {
526 window.buffer.drain(0..*max_batches * 500);
527 }
528 }
529 }
530
531 Ok(())
532 }
533
534 fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
536 let mut triples = Vec::new();
539
540 match event {
541 StreamEvent::TripleAdded {
542 subject,
543 predicate,
544 object,
545 graph,
546 ..
547 } => {
548 triples.push(Triple {
549 subject: subject.clone(),
550 predicate: predicate.clone(),
551 object: object.clone(),
552 graph: graph.clone(),
553 });
554 }
555 StreamEvent::QuadAdded {
556 subject,
557 predicate,
558 object,
559 graph,
560 ..
561 } => {
562 triples.push(Triple {
563 subject: subject.clone(),
564 predicate: predicate.clone(),
565 object: object.clone(),
566 graph: Some(graph.clone()),
567 });
568 }
569 StreamEvent::SparqlUpdate { query, .. } => {
570 debug!("Extracting triples from SPARQL update: {}", query);
572 }
573 _ => {
574 }
576 }
577
578 Ok(triples)
579 }
580
581 async fn get_window_data_for_query(&self, query: &CSparqlQuery) -> Result<Vec<Triple>> {
583 let mut all_triples = Vec::new();
584
585 let windows = self.windows.read().await;
586 for stream in &query.components.streams {
587 if let Some(window) = windows.get(&stream.uri) {
589 for windowed_triple in &window.buffer {
590 all_triples.push(windowed_triple.triple.clone());
591 }
592 }
593 }
594
595 Ok(all_triples)
596 }
597
598 async fn execute_sparql_on_window(
600 &self,
601 query: &CSparqlQuery,
602 triples: &[Triple],
603 ) -> Result<QueryResult> {
604 debug!(
608 "Executing C-SPARQL query {} on {} triples",
609 query.id,
610 triples.len()
611 );
612
613 Ok(QueryResult {
615 bindings: Vec::new(),
616 })
617 }
618
619 pub async fn get_stats(&self) -> CSparqlStats {
621 self.stats.read().await.clone()
622 }
623
624 pub async fn start_query(&self, query_id: &str) -> Result<()> {
626 let mut queries = self.queries.write().await;
627 if let Some(query) = queries.get_mut(query_id) {
628 query.state = QueryState::Running;
629 info!("Started C-SPARQL query: {}", query_id);
630 Ok(())
631 } else {
632 Err(anyhow!("Query not found: {}", query_id))
633 }
634 }
635
636 pub async fn stop_query(&self, query_id: &str) -> Result<()> {
638 let mut queries = self.queries.write().await;
639 if let Some(query) = queries.get_mut(query_id) {
640 query.state = QueryState::Stopped;
641 info!("Stopped C-SPARQL query: {}", query_id);
642 Ok(())
643 } else {
644 Err(anyhow!("Query not found: {}", query_id))
645 }
646 }
647
648 pub async fn unregister_query(&self, query_id: &str) -> Result<()> {
650 let mut queries = self.queries.write().await;
651 queries
652 .remove(query_id)
653 .ok_or_else(|| anyhow!("Query not found: {}", query_id))?;
654
655 info!("Unregistered C-SPARQL query: {}", query_id);
656 Ok(())
657 }
658}
659
#[cfg(test)]
mod tests {
    use super::*;

    // None of these tests await anything, so plain #[test] suffices and
    // avoids spinning up a tokio runtime per test.

    #[test]
    fn test_csparql_config_defaults() {
        let config = CSparqlConfig::default();
        assert_eq!(config.max_queries, 100);
        assert!(config.incremental_evaluation);
    }

    #[test]
    fn test_window_spec_creation() {
        let window = WindowSpec {
            window_type: WindowType::Tumbling,
            range: WindowRange::Time(Duration::from_secs(60)),
            step: None,
        };

        assert_eq!(window.window_type, WindowType::Tumbling);
        // The matches! result was previously discarded, making this check
        // a no-op; wrap it in assert! so it actually verifies the variant.
        assert!(matches!(window.range, WindowRange::Time(_)));
    }

    #[test]
    fn test_query_type_parsing() {
        let query_select = "SELECT * WHERE { ?s ?p ?o }";
        let query_construct = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }";

        assert!(query_select.to_uppercase().contains("SELECT"));
        assert!(query_construct.to_uppercase().contains("CONSTRUCT"));
    }

    #[test]
    fn test_duration_parsing_standalone() {
        // Mirrors CSparqlEngine::parse_duration for the PT<n>{S,M,H} forms.
        let parse_duration = |s: &str| -> Result<Duration> {
            let rest = s
                .strip_prefix("PT")
                .ok_or_else(|| anyhow!("Invalid duration format: {}", s))?;
            let digits_end = rest
                .find(|c: char| !c.is_ascii_digit())
                .ok_or_else(|| anyhow!("Invalid duration format: {}", s))?;
            let value: u64 = rest[..digits_end].parse()?;
            match rest[digits_end..].chars().next() {
                Some('S') => Ok(Duration::from_secs(value)),
                Some('M') => Ok(Duration::from_secs(value * 60)),
                Some('H') => Ok(Duration::from_secs(value * 3600)),
                _ => Err(anyhow!("Invalid duration format: {}", s)),
            }
        };

        assert_eq!(parse_duration("PT10S").unwrap(), Duration::from_secs(10));
        assert_eq!(parse_duration("PT5M").unwrap(), Duration::from_secs(300));
        assert_eq!(parse_duration("PT1H").unwrap(), Duration::from_secs(3600));
        // Trailing clause text must not be mistaken for the unit letter.
        assert_eq!(
            parse_duration("PT5M STEP PT2S").unwrap(),
            Duration::from_secs(300)
        );
        assert!(parse_duration("10S").is_err());
    }

    #[test]
    fn test_csparql_stats() {
        let stats = CSparqlStats {
            queries_registered: 5,
            queries_executed: 100,
            queries_failed: 2,
            total_events_processed: 1000,
            total_results_produced: 500,
            avg_query_latency_ms: 15.5,
            active_windows: 3,
        };

        assert_eq!(stats.queries_registered, 5);
        assert_eq!(stats.total_events_processed, 1000);
    }
}
741}