1use anyhow::{anyhow, Result};
17use chrono::{DateTime, Duration as ChronoDuration, Utc};
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::sync::RwLock;
22use tracing::{debug, info};
23
24use crate::store_integration::{QueryResult, RdfStore, Triple};
25use crate::StreamEvent;
26
/// Continuous query engine in the CQELS style: buffers RDF stream triples per
/// registered stream and evaluates registered queries over them via execution plans.
///
/// All mutable state lives behind `Arc<RwLock<…>>` so the engine can be used
/// through `&self` from async code.
pub struct CqelsEngine {
    /// Backing RDF store.
    /// NOTE(review): not read by any method in this file; presumably intended for static scans.
    store: Arc<dyn RdfStore>,
    /// Registered streams, keyed by the stream id returned from `register_stream`
    /// (lookups for events and scans are done by stream URI).
    streams: Arc<RwLock<HashMap<String, CqelsStream>>>,
    /// Registered queries, keyed by query id.
    queries: Arc<RwLock<HashMap<String, CqelsQuery>>>,
    /// Execution plans, keyed by the id of the query they were built for.
    plans: Arc<RwLock<HashMap<String, ExecutionPlan>>>,
    /// Engine configuration, fixed at construction.
    config: CqelsConfig,
    /// Aggregate engine counters, readable via `get_stats`.
    stats: Arc<RwLock<CqelsStats>>,
}
42
/// Tunable limits and feature switches for a `CqelsEngine`.
#[derive(Debug, Clone)]
pub struct CqelsConfig {
    /// Upper bound on simultaneously registered queries; `register_query` fails beyond it.
    pub max_queries: usize,
    /// Switch for incremental evaluation.
    /// NOTE(review): not consulted by the engine code in this file.
    pub incremental_evaluation: bool,
    /// When true, freshly built execution plans are passed through the optimizer.
    pub adaptive_optimization: bool,
    /// Maximum number of triples retained per stream; the oldest are evicted first.
    pub window_buffer_size: usize,
    /// Buffer size for join operators.
    /// NOTE(review): not consulted by the engine code in this file.
    pub join_buffer_size: usize,
    /// Switch for physical (wall-clock) timestamps.
    /// NOTE(review): not consulted by the engine code in this file.
    pub physical_timestamps: bool,
}

impl Default for CqelsConfig {
    /// Defaults: 100 queries, 10 000-entry stream and join buffers, and every
    /// feature switch turned on.
    fn default() -> Self {
        let buffer_capacity: usize = 10_000;
        let features_enabled = true;

        CqelsConfig {
            max_queries: 100,
            window_buffer_size: buffer_capacity,
            join_buffer_size: buffer_capacity,
            incremental_evaluation: features_enabled,
            adaptive_optimization: features_enabled,
            physical_timestamps: features_enabled,
        }
    }
}
72
/// A registered input stream together with its buffered recent triples.
pub struct CqelsStream {
    /// Engine-assigned identifier (a UUID string at registration).
    pub id: String,
    /// Stream URI; events and stream scans locate the stream by this value.
    pub uri: String,
    /// Recently received triples in arrival order, bounded by
    /// `CqelsConfig::window_buffer_size` (oldest evicted first).
    pub buffer: VecDeque<StreamTriple>,
    /// Optional schema description; `None` at registration and not populated in this file.
    pub schema: Option<StreamSchema>,
    /// Bookkeeping about the stream's activity.
    pub metadata: StreamMetadata,
}
86
/// A triple buffered from a stream, annotated with arrival information.
#[derive(Debug, Clone)]
pub struct StreamTriple {
    /// The RDF triple itself.
    pub triple: Triple,
    /// Time at which the engine buffered the triple (taken from `Utc::now()`).
    pub timestamp: DateTime<Utc>,
    /// Per-stream running counter: the stream's `event_count` at insertion time.
    pub sequence_id: u64,
    /// Id of the `CqelsStream` that buffered this triple.
    pub source_id: String,
}
95
/// Optional description of the predicates and value types a stream carries.
#[derive(Debug, Clone)]
pub struct StreamSchema {
    /// Predicates expected on the stream.
    pub predicates: HashSet<String>,
    /// Expected value type per key (presumably keyed by predicate — TODO confirm).
    pub value_types: HashMap<String, ValueType>,
}
102
/// Kind of value a stream schema declares for a term.
#[derive(Debug, Clone, PartialEq)]
pub enum ValueType {
    /// An IRI reference.
    IRI,
    /// A plain/untyped literal.
    Literal,
    /// An integer literal.
    Integer,
    /// A floating-point literal.
    Float,
    /// A boolean literal.
    Boolean,
    /// A date-time literal.
    DateTime,
}
113
/// Activity bookkeeping for a registered stream.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// When the stream was registered.
    pub created_at: DateTime<Utc>,
    /// Number of triples buffered so far (incremented per triple, not per event).
    pub event_count: u64,
    /// Average event rate.
    /// NOTE(review): initialised to 0.0 and not updated by the engine in this file.
    pub avg_event_rate: f64,
    /// Arrival time of the most recently buffered triple, if any.
    pub last_event_time: Option<DateTime<Utc>>,
}
122
/// A registered continuous query.
#[derive(Debug, Clone)]
pub struct CqelsQuery {
    /// Engine-assigned identifier (a UUID string at registration).
    pub id: String,
    /// The query text as submitted.
    pub query_string: String,
    /// Operators produced by parsing `query_string`.
    pub operators: Vec<CqelsOperator>,
    /// Descriptive metadata.
    pub metadata: QueryMetadata,
    /// Lifecycle state; `Idle` on registration, changed by `start_query` / `stop_query`.
    pub state: ExecutionState,
}
137
/// Node of a CQELS operator tree.
#[derive(Debug, Clone)]
pub enum CqelsOperator {
    /// Read the windowed contents of a registered stream.
    StreamScan {
        /// URI of the stream to scan.
        stream_uri: String,
        window: WindowDefinition,
    },
    /// Match a triple pattern against static (stored) data.
    StaticScan {
        /// Graph to scan; `None` presumably means the default graph — TODO confirm.
        graph_uri: Option<String>,
        pattern: TriplePattern,
    },
    /// Join two stream-derived inputs.
    StreamJoin {
        left: Box<CqelsOperator>,
        right: Box<CqelsOperator>,
        condition: JoinCondition,
    },
    /// Join a stream-derived input with a static-data input.
    HybridJoin {
        stream_op: Box<CqelsOperator>,
        static_op: Box<CqelsOperator>,
        condition: JoinCondition,
    },
    /// Keep only input bindings satisfying `condition`.
    Filter {
        input: Box<CqelsOperator>,
        condition: FilterCondition,
    },
    /// Restrict input bindings to `variables`.
    Project {
        input: Box<CqelsOperator>,
        variables: Vec<String>,
    },
    /// Compute aggregate `functions` over the input, grouped by `group_by` variables.
    Aggregate {
        input: Box<CqelsOperator>,
        functions: Vec<AggregateFunction>,
        group_by: Vec<String>,
    },
}
180
/// Window applied to a stream buffer by a `StreamScan`.
#[derive(Debug, Clone)]
pub struct WindowDefinition {
    /// Window kind; decides which of the optional fields below is relevant.
    pub window_type: CqelsWindowType,
    /// Time span for the time-based window kinds.
    pub time_range: Option<Duration>,
    /// Number of triples for the count-based window kinds.
    pub triple_count: Option<usize>,
    /// Slide interval.
    /// NOTE(review): not read by the window evaluation in this file.
    pub slide: Option<Duration>,
}
193
/// Kinds of stream window.
#[derive(Debug, Clone, PartialEq)]
pub enum CqelsWindowType {
    /// Non-overlapping time window sized by `time_range`.
    TimeTumbling,
    /// Time window sized by `time_range`, advancing by `slide`.
    TimeSliding,
    /// Non-overlapping window of `triple_count` triples.
    CountTumbling,
    /// Window of the latest `triple_count` triples.
    CountSliding,
    /// The "now" window.
    /// NOTE(review): see `CqelsEngine::apply_window` for how this is evaluated here.
    Now,
}
208
/// A triple pattern whose positions may be variables or fixed terms.
#[derive(Debug, Clone)]
pub struct TriplePattern {
    pub subject: PatternNode,
    pub predicate: PatternNode,
    pub object: PatternNode,
}
216
/// One position of a `TriplePattern`.
#[derive(Debug, Clone)]
pub enum PatternNode {
    /// A query variable, identified by name.
    Variable(String),
    /// A fixed IRI.
    IRI(String),
    /// A fixed literal value.
    Literal(String),
    /// A blank node label.
    Blank(String),
}
225
/// Equi-join condition between a variable of each join input.
#[derive(Debug, Clone)]
pub struct JoinCondition {
    /// Variable taken from the left (or stream-side) input.
    pub left_var: String,
    /// Variable taken from the right (or static-side) input.
    pub right_var: String,
    pub join_type: JoinType,
}
233
/// Which unmatched rows a join keeps.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinType {
    /// Only matched pairs.
    Inner,
    /// Matched pairs plus unmatched left rows.
    LeftOuter,
    /// Matched pairs plus unmatched right rows.
    RightOuter,
    /// Matched pairs plus unmatched rows from both sides.
    FullOuter,
}
242
/// Boolean predicate over a binding, used by `CqelsOperator::Filter`.
///
/// Comparison values are kept as strings; how they are typed/compared is left
/// to the evaluator.
#[derive(Debug, Clone)]
pub enum FilterCondition {
    Equals { var: String, value: String },
    NotEquals { var: String, value: String },
    LessThan { var: String, value: String },
    GreaterThan { var: String, value: String },
    /// Match `var`'s value against a regular-expression `pattern`.
    Regex { var: String, pattern: String },
    And(Box<FilterCondition>, Box<FilterCondition>),
    Or(Box<FilterCondition>, Box<FilterCondition>),
    Not(Box<FilterCondition>),
}
255
/// One aggregate computed by `CqelsOperator::Aggregate`.
#[derive(Debug, Clone)]
pub struct AggregateFunction {
    /// Which aggregate to compute.
    pub function: AggregateFunctionType,
    /// Variable whose values are aggregated.
    pub variable: String,
    /// Name for the result (presumably the output variable name — TODO confirm).
    pub alias: String,
}
263
/// Supported aggregate functions.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateFunctionType {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}
273
/// Descriptive information attached to a registered query.
///
/// `register_query` fills only `created_at`; the optional fields start as `None`.
#[derive(Debug, Clone)]
pub struct QueryMetadata {
    pub name: Option<String>,
    pub description: Option<String>,
    /// Registration time.
    pub created_at: DateTime<Utc>,
    pub owner: Option<String>,
}
282
/// Lifecycle state of a registered query.
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionState {
    /// Registered but not started (initial state).
    Idle,
    /// Set by `start_query`.
    Running,
    /// Paused; not set by any engine method in this file.
    Paused,
    /// Set by `stop_query`.
    Completed,
    /// Failed with the given message; not set by any engine method in this file.
    Failed(String),
}
292
/// Executable form of a query: an operator tree plus optimizer annotations.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Plan identifier (a fresh UUID, distinct from the query id it is stored under).
    pub id: String,
    /// Root of the operator tree that is evaluated.
    pub root: CqelsOperator,
    /// Per-operator statistics (key meaning not established in this file — TODO confirm).
    pub stats: HashMap<String, OperatorStats>,
    /// Hints recorded by the optimizer.
    pub hints: Vec<OptimizationHint>,
}
305
/// Runtime statistics for one operator.
/// NOTE(review): not populated by the engine code in this file.
#[derive(Debug, Clone, Default)]
pub struct OperatorStats {
    pub input_tuples: u64,
    pub output_tuples: u64,
    pub execution_time_ms: f64,
    pub memory_usage_bytes: usize,
}
314
/// Annotation recorded on an `ExecutionPlan` by the optimizer.
#[derive(Debug, Clone)]
pub enum OptimizationHint {
    /// Evaluate filters as early as possible.
    PushDownFilter,
    /// Materialize join results.
    MaterializeJoin,
    /// Use the named index.
    UseIndex(String),
    /// Evaluate independent operators in parallel.
    ParallelExecution,
}
323
/// Engine-wide counters, returned by `CqelsEngine::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct CqelsStats {
    /// Successful `register_query` calls.
    pub queries_registered: u64,
    /// Successful `execute_query` calls.
    pub queries_executed: u64,
    /// Triples buffered from stream events.
    pub total_stream_triples: u64,
    /// Static triples processed; not updated by the engine code in this file.
    pub total_static_triples: u64,
    /// Stream joins evaluated.
    pub total_joins_performed: u64,
    /// Mean query latency in milliseconds (0.0 until populated).
    pub avg_query_latency_ms: f64,
    /// Memory usage estimate; not updated by the engine code in this file.
    pub memory_usage_bytes: usize,
}
335
336impl CqelsEngine {
337 pub fn new(store: Arc<dyn RdfStore>, config: CqelsConfig) -> Self {
339 Self {
340 store,
341 streams: Arc::new(RwLock::new(HashMap::new())),
342 queries: Arc::new(RwLock::new(HashMap::new())),
343 plans: Arc::new(RwLock::new(HashMap::new())),
344 config,
345 stats: Arc::new(RwLock::new(CqelsStats::default())),
346 }
347 }
348
349 pub async fn register_stream(&self, uri: String) -> Result<String> {
351 let stream_id = uuid::Uuid::new_v4().to_string();
352
353 let stream = CqelsStream {
354 id: stream_id.clone(),
355 uri,
356 buffer: VecDeque::with_capacity(self.config.window_buffer_size),
357 schema: None,
358 metadata: StreamMetadata {
359 created_at: Utc::now(),
360 event_count: 0,
361 avg_event_rate: 0.0,
362 last_event_time: None,
363 },
364 };
365
366 let mut streams = self.streams.write().await;
367 streams.insert(stream_id.clone(), stream);
368
369 info!("Registered CQELS stream: {}", stream_id);
370 Ok(stream_id)
371 }
372
373 pub async fn register_query(&self, query_string: String) -> Result<String> {
375 let query_id = uuid::Uuid::new_v4().to_string();
376
377 let operators = self.parse_cqels_query(&query_string)?;
379
380 let query = CqelsQuery {
381 id: query_id.clone(),
382 query_string,
383 operators,
384 metadata: QueryMetadata {
385 name: None,
386 description: None,
387 created_at: Utc::now(),
388 owner: None,
389 },
390 state: ExecutionState::Idle,
391 };
392
393 let plan = self.create_execution_plan(&query)?;
395
396 let mut queries = self.queries.write().await;
397 if queries.len() >= self.config.max_queries {
398 return Err(anyhow!("Maximum number of queries reached"));
399 }
400 queries.insert(query_id.clone(), query);
401
402 let mut plans = self.plans.write().await;
403 plans.insert(query_id.clone(), plan);
404
405 let mut stats = self.stats.write().await;
406 stats.queries_registered += 1;
407
408 info!("Registered CQELS query: {}", query_id);
409 Ok(query_id)
410 }
411
412 pub async fn process_event(&self, stream_uri: &str, event: &StreamEvent) -> Result<()> {
414 let triples = self.extract_triples_from_event(event)?;
415
416 let mut streams = self.streams.write().await;
417 let stream = streams
418 .values_mut()
419 .find(|s| s.uri == stream_uri)
420 .ok_or_else(|| anyhow!("Stream not found: {}", stream_uri))?;
421
422 for triple in &triples {
423 let stream_triple = StreamTriple {
424 triple: triple.clone(),
425 timestamp: Utc::now(),
426 sequence_id: stream.metadata.event_count,
427 source_id: stream.id.clone(),
428 };
429
430 stream.buffer.push_back(stream_triple);
431 stream.metadata.event_count += 1;
432 stream.metadata.last_event_time = Some(Utc::now());
433
434 if stream.buffer.len() > self.config.window_buffer_size {
436 stream.buffer.pop_front();
437 }
438 }
439
440 let mut stats = self.stats.write().await;
441 stats.total_stream_triples += triples.len() as u64;
442
443 Ok(())
444 }
445
446 pub async fn execute_query(&self, query_id: &str) -> Result<QueryResult> {
448 let queries = self.queries.read().await;
449 let _query = queries
450 .get(query_id)
451 .ok_or_else(|| anyhow!("Query not found: {}", query_id))?;
452
453 let plans = self.plans.read().await;
454 let plan = plans
455 .get(query_id)
456 .ok_or_else(|| anyhow!("Execution plan not found: {}", query_id))?;
457
458 let result = self.execute_plan(plan).await?;
460
461 let mut stats = self.stats.write().await;
462 stats.queries_executed += 1;
463
464 Ok(result)
465 }
466
467 fn parse_cqels_query(&self, query: &str) -> Result<Vec<CqelsOperator>> {
469 let mut operators = Vec::new();
471
472 if query.to_uppercase().contains("SELECT") {
474 let stream_scan = CqelsOperator::StreamScan {
476 stream_uri: "http://example.org/stream".to_string(),
477 window: WindowDefinition {
478 window_type: CqelsWindowType::TimeSliding,
479 time_range: Some(Duration::from_secs(60)),
480 triple_count: None,
481 slide: Some(Duration::from_secs(10)),
482 },
483 };
484 operators.push(stream_scan);
485 }
486
487 Ok(operators)
488 }
489
490 fn create_execution_plan(&self, query: &CqelsQuery) -> Result<ExecutionPlan> {
492 let plan_id = uuid::Uuid::new_v4().to_string();
493
494 let root = query
496 .operators
497 .first()
498 .cloned()
499 .ok_or_else(|| anyhow!("No operators in query"))?;
500
501 let plan = ExecutionPlan {
502 id: plan_id,
503 root,
504 stats: HashMap::new(),
505 hints: Vec::new(),
506 };
507
508 if self.config.adaptive_optimization {
510 self.optimize_plan(&plan)
511 } else {
512 Ok(plan)
513 }
514 }
515
516 fn optimize_plan(&self, plan: &ExecutionPlan) -> Result<ExecutionPlan> {
518 let mut optimized = plan.clone();
519
520 optimized.hints.push(OptimizationHint::PushDownFilter);
522
523 debug!("Optimized execution plan: {}", optimized.id);
524 Ok(optimized)
525 }
526
527 async fn execute_plan(&self, plan: &ExecutionPlan) -> Result<QueryResult> {
529 debug!("Executing plan: {}", plan.id);
530
531 self.execute_operator(&plan.root).await
533 }
534
535 fn execute_operator<'a>(
537 &'a self,
538 operator: &'a CqelsOperator,
539 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueryResult>> + 'a>> {
540 Box::pin(async move { self.execute_operator_impl(operator).await })
541 }
542
543 async fn execute_operator_impl(&self, operator: &CqelsOperator) -> Result<QueryResult> {
545 match operator {
546 CqelsOperator::StreamScan { stream_uri, window } => {
547 self.execute_stream_scan(stream_uri, window).await
548 }
549 CqelsOperator::StaticScan { graph_uri, pattern } => {
550 self.execute_static_scan(graph_uri.as_deref(), pattern)
551 .await
552 }
553 CqelsOperator::StreamJoin {
554 left,
555 right,
556 condition,
557 } => self.execute_stream_join(left, right, condition).await,
558 CqelsOperator::HybridJoin {
559 stream_op,
560 static_op,
561 condition,
562 } => {
563 self.execute_hybrid_join(stream_op, static_op, condition)
564 .await
565 }
566 CqelsOperator::Filter { input, condition } => {
567 self.execute_filter(input, condition).await
568 }
569 CqelsOperator::Project { input, variables } => {
570 self.execute_project(input, variables).await
571 }
572 CqelsOperator::Aggregate {
573 input,
574 functions,
575 group_by,
576 } => self.execute_aggregate(input, functions, group_by).await,
577 }
578 }
579
580 async fn execute_stream_scan(
582 &self,
583 stream_uri: &str,
584 window: &WindowDefinition,
585 ) -> Result<QueryResult> {
586 let streams = self.streams.read().await;
587 let stream = streams
588 .values()
589 .find(|s| s.uri == stream_uri)
590 .ok_or_else(|| anyhow!("Stream not found: {}", stream_uri))?;
591
592 let triples = self.apply_window(&stream.buffer, window)?;
594
595 debug!("Stream scan returned {} triples", triples.len());
596
597 Ok(QueryResult {
598 bindings: Vec::new(),
599 })
600 }
601
602 async fn execute_static_scan(
604 &self,
605 _graph_uri: Option<&str>,
606 _pattern: &TriplePattern,
607 ) -> Result<QueryResult> {
608 debug!("Executing static scan");
610
611 Ok(QueryResult {
612 bindings: Vec::new(),
613 })
614 }
615
616 async fn execute_stream_join(
618 &self,
619 _left: &CqelsOperator,
620 _right: &CqelsOperator,
621 _condition: &JoinCondition,
622 ) -> Result<QueryResult> {
623 let left_result = QueryResult {
626 bindings: Vec::new(),
627 };
628 let right_result = QueryResult {
629 bindings: Vec::new(),
630 };
631
632 debug!(
633 "Stream join: {} x {} bindings",
634 left_result.bindings.len(),
635 right_result.bindings.len()
636 );
637
638 let mut stats = self.stats.write().await;
639 stats.total_joins_performed += 1;
640
641 Ok(QueryResult {
642 bindings: Vec::new(),
643 })
644 }
645
646 async fn execute_hybrid_join(
648 &self,
649 _stream_op: &CqelsOperator,
650 _static_op: &CqelsOperator,
651 _condition: &JoinCondition,
652 ) -> Result<QueryResult> {
653 let stream_result = QueryResult {
656 bindings: Vec::new(),
657 };
658 let static_result = QueryResult {
659 bindings: Vec::new(),
660 };
661
662 debug!(
663 "Hybrid join: {} stream x {} static bindings",
664 stream_result.bindings.len(),
665 static_result.bindings.len()
666 );
667
668 Ok(QueryResult {
669 bindings: Vec::new(),
670 })
671 }
672
673 async fn execute_filter(
675 &self,
676 _input: &CqelsOperator,
677 _condition: &FilterCondition,
678 ) -> Result<QueryResult> {
679 let input_result = QueryResult {
681 bindings: Vec::new(),
682 };
683
684 debug!("Filter applied to {} bindings", input_result.bindings.len());
685
686 Ok(input_result)
687 }
688
689 async fn execute_project(
691 &self,
692 _input: &CqelsOperator,
693 _variables: &[String],
694 ) -> Result<QueryResult> {
695 let input_result = QueryResult {
697 bindings: Vec::new(),
698 };
699
700 debug!(
701 "Project applied to {} bindings",
702 input_result.bindings.len()
703 );
704
705 Ok(input_result)
706 }
707
708 async fn execute_aggregate(
710 &self,
711 _input: &CqelsOperator,
712 _functions: &[AggregateFunction],
713 _group_by: &[String],
714 ) -> Result<QueryResult> {
715 let input_result = QueryResult {
717 bindings: Vec::new(),
718 };
719
720 debug!(
721 "Aggregate applied to {} bindings",
722 input_result.bindings.len()
723 );
724
725 Ok(QueryResult {
726 bindings: Vec::new(),
727 })
728 }
729
730 fn apply_window(
732 &self,
733 buffer: &VecDeque<StreamTriple>,
734 window: &WindowDefinition,
735 ) -> Result<Vec<Triple>> {
736 let now = Utc::now();
737 let mut triples = Vec::new();
738
739 match window.window_type {
740 CqelsWindowType::TimeSliding | CqelsWindowType::TimeTumbling => {
741 if let Some(time_range) = window.time_range {
742 let cutoff = now - ChronoDuration::from_std(time_range)?;
743 for stream_triple in buffer {
744 if stream_triple.timestamp > cutoff {
745 triples.push(stream_triple.triple.clone());
746 }
747 }
748 }
749 }
750 CqelsWindowType::CountSliding | CqelsWindowType::CountTumbling => {
751 if let Some(count) = window.triple_count {
752 triples.extend(buffer.iter().rev().take(count).map(|st| st.triple.clone()));
753 }
754 }
755 CqelsWindowType::Now => {
756 triples.extend(buffer.iter().map(|st| st.triple.clone()));
758 }
759 }
760
761 Ok(triples)
762 }
763
764 fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
766 let mut triples = Vec::new();
767
768 match event {
769 StreamEvent::TripleAdded {
770 subject,
771 predicate,
772 object,
773 graph,
774 ..
775 } => {
776 triples.push(Triple {
777 subject: subject.clone(),
778 predicate: predicate.clone(),
779 object: object.clone(),
780 graph: graph.clone(),
781 });
782 }
783 StreamEvent::QuadAdded {
784 subject,
785 predicate,
786 object,
787 graph,
788 ..
789 } => {
790 triples.push(Triple {
791 subject: subject.clone(),
792 predicate: predicate.clone(),
793 object: object.clone(),
794 graph: Some(graph.clone()),
795 });
796 }
797 _ => {}
798 }
799
800 Ok(triples)
801 }
802
803 pub async fn get_stats(&self) -> CqelsStats {
805 self.stats.read().await.clone()
806 }
807
808 pub async fn start_query(&self, query_id: &str) -> Result<()> {
810 let mut queries = self.queries.write().await;
811 if let Some(query) = queries.get_mut(query_id) {
812 query.state = ExecutionState::Running;
813 info!("Started CQELS query: {}", query_id);
814 Ok(())
815 } else {
816 Err(anyhow!("Query not found: {}", query_id))
817 }
818 }
819
820 pub async fn stop_query(&self, query_id: &str) -> Result<()> {
822 let mut queries = self.queries.write().await;
823 if let Some(query) = queries.get_mut(query_id) {
824 query.state = ExecutionState::Completed;
825 info!("Stopped CQELS query: {}", query_id);
826 Ok(())
827 } else {
828 Err(anyhow!("Query not found: {}", query_id))
829 }
830 }
831}
832
#[cfg(test)]
mod tests {
    use super::*;

    // These tests touch only plain data types and never await, so the
    // synchronous `#[test]` harness is used instead of a Tokio runtime per test.

    #[test]
    fn test_cqels_config_defaults() {
        let config = CqelsConfig::default();
        assert_eq!(config.max_queries, 100);
        assert!(config.incremental_evaluation);
        assert!(config.adaptive_optimization);
        assert_eq!(config.window_buffer_size, 10000);
        assert_eq!(config.join_buffer_size, 10000);
        assert!(config.physical_timestamps);
    }

    #[test]
    fn test_window_definition() {
        let window = WindowDefinition {
            window_type: CqelsWindowType::TimeSliding,
            time_range: Some(Duration::from_secs(60)),
            triple_count: None,
            slide: Some(Duration::from_secs(10)),
        };

        assert_eq!(window.window_type, CqelsWindowType::TimeSliding);
        assert_eq!(window.time_range, Some(Duration::from_secs(60)));
        assert!(window.triple_count.is_none());
        assert_eq!(window.slide, Some(Duration::from_secs(10)));
    }

    #[test]
    fn test_execution_state() {
        let state = ExecutionState::Idle;
        assert_eq!(state, ExecutionState::Idle);

        let state = ExecutionState::Running;
        assert_eq!(state, ExecutionState::Running);
        assert_ne!(state, ExecutionState::Idle);

        assert_eq!(
            ExecutionState::Failed("boom".to_string()),
            ExecutionState::Failed("boom".to_string())
        );
        assert_ne!(
            ExecutionState::Failed("a".to_string()),
            ExecutionState::Failed("b".to_string())
        );
    }

    #[test]
    fn test_cqels_stats() {
        let stats = CqelsStats {
            queries_registered: 10,
            queries_executed: 50,
            total_stream_triples: 10000,
            total_static_triples: 5000,
            total_joins_performed: 25,
            avg_query_latency_ms: 12.5,
            memory_usage_bytes: 1024 * 1024,
        };

        assert_eq!(stats.queries_registered, 10);
        assert_eq!(stats.total_stream_triples, 10000);

        let empty = CqelsStats::default();
        assert_eq!(empty.queries_registered, 0);
        assert_eq!(empty.queries_executed, 0);
        assert_eq!(empty.total_joins_performed, 0);
    }
}