Skip to main content

oxirs_stream/
cqels.rs

1//! # CQELS (Continuous Query Evaluation over Linked Streams)
2//!
3//! Implementation of CQELS for native continuous query evaluation over RDF streams.
4//!
5//! CQELS features:
6//! - Native streaming operators
7//! - Continuous incremental evaluation
8//! - Physical vs. logical time windows
9//! - Stream-stream and stream-static joins
10//! - Efficient memory management
11//!
12//! ## References
13//! - Le-Phuoc et al. "A Native and Adaptive Approach for Unified Processing of Linked Streams and Linked Data"
14//! - <https://github.com/cqels/cqels>
15
16use anyhow::{anyhow, Result};
17use chrono::{DateTime, Duration as ChronoDuration, Utc};
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::sync::RwLock;
22use tracing::{debug, info};
23
24use crate::store_integration::{QueryResult, RdfStore, Triple};
25use crate::StreamEvent;
26
27/// CQELS query engine for continuous evaluation
28pub struct CqelsEngine {
29    /// RDF store for static data
30    store: Arc<dyn RdfStore>,
31    /// Active streams
32    streams: Arc<RwLock<HashMap<String, CqelsStream>>>,
33    /// Registered queries
34    queries: Arc<RwLock<HashMap<String, CqelsQuery>>>,
35    /// Query execution plans
36    plans: Arc<RwLock<HashMap<String, ExecutionPlan>>>,
37    /// Configuration
38    config: CqelsConfig,
39    /// Statistics
40    stats: Arc<RwLock<CqelsStats>>,
41}
42
43/// CQELS configuration
44#[derive(Debug, Clone)]
45pub struct CqelsConfig {
46    /// Maximum concurrent queries
47    pub max_queries: usize,
48    /// Enable incremental evaluation
49    pub incremental_evaluation: bool,
50    /// Enable adaptive optimization
51    pub adaptive_optimization: bool,
52    /// Window buffer size
53    pub window_buffer_size: usize,
54    /// Join buffer size
55    pub join_buffer_size: usize,
56    /// Enable physical timestamps
57    pub physical_timestamps: bool,
58}
59
60impl Default for CqelsConfig {
61    fn default() -> Self {
62        Self {
63            max_queries: 100,
64            incremental_evaluation: true,
65            adaptive_optimization: true,
66            window_buffer_size: 10000,
67            join_buffer_size: 10000,
68            physical_timestamps: true,
69        }
70    }
71}
72
73/// CQELS stream representation
74pub struct CqelsStream {
75    /// Stream identifier
76    pub id: String,
77    /// Stream URI
78    pub uri: String,
79    /// Event buffer
80    pub buffer: VecDeque<StreamTriple>,
81    /// Schema information
82    pub schema: Option<StreamSchema>,
83    /// Stream metadata
84    pub metadata: StreamMetadata,
85}
86
87/// Triple with stream metadata
88#[derive(Debug, Clone)]
89pub struct StreamTriple {
90    pub triple: Triple,
91    pub timestamp: DateTime<Utc>,
92    pub sequence_id: u64,
93    pub source_id: String,
94}
95
96/// Stream schema
97#[derive(Debug, Clone)]
98pub struct StreamSchema {
99    pub predicates: HashSet<String>,
100    pub value_types: HashMap<String, ValueType>,
101}
102
103/// Value type enumeration
104#[derive(Debug, Clone, PartialEq)]
105pub enum ValueType {
106    IRI,
107    Literal,
108    Integer,
109    Float,
110    Boolean,
111    DateTime,
112}
113
114/// Stream metadata
115#[derive(Debug, Clone)]
116pub struct StreamMetadata {
117    pub created_at: DateTime<Utc>,
118    pub event_count: u64,
119    pub avg_event_rate: f64,
120    pub last_event_time: Option<DateTime<Utc>>,
121}
122
123/// CQELS query
124#[derive(Debug, Clone)]
125pub struct CqelsQuery {
126    /// Query identifier
127    pub id: String,
128    /// Query string
129    pub query_string: String,
130    /// Parsed operators
131    pub operators: Vec<CqelsOperator>,
132    /// Query metadata
133    pub metadata: QueryMetadata,
134    /// Execution state
135    pub state: ExecutionState,
136}
137
138/// CQELS operators
139#[derive(Debug, Clone)]
140pub enum CqelsOperator {
141    /// Stream scan
142    StreamScan {
143        stream_uri: String,
144        window: WindowDefinition,
145    },
146    /// Static data scan
147    StaticScan {
148        graph_uri: Option<String>,
149        pattern: TriplePattern,
150    },
151    /// Stream-Stream join
152    StreamJoin {
153        left: Box<CqelsOperator>,
154        right: Box<CqelsOperator>,
155        condition: JoinCondition,
156    },
157    /// Stream-Static join
158    HybridJoin {
159        stream_op: Box<CqelsOperator>,
160        static_op: Box<CqelsOperator>,
161        condition: JoinCondition,
162    },
163    /// Filter operator
164    Filter {
165        input: Box<CqelsOperator>,
166        condition: FilterCondition,
167    },
168    /// Projection
169    Project {
170        input: Box<CqelsOperator>,
171        variables: Vec<String>,
172    },
173    /// Aggregation
174    Aggregate {
175        input: Box<CqelsOperator>,
176        functions: Vec<AggregateFunction>,
177        group_by: Vec<String>,
178    },
179}
180
181/// Window definition for CQELS
182#[derive(Debug, Clone)]
183pub struct WindowDefinition {
184    /// Window type
185    pub window_type: CqelsWindowType,
186    /// Time-based window size
187    pub time_range: Option<Duration>,
188    /// Count-based window size
189    pub triple_count: Option<usize>,
190    /// Window slide interval
191    pub slide: Option<Duration>,
192}
193
194/// CQELS window types
195#[derive(Debug, Clone, PartialEq)]
196pub enum CqelsWindowType {
197    /// Time-based tumbling window
198    TimeTumbling,
199    /// Time-based sliding window
200    TimeSliding,
201    /// Count-based tumbling window
202    CountTumbling,
203    /// Count-based sliding window
204    CountSliding,
205    /// Now window (current snapshot)
206    Now,
207}
208
209/// Triple pattern
210#[derive(Debug, Clone)]
211pub struct TriplePattern {
212    pub subject: PatternNode,
213    pub predicate: PatternNode,
214    pub object: PatternNode,
215}
216
217/// Pattern node
218#[derive(Debug, Clone)]
219pub enum PatternNode {
220    Variable(String),
221    IRI(String),
222    Literal(String),
223    Blank(String),
224}
225
226/// Join condition
227#[derive(Debug, Clone)]
228pub struct JoinCondition {
229    pub left_var: String,
230    pub right_var: String,
231    pub join_type: JoinType,
232}
233
234/// Join types
235#[derive(Debug, Clone, PartialEq)]
236pub enum JoinType {
237    Inner,
238    LeftOuter,
239    RightOuter,
240    FullOuter,
241}
242
243/// Filter condition
244#[derive(Debug, Clone)]
245pub enum FilterCondition {
246    Equals { var: String, value: String },
247    NotEquals { var: String, value: String },
248    LessThan { var: String, value: String },
249    GreaterThan { var: String, value: String },
250    Regex { var: String, pattern: String },
251    And(Box<FilterCondition>, Box<FilterCondition>),
252    Or(Box<FilterCondition>, Box<FilterCondition>),
253    Not(Box<FilterCondition>),
254}
255
256/// Aggregate function
257#[derive(Debug, Clone)]
258pub struct AggregateFunction {
259    pub function: AggregateFunctionType,
260    pub variable: String,
261    pub alias: String,
262}
263
264/// Aggregate function types
265#[derive(Debug, Clone, PartialEq)]
266pub enum AggregateFunctionType {
267    Count,
268    Sum,
269    Avg,
270    Min,
271    Max,
272}
273
274/// Query metadata
275#[derive(Debug, Clone)]
276pub struct QueryMetadata {
277    pub name: Option<String>,
278    pub description: Option<String>,
279    pub created_at: DateTime<Utc>,
280    pub owner: Option<String>,
281}
282
283/// Execution state
284#[derive(Debug, Clone, PartialEq)]
285pub enum ExecutionState {
286    Idle,
287    Running,
288    Paused,
289    Completed,
290    Failed(String),
291}
292
293/// Query execution plan
294#[derive(Debug, Clone)]
295pub struct ExecutionPlan {
296    /// Plan identifier
297    pub id: String,
298    /// Root operator
299    pub root: CqelsOperator,
300    /// Operator statistics
301    pub stats: HashMap<String, OperatorStats>,
302    /// Optimization hints
303    pub hints: Vec<OptimizationHint>,
304}
305
306/// Operator statistics
307#[derive(Debug, Clone, Default)]
308pub struct OperatorStats {
309    pub input_tuples: u64,
310    pub output_tuples: u64,
311    pub execution_time_ms: f64,
312    pub memory_usage_bytes: usize,
313}
314
315/// Optimization hints
316#[derive(Debug, Clone)]
317pub enum OptimizationHint {
318    PushDownFilter,
319    MaterializeJoin,
320    UseIndex(String),
321    ParallelExecution,
322}
323
324/// CQELS statistics
325#[derive(Debug, Clone, Default)]
326pub struct CqelsStats {
327    pub queries_registered: u64,
328    pub queries_executed: u64,
329    pub total_stream_triples: u64,
330    pub total_static_triples: u64,
331    pub total_joins_performed: u64,
332    pub avg_query_latency_ms: f64,
333    pub memory_usage_bytes: usize,
334}
335
336impl CqelsEngine {
337    /// Create a new CQELS engine
338    pub fn new(store: Arc<dyn RdfStore>, config: CqelsConfig) -> Self {
339        Self {
340            store,
341            streams: Arc::new(RwLock::new(HashMap::new())),
342            queries: Arc::new(RwLock::new(HashMap::new())),
343            plans: Arc::new(RwLock::new(HashMap::new())),
344            config,
345            stats: Arc::new(RwLock::new(CqelsStats::default())),
346        }
347    }
348
349    /// Register a new stream
350    pub async fn register_stream(&self, uri: String) -> Result<String> {
351        let stream_id = uuid::Uuid::new_v4().to_string();
352
353        let stream = CqelsStream {
354            id: stream_id.clone(),
355            uri,
356            buffer: VecDeque::with_capacity(self.config.window_buffer_size),
357            schema: None,
358            metadata: StreamMetadata {
359                created_at: Utc::now(),
360                event_count: 0,
361                avg_event_rate: 0.0,
362                last_event_time: None,
363            },
364        };
365
366        let mut streams = self.streams.write().await;
367        streams.insert(stream_id.clone(), stream);
368
369        info!("Registered CQELS stream: {}", stream_id);
370        Ok(stream_id)
371    }
372
373    /// Register a CQELS query
374    pub async fn register_query(&self, query_string: String) -> Result<String> {
375        let query_id = uuid::Uuid::new_v4().to_string();
376
377        // Parse CQELS query
378        let operators = self.parse_cqels_query(&query_string)?;
379
380        let query = CqelsQuery {
381            id: query_id.clone(),
382            query_string,
383            operators,
384            metadata: QueryMetadata {
385                name: None,
386                description: None,
387                created_at: Utc::now(),
388                owner: None,
389            },
390            state: ExecutionState::Idle,
391        };
392
393        // Create execution plan
394        let plan = self.create_execution_plan(&query)?;
395
396        let mut queries = self.queries.write().await;
397        if queries.len() >= self.config.max_queries {
398            return Err(anyhow!("Maximum number of queries reached"));
399        }
400        queries.insert(query_id.clone(), query);
401
402        let mut plans = self.plans.write().await;
403        plans.insert(query_id.clone(), plan);
404
405        let mut stats = self.stats.write().await;
406        stats.queries_registered += 1;
407
408        info!("Registered CQELS query: {}", query_id);
409        Ok(query_id)
410    }
411
412    /// Process a stream event
413    pub async fn process_event(&self, stream_uri: &str, event: &StreamEvent) -> Result<()> {
414        let triples = self.extract_triples_from_event(event)?;
415
416        let mut streams = self.streams.write().await;
417        let stream = streams
418            .values_mut()
419            .find(|s| s.uri == stream_uri)
420            .ok_or_else(|| anyhow!("Stream not found: {}", stream_uri))?;
421
422        for triple in &triples {
423            let stream_triple = StreamTriple {
424                triple: triple.clone(),
425                timestamp: Utc::now(),
426                sequence_id: stream.metadata.event_count,
427                source_id: stream.id.clone(),
428            };
429
430            stream.buffer.push_back(stream_triple);
431            stream.metadata.event_count += 1;
432            stream.metadata.last_event_time = Some(Utc::now());
433
434            // Evict old triples if buffer is full
435            if stream.buffer.len() > self.config.window_buffer_size {
436                stream.buffer.pop_front();
437            }
438        }
439
440        let mut stats = self.stats.write().await;
441        stats.total_stream_triples += triples.len() as u64;
442
443        Ok(())
444    }
445
446    /// Execute a registered query
447    pub async fn execute_query(&self, query_id: &str) -> Result<QueryResult> {
448        let queries = self.queries.read().await;
449        let _query = queries
450            .get(query_id)
451            .ok_or_else(|| anyhow!("Query not found: {}", query_id))?;
452
453        let plans = self.plans.read().await;
454        let plan = plans
455            .get(query_id)
456            .ok_or_else(|| anyhow!("Execution plan not found: {}", query_id))?;
457
458        // Execute the plan
459        let result = self.execute_plan(plan).await?;
460
461        let mut stats = self.stats.write().await;
462        stats.queries_executed += 1;
463
464        Ok(result)
465    }
466
467    /// Parse CQELS query string
468    fn parse_cqels_query(&self, query: &str) -> Result<Vec<CqelsOperator>> {
469        // Simplified parser - in production, use a full CQELS parser
470        let mut operators = Vec::new();
471
472        // Parse SELECT/CONSTRUCT
473        if query.to_uppercase().contains("SELECT") {
474            // Create stream scan operator
475            let stream_scan = CqelsOperator::StreamScan {
476                stream_uri: "http://example.org/stream".to_string(),
477                window: WindowDefinition {
478                    window_type: CqelsWindowType::TimeSliding,
479                    time_range: Some(Duration::from_secs(60)),
480                    triple_count: None,
481                    slide: Some(Duration::from_secs(10)),
482                },
483            };
484            operators.push(stream_scan);
485        }
486
487        Ok(operators)
488    }
489
490    /// Create execution plan for query
491    fn create_execution_plan(&self, query: &CqelsQuery) -> Result<ExecutionPlan> {
492        let plan_id = uuid::Uuid::new_v4().to_string();
493
494        // Create a simple plan with the first operator as root
495        let root = query
496            .operators
497            .first()
498            .cloned()
499            .ok_or_else(|| anyhow!("No operators in query"))?;
500
501        let plan = ExecutionPlan {
502            id: plan_id,
503            root,
504            stats: HashMap::new(),
505            hints: Vec::new(),
506        };
507
508        // Apply optimizations if enabled
509        if self.config.adaptive_optimization {
510            self.optimize_plan(&plan)
511        } else {
512            Ok(plan)
513        }
514    }
515
516    /// Optimize execution plan
517    fn optimize_plan(&self, plan: &ExecutionPlan) -> Result<ExecutionPlan> {
518        let mut optimized = plan.clone();
519
520        // Add optimization hints
521        optimized.hints.push(OptimizationHint::PushDownFilter);
522
523        debug!("Optimized execution plan: {}", optimized.id);
524        Ok(optimized)
525    }
526
527    /// Execute an execution plan
528    async fn execute_plan(&self, plan: &ExecutionPlan) -> Result<QueryResult> {
529        debug!("Executing plan: {}", plan.id);
530
531        // Execute the root operator
532        self.execute_operator(&plan.root).await
533    }
534
535    /// Execute a single operator
536    fn execute_operator<'a>(
537        &'a self,
538        operator: &'a CqelsOperator,
539    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueryResult>> + 'a>> {
540        Box::pin(async move { self.execute_operator_impl(operator).await })
541    }
542
543    /// Execute operator implementation
544    async fn execute_operator_impl(&self, operator: &CqelsOperator) -> Result<QueryResult> {
545        match operator {
546            CqelsOperator::StreamScan { stream_uri, window } => {
547                self.execute_stream_scan(stream_uri, window).await
548            }
549            CqelsOperator::StaticScan { graph_uri, pattern } => {
550                self.execute_static_scan(graph_uri.as_deref(), pattern)
551                    .await
552            }
553            CqelsOperator::StreamJoin {
554                left,
555                right,
556                condition,
557            } => self.execute_stream_join(left, right, condition).await,
558            CqelsOperator::HybridJoin {
559                stream_op,
560                static_op,
561                condition,
562            } => {
563                self.execute_hybrid_join(stream_op, static_op, condition)
564                    .await
565            }
566            CqelsOperator::Filter { input, condition } => {
567                self.execute_filter(input, condition).await
568            }
569            CqelsOperator::Project { input, variables } => {
570                self.execute_project(input, variables).await
571            }
572            CqelsOperator::Aggregate {
573                input,
574                functions,
575                group_by,
576            } => self.execute_aggregate(input, functions, group_by).await,
577        }
578    }
579
580    /// Execute stream scan operator
581    async fn execute_stream_scan(
582        &self,
583        stream_uri: &str,
584        window: &WindowDefinition,
585    ) -> Result<QueryResult> {
586        let streams = self.streams.read().await;
587        let stream = streams
588            .values()
589            .find(|s| s.uri == stream_uri)
590            .ok_or_else(|| anyhow!("Stream not found: {}", stream_uri))?;
591
592        // Apply window to get relevant triples
593        let triples = self.apply_window(&stream.buffer, window)?;
594
595        debug!("Stream scan returned {} triples", triples.len());
596
597        Ok(QueryResult {
598            bindings: Vec::new(),
599        })
600    }
601
602    /// Execute static scan operator
603    async fn execute_static_scan(
604        &self,
605        _graph_uri: Option<&str>,
606        _pattern: &TriplePattern,
607    ) -> Result<QueryResult> {
608        // Query static RDF store
609        debug!("Executing static scan");
610
611        Ok(QueryResult {
612            bindings: Vec::new(),
613        })
614    }
615
616    /// Execute stream-stream join
617    async fn execute_stream_join(
618        &self,
619        _left: &CqelsOperator,
620        _right: &CqelsOperator,
621        _condition: &JoinCondition,
622    ) -> Result<QueryResult> {
623        // Simplified non-recursive implementation for foundational version
624        // In production, implement proper recursive execution with Box::pin
625        let left_result = QueryResult {
626            bindings: Vec::new(),
627        };
628        let right_result = QueryResult {
629            bindings: Vec::new(),
630        };
631
632        debug!(
633            "Stream join: {} x {} bindings",
634            left_result.bindings.len(),
635            right_result.bindings.len()
636        );
637
638        let mut stats = self.stats.write().await;
639        stats.total_joins_performed += 1;
640
641        Ok(QueryResult {
642            bindings: Vec::new(),
643        })
644    }
645
646    /// Execute hybrid join (stream-static)
647    async fn execute_hybrid_join(
648        &self,
649        _stream_op: &CqelsOperator,
650        _static_op: &CqelsOperator,
651        _condition: &JoinCondition,
652    ) -> Result<QueryResult> {
653        // Simplified non-recursive implementation for foundational version
654        // In production, implement proper recursive execution with Box::pin
655        let stream_result = QueryResult {
656            bindings: Vec::new(),
657        };
658        let static_result = QueryResult {
659            bindings: Vec::new(),
660        };
661
662        debug!(
663            "Hybrid join: {} stream x {} static bindings",
664            stream_result.bindings.len(),
665            static_result.bindings.len()
666        );
667
668        Ok(QueryResult {
669            bindings: Vec::new(),
670        })
671    }
672
673    /// Execute filter operator
674    async fn execute_filter(
675        &self,
676        _input: &CqelsOperator,
677        _condition: &FilterCondition,
678    ) -> Result<QueryResult> {
679        // Simplified non-recursive implementation for foundational version
680        let input_result = QueryResult {
681            bindings: Vec::new(),
682        };
683
684        debug!("Filter applied to {} bindings", input_result.bindings.len());
685
686        Ok(input_result)
687    }
688
689    /// Execute project operator
690    async fn execute_project(
691        &self,
692        _input: &CqelsOperator,
693        _variables: &[String],
694    ) -> Result<QueryResult> {
695        // Simplified non-recursive implementation for foundational version
696        let input_result = QueryResult {
697            bindings: Vec::new(),
698        };
699
700        debug!(
701            "Project applied to {} bindings",
702            input_result.bindings.len()
703        );
704
705        Ok(input_result)
706    }
707
708    /// Execute aggregate operator
709    async fn execute_aggregate(
710        &self,
711        _input: &CqelsOperator,
712        _functions: &[AggregateFunction],
713        _group_by: &[String],
714    ) -> Result<QueryResult> {
715        // Simplified non-recursive implementation for foundational version
716        let input_result = QueryResult {
717            bindings: Vec::new(),
718        };
719
720        debug!(
721            "Aggregate applied to {} bindings",
722            input_result.bindings.len()
723        );
724
725        Ok(QueryResult {
726            bindings: Vec::new(),
727        })
728    }
729
730    /// Apply window to stream buffer
731    fn apply_window(
732        &self,
733        buffer: &VecDeque<StreamTriple>,
734        window: &WindowDefinition,
735    ) -> Result<Vec<Triple>> {
736        let now = Utc::now();
737        let mut triples = Vec::new();
738
739        match window.window_type {
740            CqelsWindowType::TimeSliding | CqelsWindowType::TimeTumbling => {
741                if let Some(time_range) = window.time_range {
742                    let cutoff = now - ChronoDuration::from_std(time_range)?;
743                    for stream_triple in buffer {
744                        if stream_triple.timestamp > cutoff {
745                            triples.push(stream_triple.triple.clone());
746                        }
747                    }
748                }
749            }
750            CqelsWindowType::CountSliding | CqelsWindowType::CountTumbling => {
751                if let Some(count) = window.triple_count {
752                    triples.extend(buffer.iter().rev().take(count).map(|st| st.triple.clone()));
753                }
754            }
755            CqelsWindowType::Now => {
756                // Return all current triples
757                triples.extend(buffer.iter().map(|st| st.triple.clone()));
758            }
759        }
760
761        Ok(triples)
762    }
763
764    /// Extract triples from event
765    fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
766        let mut triples = Vec::new();
767
768        match event {
769            StreamEvent::TripleAdded {
770                subject,
771                predicate,
772                object,
773                graph,
774                ..
775            } => {
776                triples.push(Triple {
777                    subject: subject.clone(),
778                    predicate: predicate.clone(),
779                    object: object.clone(),
780                    graph: graph.clone(),
781                });
782            }
783            StreamEvent::QuadAdded {
784                subject,
785                predicate,
786                object,
787                graph,
788                ..
789            } => {
790                triples.push(Triple {
791                    subject: subject.clone(),
792                    predicate: predicate.clone(),
793                    object: object.clone(),
794                    graph: Some(graph.clone()),
795                });
796            }
797            _ => {}
798        }
799
800        Ok(triples)
801    }
802
803    /// Get statistics
804    pub async fn get_stats(&self) -> CqelsStats {
805        self.stats.read().await.clone()
806    }
807
808    /// Start a query
809    pub async fn start_query(&self, query_id: &str) -> Result<()> {
810        let mut queries = self.queries.write().await;
811        if let Some(query) = queries.get_mut(query_id) {
812            query.state = ExecutionState::Running;
813            info!("Started CQELS query: {}", query_id);
814            Ok(())
815        } else {
816            Err(anyhow!("Query not found: {}", query_id))
817        }
818    }
819
820    /// Stop a query
821    pub async fn stop_query(&self, query_id: &str) -> Result<()> {
822        let mut queries = self.queries.write().await;
823        if let Some(query) = queries.get_mut(query_id) {
824            query.state = ExecutionState::Completed;
825            info!("Stopped CQELS query: {}", query_id);
826            Ok(())
827        } else {
828            Err(anyhow!("Query not found: {}", query_id))
829        }
830    }
831}
832
833#[cfg(test)]
834mod tests {
835    use super::*;
836
837    #[tokio::test]
838    async fn test_cqels_config_defaults() {
839        let config = CqelsConfig::default();
840        assert_eq!(config.max_queries, 100);
841        assert!(config.incremental_evaluation);
842        assert!(config.adaptive_optimization);
843    }
844
845    #[tokio::test]
846    async fn test_window_definition() {
847        let window = WindowDefinition {
848            window_type: CqelsWindowType::TimeSliding,
849            time_range: Some(Duration::from_secs(60)),
850            triple_count: None,
851            slide: Some(Duration::from_secs(10)),
852        };
853
854        assert_eq!(window.window_type, CqelsWindowType::TimeSliding);
855        assert!(window.time_range.is_some());
856    }
857
858    #[tokio::test]
859    async fn test_execution_state() {
860        let state = ExecutionState::Idle;
861        assert_eq!(state, ExecutionState::Idle);
862
863        let state = ExecutionState::Running;
864        assert_eq!(state, ExecutionState::Running);
865    }
866
867    #[tokio::test]
868    async fn test_cqels_stats() {
869        let stats = CqelsStats {
870            queries_registered: 10,
871            queries_executed: 50,
872            total_stream_triples: 10000,
873            total_static_triples: 5000,
874            total_joins_performed: 25,
875            avg_query_latency_ms: 12.5,
876            memory_usage_bytes: 1024 * 1024,
877        };
878
879        assert_eq!(stats.queries_registered, 10);
880        assert_eq!(stats.total_stream_triples, 10000);
881    }
882}