// oxirs_stream/sparql_streaming.rs
1//! # SPARQL Streaming Module
2//!
3//! This module provides continuous SPARQL query support with:
4//! - Query registration and lifecycle management
5//! - Result streaming with push notifications
6//! - Query optimization for continuous execution
7//! - Performance monitoring and statistics
8
9use anyhow::{anyhow, Result};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::{broadcast, mpsc, RwLock};
15use tokio::time::interval;
16use tracing::{debug, error, info, warn};
17
18use crate::{
19    store_integration::{QueryResult, RdfStore, Triple},
20    EventMetadata, StreamEvent,
21};
22
/// Continuous query manager.
///
/// Owns the registry of continuous SPARQL queries and the supporting
/// machinery (executor with optimizer/cache, result dispatcher) used to
/// poll each query and push result updates to subscribers.
pub struct ContinuousQueryManager {
    /// Registered queries, keyed by the UUID assigned at registration
    queries: Arc<RwLock<HashMap<String, RegisteredQuery>>>,
    /// RDF store connection against which queries are executed
    store: Arc<dyn RdfStore>,
    /// Query execution engine (optimizer + result cache)
    executor: Arc<QueryExecutor>,
    /// Result dispatcher (direct / broadcast / webhook / stream delivery)
    dispatcher: Arc<ResultDispatcher>,
    /// Configuration
    config: QueryManagerConfig,
    /// Aggregate manager-level statistics
    stats: Arc<RwLock<QueryManagerStats>>,
    /// Event notifier for monitoring (registration, start/stop, failures)
    event_notifier: broadcast::Sender<QueryEvent>,
}
40
/// Registered continuous query (internal registry entry).
#[derive(Debug)]
struct RegisteredQuery {
    /// Query ID (UUID assigned at registration)
    id: String,
    /// SPARQL query string (post-optimization when optimization is enabled)
    query: String,
    /// Query metadata supplied by the registrant
    metadata: QueryMetadata,
    /// Query lifecycle state
    state: QueryState,
    /// Channel over which result updates are delivered
    result_channel: QueryResultChannel,
    /// Per-query execution statistics
    stats: QueryStatistics,
    /// Created timestamp
    created_at: Instant,
    /// Last execution instant, if the query has ever run
    last_execution: Option<Instant>,
}
61
/// Query metadata supplied when registering a continuous query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMetadata {
    /// Query name
    pub name: Option<String>,
    /// Query description
    pub description: Option<String>,
    /// Query owner (used to enforce the per-owner query limit)
    pub owner: Option<String>,
    /// Query tags
    pub tags: Vec<String>,
    /// Query parameters
    pub parameters: HashMap<String, String>,
    /// Execution interval for polling (falls back to 60 s when `None`)
    pub interval: Option<Duration>,
    /// Per-execution query timeout
    pub timeout: Duration,
    /// Result limit; extra bindings beyond this count are truncated
    pub limit: Option<usize>,
    /// Enable result caching for this query
    pub enable_caching: bool,
    /// Cache TTL for this query's cached results
    pub cache_ttl: Duration,
}
86
87impl Default for QueryMetadata {
88    fn default() -> Self {
89        Self {
90            name: None,
91            description: None,
92            owner: None,
93            tags: Vec::new(),
94            parameters: HashMap::new(),
95            interval: Some(Duration::from_secs(60)),
96            timeout: Duration::from_secs(30),
97            limit: Some(1000),
98            enable_caching: true,
99            cache_ttl: Duration::from_secs(300),
100        }
101    }
102}
103
/// Query lifecycle state.
#[derive(Debug, Clone, PartialEq)]
enum QueryState {
    /// Query is active and running
    Active,
    /// Query is paused; the polling task stays alive but skips execution
    Paused,
    /// Query is stopped; the polling task exits on next tick
    Stopped,
    /// Query has failed
    Failed { reason: String },
}
116
/// Delivery channel for query result updates.
#[derive(Debug, Clone)]
pub enum QueryResultChannel {
    /// Direct channel to a single subscriber
    Direct(mpsc::Sender<QueryResultUpdate>),
    /// Broadcast channel for multiple subscribers
    Broadcast(broadcast::Sender<QueryResultUpdate>),
    /// Webhook delivery via HTTP POST with custom headers
    Webhook {
        url: String,
        headers: HashMap<String, String>,
    },
    /// Publish to a named stream topic
    Stream { topic: String },
}
132
/// A single result update pushed to subscribers of a continuous query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResultUpdate {
    /// Query ID
    pub query_id: String,
    /// Update timestamp (UTC)
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Update type (initial, incremental, refresh, or error)
    pub update_type: UpdateType,
    /// Result bindings (variable name -> value)
    pub bindings: Vec<HashMap<String, String>>,
    /// Changed triples (for CONSTRUCT queries)
    pub triples: Option<Vec<Triple>>,
    /// Additional metadata
    pub metadata: HashMap<String, String>,
}
149
/// Kind of result update being delivered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateType {
    /// Initial result set (first successful execution)
    Initial,
    /// Incremental addition
    Added,
    /// Incremental removal
    Removed,
    /// Full refresh (result set changed since the previous execution)
    Refresh,
    /// Query error
    Error { message: String },
}
164
/// Per-query execution statistics.
#[derive(Debug, Clone, Default)]
struct QueryStatistics {
    /// Total executions (successes + failures)
    pub execution_count: u64,
    /// Successful executions
    pub success_count: u64,
    /// Failed executions
    pub failure_count: u64,
    /// Total result bindings returned across all executions
    pub total_results: u64,
    /// Running average execution time
    pub avg_execution_time: Duration,
    /// Duration of the most recent execution
    pub last_execution_time: Option<Duration>,
    /// Cache hits
    pub cache_hits: u64,
    /// Cache misses
    pub cache_misses: u64,
}
185
/// Query manager configuration.
#[derive(Debug, Clone)]
pub struct QueryManagerConfig {
    /// Maximum concurrent registered queries
    pub max_concurrent_queries: usize,
    /// Maximum queries per owner
    pub max_queries_per_owner: usize,
    /// Default query timeout
    pub default_timeout: Duration,
    /// Enable query optimization at registration time
    pub enable_optimization: bool,
    /// Enable result caching
    pub enable_caching: bool,
    /// Cache size limit (entries)
    pub cache_size_limit: usize,
    /// Query execution thread pool size
    pub executor_threads: usize,
}
204
205impl Default for QueryManagerConfig {
206    fn default() -> Self {
207        Self {
208            max_concurrent_queries: 1000,
209            max_queries_per_owner: 100,
210            default_timeout: Duration::from_secs(30),
211            enable_optimization: true,
212            enable_caching: true,
213            cache_size_limit: 10000,
214            executor_threads: 4,
215        }
216    }
217}
218
/// Aggregate manager-level statistics.
#[derive(Debug, Clone, Default)]
pub struct QueryManagerStats {
    /// Total queries ever registered
    pub total_queries: usize,
    /// Currently active queries
    pub active_queries: usize,
    /// Total executions across all queries
    pub total_executions: u64,
    /// Failed executions across all queries
    pub failed_executions: u64,
    /// Average execution time
    pub avg_execution_time: Duration,
    /// Cache hit rate (0.0 - 1.0)
    pub cache_hit_rate: f64,
    /// Current cache size (entries)
    pub cache_size: usize,
}
237
/// Query lifecycle events emitted for monitoring via the broadcast channel.
#[derive(Debug, Clone)]
pub enum QueryEvent {
    /// Query registered
    QueryRegistered { id: String, query: String },
    /// Query execution task started
    QueryStarted { id: String },
    /// Query stopped / unregistered
    QueryStopped { id: String },
    /// Query execution failed
    QueryFailed { id: String, reason: String },
    /// Results delivered to the query's channel
    ResultsDelivered { id: String, count: usize },
}
252
/// Query executor: bundles the optimizer and the shared result cache.
struct QueryExecutor {
    /// Tokio runtime handle captured at construction
    /// (not used by the code visible in this file — TODO confirm usage)
    pool: tokio::runtime::Handle,
    /// Query optimizer
    optimizer: Arc<QueryOptimizer>,
    /// Shared result cache consulted by `execute_query`
    cache: Arc<RwLock<ResultCache>>,
}
262
/// Query optimizer holding rewrite rules and known query patterns.
struct QueryOptimizer {
    /// Optimization rules applied to incoming queries
    rules: Vec<OptimizationRule>,
    /// Known query patterns and their optimized forms
    patterns: HashMap<String, QueryPattern>,
}
270
/// A single optimization rule: when `condition` matches a query string,
/// `transform` produces the rewritten query.
struct OptimizationRule {
    name: String,
    // Predicate deciding whether the rule applies to a query string.
    condition: Box<dyn Fn(&str) -> bool + Send + Sync>,
    // Rewrite applied when the condition holds.
    transform: Box<dyn Fn(&str) -> String + Send + Sync>,
}
277
/// Query pattern for optimization: maps a known pattern to its
/// pre-computed optimized form.
#[derive(Debug, Clone)]
struct QueryPattern {
    pattern: String,
    optimized: String,
    description: String,
}
285
/// Result cache keyed by query string.
struct ResultCache {
    /// Cached results keyed by query text
    cache: HashMap<String, CachedResult>,
    /// Current number of cached entries
    size: usize,
    /// Maximum number of entries
    limit: usize,
}
295
/// Cached query result with expiry information.
#[derive(Debug, Clone)]
struct CachedResult {
    /// Result data
    data: QueryResult,
    /// Instant the entry was cached (used with `ttl` to decide expiry)
    cached_at: Instant,
    /// Time-to-live for this entry
    ttl: Duration,
    /// Number of times the entry has been served
    access_count: u64,
}
308
/// Result dispatcher: delivers updates to webhooks and stream topics.
struct ResultDispatcher {
    /// Tokio runtime handle captured at construction
    /// (not used by the code visible in this file — TODO confirm usage)
    handle: tokio::runtime::Handle,
    /// HTTP client for webhook delivery
    webhook_client: reqwest::Client,
    /// Retry configuration — presumably applied to webhook delivery;
    /// `send_webhook` is defined outside this view, verify there.
    retry_config: RetryConfig,
}
318
/// Retry configuration for result delivery.
#[derive(Debug, Clone)]
struct RetryConfig {
    /// Maximum delivery attempts
    max_attempts: u32,
    /// Delay before the first retry
    initial_delay: Duration,
    /// Upper bound on the retry delay
    max_delay: Duration,
    /// Double the delay after each failed attempt when true
    exponential_backoff: bool,
}
327
328impl ContinuousQueryManager {
329    /// Create a new continuous query manager
330    pub async fn new(store: Arc<dyn RdfStore>, config: QueryManagerConfig) -> Result<Self> {
331        let (tx, _) = broadcast::channel(1000);
332
333        let optimizer = Arc::new(QueryOptimizer::new());
334        let cache = Arc::new(RwLock::new(ResultCache::new(config.cache_size_limit)));
335
336        let executor = Arc::new(QueryExecutor {
337            pool: tokio::runtime::Handle::current(),
338            optimizer: optimizer.clone(),
339            cache: cache.clone(),
340        });
341
342        let dispatcher = Arc::new(ResultDispatcher {
343            handle: tokio::runtime::Handle::current(),
344            webhook_client: reqwest::Client::new(),
345            retry_config: RetryConfig {
346                max_attempts: 3,
347                initial_delay: Duration::from_millis(100),
348                max_delay: Duration::from_secs(10),
349                exponential_backoff: true,
350            },
351        });
352
353        Ok(Self {
354            queries: Arc::new(RwLock::new(HashMap::new())),
355            store,
356            executor,
357            dispatcher,
358            config,
359            stats: Arc::new(RwLock::new(QueryManagerStats::default())),
360            event_notifier: tx,
361        })
362    }
363
364    /// Register a continuous query
365    pub async fn register_query(
366        &self,
367        query: String,
368        metadata: QueryMetadata,
369        channel: QueryResultChannel,
370    ) -> Result<String> {
371        // Validate query
372        self.validate_query(&query)?;
373
374        // Check limits
375        let queries = self.queries.read().await;
376        if queries.len() >= self.config.max_concurrent_queries {
377            return Err(anyhow!("Maximum concurrent queries limit reached"));
378        }
379
380        if let Some(owner) = &metadata.owner {
381            let owner_count = queries
382                .values()
383                .filter(|q| q.metadata.owner.as_ref() == Some(owner))
384                .count();
385            if owner_count >= self.config.max_queries_per_owner {
386                return Err(anyhow!("Maximum queries per owner limit reached"));
387            }
388        }
389        drop(queries);
390
391        // Generate query ID
392        let query_id = uuid::Uuid::new_v4().to_string();
393
394        // Optimize query if enabled
395        let optimized_query = if self.config.enable_optimization {
396            self.executor.optimizer.optimize(&query).await
397        } else {
398            query.clone()
399        };
400
401        // Create registered query
402        let registered_query = RegisteredQuery {
403            id: query_id.clone(),
404            query: optimized_query,
405            metadata,
406            state: QueryState::Active,
407            result_channel: channel,
408            stats: QueryStatistics::default(),
409            created_at: Instant::now(),
410            last_execution: None,
411        };
412
413        // Register query
414        self.queries
415            .write()
416            .await
417            .insert(query_id.clone(), registered_query);
418
419        // Update statistics
420        let mut stats = self.stats.write().await;
421        stats.total_queries += 1;
422        stats.active_queries += 1;
423        drop(stats);
424
425        // Start query execution
426        self.start_query_execution(&query_id).await?;
427
428        // Notify
429        let _ = self.event_notifier.send(QueryEvent::QueryRegistered {
430            id: query_id.clone(),
431            query,
432        });
433
434        info!("Registered continuous query: {}", query_id);
435        Ok(query_id)
436    }
437
438    /// Unregister a query
439    pub async fn unregister_query(&self, query_id: &str) -> Result<()> {
440        let mut queries = self.queries.write().await;
441        let _query = queries
442            .remove(query_id)
443            .ok_or_else(|| anyhow!("Query not found"))?;
444
445        // Update statistics
446        self.stats.write().await.active_queries -= 1;
447
448        // Notify
449        let _ = self.event_notifier.send(QueryEvent::QueryStopped {
450            id: query_id.to_string(),
451        });
452
453        info!("Unregistered query: {}", query_id);
454        Ok(())
455    }
456
457    /// Pause a query
458    pub async fn pause_query(&self, query_id: &str) -> Result<()> {
459        let mut queries = self.queries.write().await;
460        let query = queries
461            .get_mut(query_id)
462            .ok_or_else(|| anyhow!("Query not found"))?;
463
464        query.state = QueryState::Paused;
465        Ok(())
466    }
467
468    /// Resume a query
469    pub async fn resume_query(&self, query_id: &str) -> Result<()> {
470        let mut queries = self.queries.write().await;
471        let query = queries
472            .get_mut(query_id)
473            .ok_or_else(|| anyhow!("Query not found"))?;
474
475        if query.state == QueryState::Paused {
476            query.state = QueryState::Active;
477            drop(queries);
478            self.start_query_execution(query_id).await?;
479        }
480
481        Ok(())
482    }
483
484    /// Validate a SPARQL query
485    fn validate_query(&self, query: &str) -> Result<()> {
486        // Basic validation - check for required keywords
487        let query_lower = query.to_lowercase();
488
489        if !query_lower.contains("select")
490            && !query_lower.contains("construct")
491            && !query_lower.contains("ask")
492            && !query_lower.contains("describe")
493        {
494            return Err(anyhow!("Invalid SPARQL query: missing query form"));
495        }
496
497        // Check for dangerous operations
498        if query_lower.contains("drop")
499            || query_lower.contains("clear")
500            || query_lower.contains("delete")
501            || query_lower.contains("insert")
502        {
503            return Err(anyhow!(
504                "Continuous queries cannot contain update operations"
505            ));
506        }
507
508        Ok(())
509    }
510
511    /// Register a SPARQL subscription query with enhanced syntax
512    pub async fn register_subscription(
513        &self,
514        query: String,
515        metadata: QueryMetadata,
516        channel: QueryResultChannel,
517    ) -> Result<String> {
518        // Parse subscription syntax extensions
519        let enhanced_query = self.parse_subscription_syntax(&query)?;
520
521        // Register as continuous query
522        self.register_query(enhanced_query, metadata, channel).await
523    }
524
525    /// Parse SPARQL subscription syntax extensions
526    fn parse_subscription_syntax(&self, query: &str) -> Result<String> {
527        let mut enhanced_query = query.to_string();
528
529        // Check for SUBSCRIBE keyword (custom extension)
530        if enhanced_query.to_lowercase().contains("subscribe") {
531            // Convert SUBSCRIBE to SELECT for standard SPARQL processing
532            enhanced_query = enhanced_query.replace("SUBSCRIBE", "SELECT");
533            enhanced_query = enhanced_query.replace("subscribe", "SELECT");
534        }
535
536        // Parse ON CHANGE clauses for change detection
537        if enhanced_query.to_lowercase().contains("on change") {
538            // Extract change detection patterns
539            // This would be expanded to parse custom change detection syntax
540            info!("Detected ON CHANGE clause in subscription query");
541        }
542
543        // Parse WINDOW clauses for temporal windows
544        if enhanced_query.to_lowercase().contains("window") {
545            // Extract windowing information
546            info!("Detected WINDOW clause in subscription query");
547        }
548
549        Ok(enhanced_query)
550    }
551
552    /// Start query execution
553    async fn start_query_execution(&self, query_id: &str) -> Result<()> {
554        let queries = self.queries.clone();
555        let store = self.store.clone();
556        let executor = self.executor.clone();
557        let dispatcher = self.dispatcher.clone();
558        let stats = self.stats.clone();
559        let event_notifier = self.event_notifier.clone();
560        let query_id = query_id.to_string();
561        let query_id_clone = query_id.clone();
562
563        tokio::spawn(async move {
564            let query_data = {
565                let queries_guard = queries.read().await;
566                queries_guard.get(&query_id_clone).map(|q| {
567                    (
568                        q.query.clone(),
569                        q.metadata.clone(),
570                        q.metadata.interval.unwrap_or(Duration::from_secs(60)),
571                    )
572                })
573            };
574
575            if let Some((query, metadata, poll_interval)) = query_data {
576                let mut interval = interval(poll_interval);
577                let mut last_result_hash = None;
578
579                loop {
580                    interval.tick().await;
581
582                    // Check if query is still active
583                    let state = {
584                        let queries_guard = queries.read().await;
585                        queries_guard.get(&query_id_clone).map(|q| q.state.clone())
586                    };
587
588                    match state {
589                        Some(QueryState::Active) => {
590                            // Execute query
591                            let start_time = Instant::now();
592
593                            match Self::execute_query(
594                                &store,
595                                &executor,
596                                &query,
597                                &metadata,
598                                last_result_hash.as_ref(),
599                            )
600                            .await
601                            {
602                                Ok((result, hash)) => {
603                                    let execution_time = start_time.elapsed();
604
605                                    // Update query statistics
606                                    {
607                                        let mut queries_guard = queries.write().await;
608                                        if let Some(q) = queries_guard.get_mut(&query_id_clone) {
609                                            q.stats.execution_count += 1;
610                                            q.stats.success_count += 1;
611                                            q.stats.total_results += result.bindings.len() as u64;
612                                            q.stats.last_execution_time = Some(execution_time);
613                                            q.last_execution = Some(Instant::now());
614
615                                            // Update average execution time
616                                            let count = q.stats.execution_count as u32;
617                                            q.stats.avg_execution_time =
618                                                (q.stats.avg_execution_time * (count - 1)
619                                                    + execution_time)
620                                                    / count;
621                                        }
622                                    }
623
624                                    // Check if results changed
625                                    if Some(&hash) != last_result_hash.as_ref() {
626                                        // Create update
627                                        let update = QueryResultUpdate {
628                                            query_id: query_id_clone.clone(),
629                                            timestamp: chrono::Utc::now(),
630                                            update_type: if last_result_hash.is_none() {
631                                                UpdateType::Initial
632                                            } else {
633                                                UpdateType::Refresh
634                                            },
635                                            bindings: result.bindings.clone(),
636                                            triples: None,
637                                            metadata: HashMap::new(),
638                                        };
639
640                                        // Dispatch results
641                                        match Self::dispatch_results(
642                                            &queries,
643                                            &dispatcher,
644                                            &query_id_clone,
645                                            update,
646                                        )
647                                        .await
648                                        {
649                                            Err(e) => {
650                                                error!(
651                                                    "Failed to dispatch results for query {}: {}",
652                                                    query_id_clone, e
653                                                );
654                                            }
655                                            _ => {
656                                                let _ = event_notifier.send(
657                                                    QueryEvent::ResultsDelivered {
658                                                        id: query_id_clone.clone(),
659                                                        count: result.bindings.len(),
660                                                    },
661                                                );
662                                            }
663                                        }
664
665                                        last_result_hash = Some(hash);
666                                    }
667
668                                    // Update global statistics
669                                    stats.write().await.total_executions += 1;
670                                }
671                                Err(e) => {
672                                    error!("Query execution failed for {}: {}", query_id_clone, e);
673
674                                    // Update query statistics
675                                    {
676                                        let mut queries_guard = queries.write().await;
677                                        if let Some(q) = queries_guard.get_mut(&query_id_clone) {
678                                            q.stats.execution_count += 1;
679                                            q.stats.failure_count += 1;
680                                        }
681                                    }
682
683                                    // Update global statistics
684                                    stats.write().await.failed_executions += 1;
685
686                                    // Send error update
687                                    let update = QueryResultUpdate {
688                                        query_id: query_id_clone.clone(),
689                                        timestamp: chrono::Utc::now(),
690                                        update_type: UpdateType::Error {
691                                            message: e.to_string(),
692                                        },
693                                        bindings: vec![],
694                                        triples: None,
695                                        metadata: HashMap::new(),
696                                    };
697
698                                    let _ = Self::dispatch_results(
699                                        &queries,
700                                        &dispatcher,
701                                        &query_id_clone,
702                                        update,
703                                    )
704                                    .await;
705
706                                    let _ = event_notifier.send(QueryEvent::QueryFailed {
707                                        id: query_id_clone.clone(),
708                                        reason: e.to_string(),
709                                    });
710                                }
711                            }
712                        }
713                        Some(QueryState::Paused) => {
714                            // Skip execution
715                            continue;
716                        }
717                        Some(QueryState::Stopped) | None => {
718                            // Exit loop
719                            break;
720                        }
721                        Some(QueryState::Failed { .. }) => {
722                            // Exit loop
723                            break;
724                        }
725                    }
726                }
727            }
728        });
729
730        let _ = self.event_notifier.send(QueryEvent::QueryStarted {
731            id: query_id.to_string(),
732        });
733
734        Ok(())
735    }
736
737    /// Execute a query
738    async fn execute_query(
739        store: &Arc<dyn RdfStore>,
740        executor: &Arc<QueryExecutor>,
741        query: &str,
742        metadata: &QueryMetadata,
743        _last_hash: Option<&String>,
744    ) -> Result<(QueryResult, String)> {
745        // Check cache if enabled
746        if metadata.enable_caching {
747            if let Some(cached) = executor.cache.read().await.get(query, metadata.cache_ttl) {
748                let hash = Self::hash_result(&cached);
749                return Ok((cached, hash));
750            }
751        }
752
753        // Execute query with timeout
754        let result = tokio::time::timeout(metadata.timeout, store.query(query))
755            .await
756            .map_err(|_| anyhow!("Query timeout"))?
757            .map_err(|e| anyhow!("Query execution failed: {}", e))?;
758
759        // Apply limit if specified
760        let result = if let Some(limit) = metadata.limit {
761            QueryResult {
762                bindings: result.bindings.into_iter().take(limit).collect(),
763            }
764        } else {
765            result
766        };
767
768        // Cache result if enabled
769        if metadata.enable_caching {
770            executor
771                .cache
772                .write()
773                .await
774                .put(query.to_string(), result.clone(), metadata.cache_ttl);
775        }
776
777        let hash = Self::hash_result(&result);
778        Ok((result, hash))
779    }
780
781    /// Hash query result for change detection
782    fn hash_result(result: &QueryResult) -> String {
783        use std::collections::hash_map::DefaultHasher;
784        use std::hash::{Hash, Hasher};
785
786        let mut hasher = DefaultHasher::new();
787        for binding in &result.bindings {
788            for (key, value) in binding {
789                key.hash(&mut hasher);
790                value.hash(&mut hasher);
791            }
792        }
793
794        hasher.finish().to_string()
795    }
796
797    /// Dispatch query results
798    async fn dispatch_results(
799        queries: &Arc<RwLock<HashMap<String, RegisteredQuery>>>,
800        dispatcher: &Arc<ResultDispatcher>,
801        query_id: &str,
802        update: QueryResultUpdate,
803    ) -> Result<()> {
804        let channel = {
805            let queries_guard = queries.read().await;
806            queries_guard
807                .get(query_id)
808                .map(|q| q.result_channel.clone())
809                .ok_or_else(|| anyhow!("Query not found"))?
810        };
811
812        match channel {
813            QueryResultChannel::Direct(sender) => sender
814                .send(update)
815                .await
816                .map_err(|_| anyhow!("Failed to send to direct channel")),
817            QueryResultChannel::Broadcast(sender) => {
818                sender
819                    .send(update)
820                    .map_err(|_| anyhow!("Failed to broadcast results"))?;
821                Ok(())
822            }
823            QueryResultChannel::Webhook { url, headers } => {
824                dispatcher.send_webhook(&url, &headers, update).await
825            }
826            QueryResultChannel::Stream { topic } => {
827                // Publish to stream topic (using internal stream producer)
828                dispatcher.send_stream(&topic, update).await
829            }
830        }
831    }
832
833    /// Get query status
834    pub async fn get_query_status(&self, query_id: &str) -> Result<QueryStatus> {
835        let queries = self.queries.read().await;
836        let query = queries
837            .get(query_id)
838            .ok_or_else(|| anyhow!("Query not found"))?;
839
840        Ok(QueryStatus {
841            id: query.id.clone(),
842            state: format!("{:?}", query.state),
843            created_at: query.created_at.elapsed(),
844            last_execution: query.last_execution.map(|t| t.elapsed()),
845            execution_count: query.stats.execution_count,
846            success_rate: if query.stats.execution_count > 0 {
847                query.stats.success_count as f64 / query.stats.execution_count as f64
848            } else {
849                0.0
850            },
851            total_results: query.stats.total_results,
852            avg_execution_time: query.stats.avg_execution_time,
853        })
854    }
855
856    /// List all queries
857    pub async fn list_queries(&self) -> Vec<QueryInfo> {
858        let queries = self.queries.read().await;
859        queries
860            .values()
861            .map(|q| QueryInfo {
862                id: q.id.clone(),
863                name: q.metadata.name.clone(),
864                owner: q.metadata.owner.clone(),
865                state: format!("{:?}", q.state),
866                created_at: q.created_at.elapsed(),
867            })
868            .collect()
869    }
870
871    /// Get manager statistics
872    pub async fn get_stats(&self) -> QueryManagerStats {
873        self.stats.read().await.clone()
874    }
875
876    /// Subscribe to query events
877    pub fn subscribe(&self) -> broadcast::Receiver<QueryEvent> {
878        self.event_notifier.subscribe()
879    }
880}
881
/// Query status information returned by `get_query_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatus {
    pub id: String,
    // Debug-formatted QueryState, e.g. "Active".
    pub state: String,
    // Elapsed time since registration (age), not an absolute timestamp.
    pub created_at: Duration,
    // Elapsed time since the last execution, if any.
    pub last_execution: Option<Duration>,
    pub execution_count: u64,
    // success_count / execution_count; 0.0 when never executed.
    pub success_rate: f64,
    pub total_results: u64,
    pub avg_execution_time: Duration,
}
894
/// Summary of a registered query as returned by `list_queries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryInfo {
    /// Unique identifier of the registered query
    pub id: String,
    /// Optional human-readable name from the query metadata
    pub name: Option<String>,
    /// Optional owner from the query metadata
    pub owner: Option<String>,
    /// Debug-formatted query state
    pub state: String,
    /// Time elapsed since the query was registered
    pub created_at: Duration,
}
904
905impl QueryOptimizer {
906    /// Create a new query optimizer
907    fn new() -> Self {
908        let mut optimizer = Self {
909            rules: Vec::new(),
910            patterns: HashMap::new(),
911        };
912
913        // Add default optimization rules
914        optimizer.add_default_rules();
915        optimizer
916    }
917
918    /// Add default optimization rules
919    fn add_default_rules(&mut self) {
920        // Rule: Remove redundant DISTINCT
921        self.rules.push(OptimizationRule {
922            name: "remove-redundant-distinct".to_string(),
923            condition: Box::new(|query| query.contains("DISTINCT") && !query.contains("ORDER BY")),
924            transform: Box::new(|query| {
925                // This would implement the actual transformation
926                query.to_string()
927            }),
928        });
929
930        // Rule: Optimize filter placement
931        self.rules.push(OptimizationRule {
932            name: "optimize-filter-placement".to_string(),
933            condition: Box::new(|query| query.contains("FILTER") && query.contains("OPTIONAL")),
934            transform: Box::new(|query| {
935                // This would move filters before optionals when possible
936                query.to_string()
937            }),
938        });
939    }
940
941    /// Optimize a query
942    async fn optimize(&self, query: &str) -> String {
943        let mut optimized = query.to_string();
944
945        // Apply optimization rules
946        for rule in &self.rules {
947            if (rule.condition)(&optimized) {
948                optimized = (rule.transform)(&optimized);
949                debug!("Applied optimization rule: {}", rule.name);
950            }
951        }
952
953        optimized
954    }
955}
956
957impl ResultCache {
958    /// Create a new result cache
959    fn new(limit: usize) -> Self {
960        Self {
961            cache: HashMap::new(),
962            size: 0,
963            limit,
964        }
965    }
966
967    /// Get cached result
968    fn get(&self, query: &str, _ttl: Duration) -> Option<QueryResult> {
969        self.cache.get(query).and_then(|cached| {
970            if cached.cached_at.elapsed() < cached.ttl {
971                Some(cached.data.clone())
972            } else {
973                None
974            }
975        })
976    }
977
978    /// Put result in cache
979    fn put(&mut self, query: String, result: QueryResult, ttl: Duration) {
980        let size = result.bindings.len();
981
982        // Evict old entries if needed
983        while self.size + size > self.limit && !self.cache.is_empty() {
984            // Simple LRU eviction - remove oldest
985            if let Some((oldest_key, _)) = self.cache.iter().min_by_key(|(_, v)| v.cached_at) {
986                let key = oldest_key.clone();
987                if let Some(removed) = self.cache.remove(&key) {
988                    self.size -= removed.data.bindings.len();
989                }
990            }
991        }
992
993        self.cache.insert(
994            query,
995            CachedResult {
996                data: result,
997                cached_at: Instant::now(),
998                ttl,
999                access_count: 0,
1000            },
1001        );
1002
1003        self.size += size;
1004    }
1005}
1006
1007impl ResultDispatcher {
1008    /// Create a stream producer for a specific topic
1009    async fn create_stream_producer_for_topic(&self, topic: &str) -> Result<crate::StreamProducer> {
1010        // Create a default stream configuration for this topic
1011        let config = crate::StreamConfig {
1012            backend: crate::StreamBackendType::Memory {
1013                max_size: Some(10000),
1014                persistence: false,
1015            },
1016            topic: topic.to_string(),
1017            batch_size: 100,
1018            flush_interval_ms: 100,
1019            max_connections: 10,
1020            connection_timeout: Duration::from_secs(30),
1021            enable_compression: false,
1022            compression_type: crate::CompressionType::None,
1023            retry_config: crate::RetryConfig::default(),
1024            circuit_breaker: crate::CircuitBreakerConfig::default(),
1025            security: crate::SecurityConfig::default(),
1026            performance: crate::StreamPerformanceConfig::default(),
1027            monitoring: crate::MonitoringConfig::default(),
1028        };
1029
1030        // Create and return the producer
1031        crate::StreamProducer::new(config).await
1032    }
1033    /// Send results via webhook
1034    async fn send_webhook(
1035        &self,
1036        url: &str,
1037        headers: &HashMap<String, String>,
1038        update: QueryResultUpdate,
1039    ) -> Result<()> {
1040        let mut request = self
1041            .webhook_client
1042            .post(url)
1043            .json(&update)
1044            .timeout(Duration::from_secs(30));
1045
1046        // Add custom headers
1047        for (key, value) in headers {
1048            request = request.header(key, value);
1049        }
1050
1051        // Send with retry
1052        let mut attempts = 0;
1053        let mut delay = self.retry_config.initial_delay;
1054
1055        loop {
1056            attempts += 1;
1057
1058            match request
1059                .try_clone()
1060                .expect("request should be cloneable for retry")
1061                .send()
1062                .await
1063            {
1064                Ok(response) => {
1065                    if response.status().is_success() {
1066                        return Ok(());
1067                    } else {
1068                        let status = response.status();
1069                        let body = response.text().await.unwrap_or_default();
1070
1071                        if attempts >= self.retry_config.max_attempts {
1072                            return Err(anyhow!("Webhook failed with status {}: {}", status, body));
1073                        }
1074
1075                        warn!("Webhook attempt {} failed with status {}", attempts, status);
1076                    }
1077                }
1078                Err(e) => {
1079                    if attempts >= self.retry_config.max_attempts {
1080                        return Err(anyhow!("Webhook failed after {} attempts: {}", attempts, e));
1081                    }
1082
1083                    warn!("Webhook attempt {} failed: {}", attempts, e);
1084                }
1085            }
1086
1087            // Wait before retry
1088            tokio::time::sleep(delay).await;
1089
1090            // Update delay for next attempt
1091            if self.retry_config.exponential_backoff {
1092                delay = (delay * 2).min(self.retry_config.max_delay);
1093            }
1094        }
1095    }
1096
1097    /// Send results to stream topic
1098    async fn send_stream(&self, topic: &str, update: QueryResultUpdate) -> Result<()> {
1099        // Convert query result update to stream event
1100        let stream_event = match update.update_type {
1101            UpdateType::Added => StreamEvent::QueryResultAdded {
1102                query_id: update.query_id.clone(),
1103                result: crate::event::QueryResult {
1104                    query_id: update.query_id.clone(),
1105                    bindings: update.bindings.first().cloned().unwrap_or_default(),
1106                    execution_time: Duration::from_millis(0),
1107                },
1108                metadata: EventMetadata {
1109                    event_id: uuid::Uuid::new_v4().to_string(),
1110                    timestamp: chrono::Utc::now(),
1111                    source: "sparql-streaming".to_string(),
1112                    user: Some("query-engine".to_string()),
1113                    context: Some(update.query_id.clone()),
1114                    caused_by: None,
1115                    version: "1.0".to_string(),
1116                    properties: {
1117                        let mut props = std::collections::HashMap::new();
1118                        props.insert("topic".to_string(), topic.to_string());
1119                        props.insert("update_type".to_string(), "result_added".to_string());
1120                        props
1121                    },
1122                    checksum: None,
1123                },
1124            },
1125            UpdateType::Removed => StreamEvent::QueryResultRemoved {
1126                query_id: update.query_id.clone(),
1127                result: crate::event::QueryResult {
1128                    query_id: update.query_id.clone(),
1129                    bindings: update.bindings.first().cloned().unwrap_or_default(),
1130                    execution_time: Duration::from_millis(0),
1131                },
1132                metadata: EventMetadata {
1133                    event_id: uuid::Uuid::new_v4().to_string(),
1134                    timestamp: chrono::Utc::now(),
1135                    source: "sparql-streaming".to_string(),
1136                    user: Some("query-engine".to_string()),
1137                    context: Some(update.query_id.clone()),
1138                    caused_by: None,
1139                    version: "1.0".to_string(),
1140                    properties: {
1141                        let mut props = std::collections::HashMap::new();
1142                        props.insert("topic".to_string(), topic.to_string());
1143                        props.insert("update_type".to_string(), "result_removed".to_string());
1144                        props
1145                    },
1146                    checksum: None,
1147                },
1148            },
1149            UpdateType::Initial | UpdateType::Refresh => {
1150                // For initial and refresh updates, we just use QueryResultAdded
1151                StreamEvent::QueryResultAdded {
1152                    query_id: update.query_id.clone(),
1153                    result: crate::event::QueryResult {
1154                        query_id: update.query_id.clone(),
1155                        bindings: update.bindings.first().cloned().unwrap_or_default(),
1156                        execution_time: Duration::from_millis(0), // Default execution time
1157                    },
1158                    metadata: EventMetadata {
1159                        event_id: uuid::Uuid::new_v4().to_string(),
1160                        timestamp: chrono::Utc::now(),
1161                        source: "sparql-streaming".to_string(),
1162                        user: Some("query-engine".to_string()),
1163                        context: Some(update.query_id.clone()),
1164                        caused_by: None,
1165                        version: "1.0".to_string(),
1166                        properties: {
1167                            let mut props = std::collections::HashMap::new();
1168                            props.insert("topic".to_string(), topic.to_string());
1169                            props.insert(
1170                                "update_type".to_string(),
1171                                format!("{:?}", update.update_type).to_lowercase(),
1172                            );
1173                            props
1174                        },
1175                        checksum: None,
1176                    },
1177                }
1178            }
1179            UpdateType::Error { message } => {
1180                // For errors, we'll just log and return Ok for now
1181                warn!("Query error in stream: {}", message);
1182                return Ok(());
1183            }
1184        };
1185
1186        // Create a stream producer for the topic and publish the event
1187        match self.create_stream_producer_for_topic(topic).await {
1188            Ok(mut producer) => match producer.publish(stream_event).await {
1189                Ok(_) => {
1190                    info!(
1191                        "Successfully published query result to stream topic '{}'",
1192                        topic
1193                    );
1194                }
1195                Err(e) => {
1196                    error!("Failed to publish to stream topic '{}': {}", topic, e);
1197                    return Err(anyhow!("Stream publishing failed: {}", e));
1198                }
1199            },
1200            Err(e) => {
1201                error!(
1202                    "Failed to create stream producer for topic '{}': {}",
1203                    topic, e
1204                );
1205                return Err(anyhow!("Stream producer creation failed: {}", e));
1206            }
1207        }
1208
1209        Ok(())
1210    }
1211}
1212
1213/// Create subscription channel for query results
1214pub fn create_subscription_channel() -> (
1215    mpsc::Sender<QueryResultUpdate>,
1216    mpsc::Receiver<QueryResultUpdate>,
1217) {
1218    mpsc::channel(100)
1219}
1220
1221/// Create broadcast channel for query results
1222pub fn create_broadcast_channel() -> (
1223    broadcast::Sender<QueryResultUpdate>,
1224    broadcast::Receiver<QueryResultUpdate>,
1225) {
1226    broadcast::channel(100)
1227}
1228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store_integration::tests::MockRdfStore;

    /// Construct a query manager backed by a fresh, empty mock store.
    async fn build_manager() -> ContinuousQueryManager {
        let store = Arc::new(MockRdfStore {
            log_position: Arc::new(RwLock::new(0)),
            changes: Arc::new(RwLock::new(vec![])),
        });
        ContinuousQueryManager::new(store, QueryManagerConfig::default())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_query_registration() {
        let manager = build_manager().await;

        let (sender, _receiver) = create_subscription_channel();
        let registered_id = manager
            .register_query(
                "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10".to_string(),
                QueryMetadata::default(),
                QueryResultChannel::Direct(sender),
            )
            .await
            .unwrap();

        assert!(!registered_id.is_empty());

        // The freshly registered query must be the only one listed.
        let listed = manager.list_queries().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, registered_id);
    }

    #[tokio::test]
    async fn test_query_validation() {
        let manager = build_manager().await;

        // Update-style queries are rejected for continuous execution.
        let rejected = manager.validate_query("DELETE WHERE { ?s ?p ?o }");
        assert!(rejected.is_err());

        // Plain SELECT queries pass validation.
        let accepted = manager.validate_query("SELECT ?s WHERE { ?s ?p ?o }");
        assert!(accepted.is_ok());
    }
}