1use anyhow::{anyhow, Result};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::{broadcast, mpsc, RwLock};
15use tokio::time::interval;
16use tracing::{debug, error, info, warn};
17
18use crate::{
19 store_integration::{QueryResult, RdfStore, Triple},
20 EventMetadata, StreamEvent,
21};
22
/// Manages the lifecycle of continuous (standing) SPARQL queries:
/// registration, periodic execution against the store, and delivery of
/// result updates to subscribers.
pub struct ContinuousQueryManager {
    /// Registered queries keyed by query id.
    queries: Arc<RwLock<HashMap<String, RegisteredQuery>>>,
    /// RDF store the queries are executed against.
    store: Arc<dyn RdfStore>,
    /// Shared execution support (optimizer + result cache).
    executor: Arc<QueryExecutor>,
    /// Delivers result updates over the configured channels.
    dispatcher: Arc<ResultDispatcher>,
    /// Manager-wide limits and feature toggles.
    config: QueryManagerConfig,
    /// Aggregated runtime statistics.
    stats: Arc<RwLock<QueryManagerStats>>,
    /// Broadcasts lifecycle events (registered/started/stopped/failed/delivered).
    event_notifier: broadcast::Sender<QueryEvent>,
}
40
/// Internal bookkeeping record for one registered continuous query.
#[derive(Debug)]
struct RegisteredQuery {
    /// Unique query id (UUID string).
    id: String,
    /// The (possibly optimizer-rewritten) SPARQL text that is executed.
    query: String,
    /// User-supplied metadata and execution settings.
    metadata: QueryMetadata,
    /// Current lifecycle state.
    state: QueryState,
    /// Where result updates are delivered.
    result_channel: QueryResultChannel,
    /// Per-query execution statistics.
    stats: QueryStatistics,
    /// When the query was registered.
    created_at: Instant,
    /// When the query last ran, if ever.
    last_execution: Option<Instant>,
}
61
/// User-facing metadata and execution settings for a continuous query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMetadata {
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional owner id, used to enforce the per-owner query limit.
    pub owner: Option<String>,
    /// Arbitrary classification tags.
    pub tags: Vec<String>,
    /// Arbitrary key/value parameters.
    pub parameters: HashMap<String, String>,
    /// Polling interval; `None` falls back to 60s in the execution loop.
    pub interval: Option<Duration>,
    /// Per-execution timeout.
    pub timeout: Duration,
    /// Maximum number of result rows kept per execution, if any.
    pub limit: Option<usize>,
    /// Whether results may be served from the shared result cache.
    pub enable_caching: bool,
    /// Time-to-live for cached results.
    pub cache_ttl: Duration,
}
86
87impl Default for QueryMetadata {
88 fn default() -> Self {
89 Self {
90 name: None,
91 description: None,
92 owner: None,
93 tags: Vec::new(),
94 parameters: HashMap::new(),
95 interval: Some(Duration::from_secs(60)),
96 timeout: Duration::from_secs(30),
97 limit: Some(1000),
98 enable_caching: true,
99 cache_ttl: Duration::from_secs(300),
100 }
101 }
102}
103
/// Lifecycle state of a registered query.
#[derive(Debug, Clone, PartialEq)]
enum QueryState {
    /// Executed on every interval tick.
    Active,
    /// The loop keeps ticking but execution is skipped.
    Paused,
    /// The execution loop terminates on the next tick.
    Stopped,
    /// Terminal failure; the loop terminates on the next tick.
    Failed { reason: String },
}
116
/// Destination for a query's result updates.
#[derive(Debug, Clone)]
pub enum QueryResultChannel {
    /// Point-to-point delivery over an mpsc channel.
    Direct(mpsc::Sender<QueryResultUpdate>),
    /// Fan-out delivery over a broadcast channel.
    Broadcast(broadcast::Sender<QueryResultUpdate>),
    /// HTTP POST of the update as JSON, with extra request headers.
    Webhook {
        url: String,
        headers: HashMap<String, String>,
    },
    /// Publish to a streaming topic as `StreamEvent`s.
    Stream { topic: String },
}
132
/// One delivery of results (or an error) for a continuous query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResultUpdate {
    /// Id of the originating query.
    pub query_id: String,
    /// When the update was produced.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// What kind of change this update represents.
    pub update_type: UpdateType,
    /// Variable bindings (one map per solution row).
    pub bindings: Vec<HashMap<String, String>>,
    /// Optional triples, e.g. for CONSTRUCT-style results.
    pub triples: Option<Vec<Triple>>,
    /// Additional free-form metadata.
    pub metadata: HashMap<String, String>,
}
149
/// Kind of change carried by a `QueryResultUpdate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateType {
    /// First result set after registration.
    Initial,
    /// Results newly matching the query.
    Added,
    /// Results that no longer match the query.
    Removed,
    /// Full refresh after the result set changed.
    Refresh,
    /// Query execution failed.
    Error { message: String },
}
164
/// Per-query execution counters.
#[derive(Debug, Clone, Default)]
struct QueryStatistics {
    /// Total executions (successes + failures).
    pub execution_count: u64,
    /// Successful executions.
    pub success_count: u64,
    /// Failed executions.
    pub failure_count: u64,
    /// Total result rows produced across all executions.
    pub total_results: u64,
    /// Running average of execution time.
    pub avg_execution_time: Duration,
    /// Duration of the most recent execution, if any.
    pub last_execution_time: Option<Duration>,
    /// Results served from cache.
    pub cache_hits: u64,
    /// Cache lookups that missed.
    pub cache_misses: u64,
}
185
/// Manager-wide limits and feature toggles.
#[derive(Debug, Clone)]
pub struct QueryManagerConfig {
    /// Upper bound on simultaneously registered queries.
    pub max_concurrent_queries: usize,
    /// Upper bound on registered queries per owner id.
    pub max_queries_per_owner: usize,
    /// Fallback execution timeout.
    pub default_timeout: Duration,
    /// Whether queries are rewritten by the optimizer at registration.
    pub enable_optimization: bool,
    /// Whether the shared result cache is used.
    pub enable_caching: bool,
    /// Result-cache capacity, measured in result rows.
    pub cache_size_limit: usize,
    /// Intended executor thread count.
    pub executor_threads: usize,
}
204
205impl Default for QueryManagerConfig {
206 fn default() -> Self {
207 Self {
208 max_concurrent_queries: 1000,
209 max_queries_per_owner: 100,
210 default_timeout: Duration::from_secs(30),
211 enable_optimization: true,
212 enable_caching: true,
213 cache_size_limit: 10000,
214 executor_threads: 4,
215 }
216 }
217}
218
/// Aggregated statistics across all queries.
#[derive(Debug, Clone, Default)]
pub struct QueryManagerStats {
    /// Queries registered over the manager's lifetime.
    pub total_queries: usize,
    /// Currently registered (not yet unregistered) queries.
    pub active_queries: usize,
    /// Successful executions across all queries.
    pub total_executions: u64,
    /// Failed executions across all queries.
    pub failed_executions: u64,
    /// Running average execution time.
    pub avg_execution_time: Duration,
    /// Fraction of cache lookups that hit.
    pub cache_hit_rate: f64,
    /// Current cache size.
    pub cache_size: usize,
}
237
/// Lifecycle notifications broadcast by the manager.
#[derive(Debug, Clone)]
pub enum QueryEvent {
    /// A query was registered (carries the original, unoptimized text).
    QueryRegistered { id: String, query: String },
    /// A query's execution loop was started.
    QueryStarted { id: String },
    /// A query was stopped/unregistered.
    QueryStopped { id: String },
    /// A query execution failed.
    QueryFailed { id: String, reason: String },
    /// Results were delivered to the query's channel.
    ResultsDelivered { id: String, count: usize },
}
252
/// Shared execution support: runtime handle, optimizer, and result cache.
struct QueryExecutor {
    /// Handle to the Tokio runtime the manager runs on.
    pool: tokio::runtime::Handle,
    /// Rule-based query rewriter.
    optimizer: Arc<QueryOptimizer>,
    /// Shared, TTL-based result cache.
    cache: Arc<RwLock<ResultCache>>,
}
262
/// Rule-based SPARQL rewriter applied at registration time.
struct QueryOptimizer {
    /// Rewrite rules applied in order.
    rules: Vec<OptimizationRule>,
    /// Known query patterns; not currently consulted by `optimize`.
    patterns: HashMap<String, QueryPattern>,
}
270
/// A single optimizer rewrite rule.
struct OptimizationRule {
    /// Rule name, used for debug logging.
    name: String,
    /// Predicate deciding whether the rule applies to a query.
    condition: Box<dyn Fn(&str) -> bool + Send + Sync>,
    /// Rewrite applied when the condition matches.
    transform: Box<dyn Fn(&str) -> String + Send + Sync>,
}
277
/// A named query pattern with its optimized form (currently unused).
#[derive(Debug, Clone)]
struct QueryPattern {
    /// Pattern to match against query text.
    pattern: String,
    /// Optimized replacement text.
    optimized: String,
    /// Human-readable description of the rewrite.
    description: String,
}
285
/// TTL-based result cache bounded by total result rows.
struct ResultCache {
    /// Cached results keyed by query text.
    cache: HashMap<String, CachedResult>,
    /// Current total size, in result rows.
    size: usize,
    /// Maximum total size, in result rows.
    limit: usize,
}
295
/// One cached query result with its expiry information.
#[derive(Debug, Clone)]
struct CachedResult {
    /// The cached result data.
    data: QueryResult,
    /// When the entry was stored.
    cached_at: Instant,
    /// Entry lifetime; stale entries are ignored by `get`.
    ttl: Duration,
    /// Times this entry was served. NOTE(review): never incremented by the
    /// current `get` implementation — confirm whether it is still wanted.
    access_count: u64,
}
308
/// Delivers result updates to webhooks and stream topics.
struct ResultDispatcher {
    /// Handle to the Tokio runtime.
    handle: tokio::runtime::Handle,
    /// Reused HTTP client for webhook deliveries.
    webhook_client: reqwest::Client,
    /// Retry policy for webhook deliveries.
    retry_config: RetryConfig,
}
318
/// Retry policy for webhook delivery.
#[derive(Debug, Clone)]
struct RetryConfig {
    /// Maximum delivery attempts before giving up.
    max_attempts: u32,
    /// Delay before the first retry.
    initial_delay: Duration,
    /// Upper bound on the backoff delay.
    max_delay: Duration,
    /// Whether the delay doubles after each attempt (capped at `max_delay`).
    exponential_backoff: bool,
}
327
328impl ContinuousQueryManager {
329 pub async fn new(store: Arc<dyn RdfStore>, config: QueryManagerConfig) -> Result<Self> {
331 let (tx, _) = broadcast::channel(1000);
332
333 let optimizer = Arc::new(QueryOptimizer::new());
334 let cache = Arc::new(RwLock::new(ResultCache::new(config.cache_size_limit)));
335
336 let executor = Arc::new(QueryExecutor {
337 pool: tokio::runtime::Handle::current(),
338 optimizer: optimizer.clone(),
339 cache: cache.clone(),
340 });
341
342 let dispatcher = Arc::new(ResultDispatcher {
343 handle: tokio::runtime::Handle::current(),
344 webhook_client: reqwest::Client::new(),
345 retry_config: RetryConfig {
346 max_attempts: 3,
347 initial_delay: Duration::from_millis(100),
348 max_delay: Duration::from_secs(10),
349 exponential_backoff: true,
350 },
351 });
352
353 Ok(Self {
354 queries: Arc::new(RwLock::new(HashMap::new())),
355 store,
356 executor,
357 dispatcher,
358 config,
359 stats: Arc::new(RwLock::new(QueryManagerStats::default())),
360 event_notifier: tx,
361 })
362 }
363
364 pub async fn register_query(
366 &self,
367 query: String,
368 metadata: QueryMetadata,
369 channel: QueryResultChannel,
370 ) -> Result<String> {
371 self.validate_query(&query)?;
373
374 let queries = self.queries.read().await;
376 if queries.len() >= self.config.max_concurrent_queries {
377 return Err(anyhow!("Maximum concurrent queries limit reached"));
378 }
379
380 if let Some(owner) = &metadata.owner {
381 let owner_count = queries
382 .values()
383 .filter(|q| q.metadata.owner.as_ref() == Some(owner))
384 .count();
385 if owner_count >= self.config.max_queries_per_owner {
386 return Err(anyhow!("Maximum queries per owner limit reached"));
387 }
388 }
389 drop(queries);
390
391 let query_id = uuid::Uuid::new_v4().to_string();
393
394 let optimized_query = if self.config.enable_optimization {
396 self.executor.optimizer.optimize(&query).await
397 } else {
398 query.clone()
399 };
400
401 let registered_query = RegisteredQuery {
403 id: query_id.clone(),
404 query: optimized_query,
405 metadata,
406 state: QueryState::Active,
407 result_channel: channel,
408 stats: QueryStatistics::default(),
409 created_at: Instant::now(),
410 last_execution: None,
411 };
412
413 self.queries
415 .write()
416 .await
417 .insert(query_id.clone(), registered_query);
418
419 let mut stats = self.stats.write().await;
421 stats.total_queries += 1;
422 stats.active_queries += 1;
423 drop(stats);
424
425 self.start_query_execution(&query_id).await?;
427
428 let _ = self.event_notifier.send(QueryEvent::QueryRegistered {
430 id: query_id.clone(),
431 query,
432 });
433
434 info!("Registered continuous query: {}", query_id);
435 Ok(query_id)
436 }
437
438 pub async fn unregister_query(&self, query_id: &str) -> Result<()> {
440 let mut queries = self.queries.write().await;
441 let _query = queries
442 .remove(query_id)
443 .ok_or_else(|| anyhow!("Query not found"))?;
444
445 self.stats.write().await.active_queries -= 1;
447
448 let _ = self.event_notifier.send(QueryEvent::QueryStopped {
450 id: query_id.to_string(),
451 });
452
453 info!("Unregistered query: {}", query_id);
454 Ok(())
455 }
456
457 pub async fn pause_query(&self, query_id: &str) -> Result<()> {
459 let mut queries = self.queries.write().await;
460 let query = queries
461 .get_mut(query_id)
462 .ok_or_else(|| anyhow!("Query not found"))?;
463
464 query.state = QueryState::Paused;
465 Ok(())
466 }
467
468 pub async fn resume_query(&self, query_id: &str) -> Result<()> {
470 let mut queries = self.queries.write().await;
471 let query = queries
472 .get_mut(query_id)
473 .ok_or_else(|| anyhow!("Query not found"))?;
474
475 if query.state == QueryState::Paused {
476 query.state = QueryState::Active;
477 drop(queries);
478 self.start_query_execution(query_id).await?;
479 }
480
481 Ok(())
482 }
483
484 fn validate_query(&self, query: &str) -> Result<()> {
486 let query_lower = query.to_lowercase();
488
489 if !query_lower.contains("select")
490 && !query_lower.contains("construct")
491 && !query_lower.contains("ask")
492 && !query_lower.contains("describe")
493 {
494 return Err(anyhow!("Invalid SPARQL query: missing query form"));
495 }
496
497 if query_lower.contains("drop")
499 || query_lower.contains("clear")
500 || query_lower.contains("delete")
501 || query_lower.contains("insert")
502 {
503 return Err(anyhow!(
504 "Continuous queries cannot contain update operations"
505 ));
506 }
507
508 Ok(())
509 }
510
511 pub async fn register_subscription(
513 &self,
514 query: String,
515 metadata: QueryMetadata,
516 channel: QueryResultChannel,
517 ) -> Result<String> {
518 let enhanced_query = self.parse_subscription_syntax(&query)?;
520
521 self.register_query(enhanced_query, metadata, channel).await
523 }
524
525 fn parse_subscription_syntax(&self, query: &str) -> Result<String> {
527 let mut enhanced_query = query.to_string();
528
529 if enhanced_query.to_lowercase().contains("subscribe") {
531 enhanced_query = enhanced_query.replace("SUBSCRIBE", "SELECT");
533 enhanced_query = enhanced_query.replace("subscribe", "SELECT");
534 }
535
536 if enhanced_query.to_lowercase().contains("on change") {
538 info!("Detected ON CHANGE clause in subscription query");
541 }
542
543 if enhanced_query.to_lowercase().contains("window") {
545 info!("Detected WINDOW clause in subscription query");
547 }
548
549 Ok(enhanced_query)
550 }
551
    /// Spawns the detached background polling loop for `query_id` and emits
    /// `QueryEvent::QueryStarted`.
    ///
    /// The spawned task re-reads the query's state on every tick, so pauses,
    /// stops, and unregistration take effect at the next interval boundary.
    /// Results are dispatched only when the result-set hash changes.
    async fn start_query_execution(&self, query_id: &str) -> Result<()> {
        // Clone every shared handle the detached task needs.
        let queries = self.queries.clone();
        let store = self.store.clone();
        let executor = self.executor.clone();
        let dispatcher = self.dispatcher.clone();
        let stats = self.stats.clone();
        let event_notifier = self.event_notifier.clone();
        let query_id = query_id.to_string();
        let query_id_clone = query_id.clone();

        tokio::spawn(async move {
            // Snapshot the query text, metadata, and polling interval once;
            // state changes are still observed per-tick below.
            let query_data = {
                let queries_guard = queries.read().await;
                queries_guard.get(&query_id_clone).map(|q| {
                    (
                        q.query.clone(),
                        q.metadata.clone(),
                        q.metadata.interval.unwrap_or(Duration::from_secs(60)),
                    )
                })
            };

            if let Some((query, metadata, poll_interval)) = query_data {
                let mut interval = interval(poll_interval);
                // Hash of the previous result set; `None` until the first run.
                let mut last_result_hash = None;

                loop {
                    interval.tick().await;

                    // Re-read the state each tick so pause/stop is picked up.
                    let state = {
                        let queries_guard = queries.read().await;
                        queries_guard.get(&query_id_clone).map(|q| q.state.clone())
                    };

                    match state {
                        Some(QueryState::Active) => {
                            let start_time = Instant::now();

                            match Self::execute_query(
                                &store,
                                &executor,
                                &query,
                                &metadata,
                                last_result_hash.as_ref(),
                            )
                            .await
                            {
                                Ok((result, hash)) => {
                                    let execution_time = start_time.elapsed();

                                    // Update the per-query statistics.
                                    {
                                        let mut queries_guard = queries.write().await;
                                        if let Some(q) = queries_guard.get_mut(&query_id_clone) {
                                            q.stats.execution_count += 1;
                                            q.stats.success_count += 1;
                                            q.stats.total_results += result.bindings.len() as u64;
                                            q.stats.last_execution_time = Some(execution_time);
                                            q.last_execution = Some(Instant::now());

                                            // Incremental running average of the
                                            // execution time over all runs.
                                            let count = q.stats.execution_count as u32;
                                            q.stats.avg_execution_time =
                                                (q.stats.avg_execution_time * (count - 1)
                                                    + execution_time)
                                                    / count;
                                        }
                                    }

                                    // Dispatch only when the result set changed
                                    // since the previous run.
                                    if Some(&hash) != last_result_hash.as_ref() {
                                        let update = QueryResultUpdate {
                                            query_id: query_id_clone.clone(),
                                            timestamp: chrono::Utc::now(),
                                            update_type: if last_result_hash.is_none() {
                                                UpdateType::Initial
                                            } else {
                                                UpdateType::Refresh
                                            },
                                            bindings: result.bindings.clone(),
                                            triples: None,
                                            metadata: HashMap::new(),
                                        };

                                        match Self::dispatch_results(
                                            &queries,
                                            &dispatcher,
                                            &query_id_clone,
                                            update,
                                        )
                                        .await
                                        {
                                            Err(e) => {
                                                error!(
                                                    "Failed to dispatch results for query {}: {}",
                                                    query_id_clone, e
                                                );
                                            }
                                            _ => {
                                                let _ = event_notifier.send(
                                                    QueryEvent::ResultsDelivered {
                                                        id: query_id_clone.clone(),
                                                        count: result.bindings.len(),
                                                    },
                                                );
                                            }
                                        }

                                        last_result_hash = Some(hash);
                                    }

                                    stats.write().await.total_executions += 1;
                                }
                                Err(e) => {
                                    error!("Query execution failed for {}: {}", query_id_clone, e);

                                    // Record the failure in the per-query stats.
                                    {
                                        let mut queries_guard = queries.write().await;
                                        if let Some(q) = queries_guard.get_mut(&query_id_clone) {
                                            q.stats.execution_count += 1;
                                            q.stats.failure_count += 1;
                                        }
                                    }

                                    stats.write().await.failed_executions += 1;

                                    // Best-effort error notification to subscribers;
                                    // dispatch failures are intentionally ignored.
                                    let update = QueryResultUpdate {
                                        query_id: query_id_clone.clone(),
                                        timestamp: chrono::Utc::now(),
                                        update_type: UpdateType::Error {
                                            message: e.to_string(),
                                        },
                                        bindings: vec![],
                                        triples: None,
                                        metadata: HashMap::new(),
                                    };

                                    let _ = Self::dispatch_results(
                                        &queries,
                                        &dispatcher,
                                        &query_id_clone,
                                        update,
                                    )
                                    .await;

                                    let _ = event_notifier.send(QueryEvent::QueryFailed {
                                        id: query_id_clone.clone(),
                                        reason: e.to_string(),
                                    });
                                }
                            }
                        }
                        Some(QueryState::Paused) => {
                            // Keep ticking but skip execution while paused.
                            continue;
                        }
                        Some(QueryState::Stopped) | None => {
                            // Stopped or unregistered: terminate the loop.
                            break;
                        }
                        Some(QueryState::Failed { .. }) => {
                            // Terminal failure: terminate the loop.
                            break;
                        }
                    }
                }
            }
        });

        let _ = self.event_notifier.send(QueryEvent::QueryStarted {
            id: query_id.to_string(),
        });

        Ok(())
    }
736
737 async fn execute_query(
739 store: &Arc<dyn RdfStore>,
740 executor: &Arc<QueryExecutor>,
741 query: &str,
742 metadata: &QueryMetadata,
743 _last_hash: Option<&String>,
744 ) -> Result<(QueryResult, String)> {
745 if metadata.enable_caching {
747 if let Some(cached) = executor.cache.read().await.get(query, metadata.cache_ttl) {
748 let hash = Self::hash_result(&cached);
749 return Ok((cached, hash));
750 }
751 }
752
753 let result = tokio::time::timeout(metadata.timeout, store.query(query))
755 .await
756 .map_err(|_| anyhow!("Query timeout"))?
757 .map_err(|e| anyhow!("Query execution failed: {}", e))?;
758
759 let result = if let Some(limit) = metadata.limit {
761 QueryResult {
762 bindings: result.bindings.into_iter().take(limit).collect(),
763 }
764 } else {
765 result
766 };
767
768 if metadata.enable_caching {
770 executor
771 .cache
772 .write()
773 .await
774 .put(query.to_string(), result.clone(), metadata.cache_ttl);
775 }
776
777 let hash = Self::hash_result(&result);
778 Ok((result, hash))
779 }
780
781 fn hash_result(result: &QueryResult) -> String {
783 use std::collections::hash_map::DefaultHasher;
784 use std::hash::{Hash, Hasher};
785
786 let mut hasher = DefaultHasher::new();
787 for binding in &result.bindings {
788 for (key, value) in binding {
789 key.hash(&mut hasher);
790 value.hash(&mut hasher);
791 }
792 }
793
794 hasher.finish().to_string()
795 }
796
797 async fn dispatch_results(
799 queries: &Arc<RwLock<HashMap<String, RegisteredQuery>>>,
800 dispatcher: &Arc<ResultDispatcher>,
801 query_id: &str,
802 update: QueryResultUpdate,
803 ) -> Result<()> {
804 let channel = {
805 let queries_guard = queries.read().await;
806 queries_guard
807 .get(query_id)
808 .map(|q| q.result_channel.clone())
809 .ok_or_else(|| anyhow!("Query not found"))?
810 };
811
812 match channel {
813 QueryResultChannel::Direct(sender) => sender
814 .send(update)
815 .await
816 .map_err(|_| anyhow!("Failed to send to direct channel")),
817 QueryResultChannel::Broadcast(sender) => {
818 sender
819 .send(update)
820 .map_err(|_| anyhow!("Failed to broadcast results"))?;
821 Ok(())
822 }
823 QueryResultChannel::Webhook { url, headers } => {
824 dispatcher.send_webhook(&url, &headers, update).await
825 }
826 QueryResultChannel::Stream { topic } => {
827 dispatcher.send_stream(&topic, update).await
829 }
830 }
831 }
832
833 pub async fn get_query_status(&self, query_id: &str) -> Result<QueryStatus> {
835 let queries = self.queries.read().await;
836 let query = queries
837 .get(query_id)
838 .ok_or_else(|| anyhow!("Query not found"))?;
839
840 Ok(QueryStatus {
841 id: query.id.clone(),
842 state: format!("{:?}", query.state),
843 created_at: query.created_at.elapsed(),
844 last_execution: query.last_execution.map(|t| t.elapsed()),
845 execution_count: query.stats.execution_count,
846 success_rate: if query.stats.execution_count > 0 {
847 query.stats.success_count as f64 / query.stats.execution_count as f64
848 } else {
849 0.0
850 },
851 total_results: query.stats.total_results,
852 avg_execution_time: query.stats.avg_execution_time,
853 })
854 }
855
856 pub async fn list_queries(&self) -> Vec<QueryInfo> {
858 let queries = self.queries.read().await;
859 queries
860 .values()
861 .map(|q| QueryInfo {
862 id: q.id.clone(),
863 name: q.metadata.name.clone(),
864 owner: q.metadata.owner.clone(),
865 state: format!("{:?}", q.state),
866 created_at: q.created_at.elapsed(),
867 })
868 .collect()
869 }
870
    /// Returns a clone of the aggregated manager statistics.
    pub async fn get_stats(&self) -> QueryManagerStats {
        self.stats.read().await.clone()
    }
875
    /// Subscribes to manager lifecycle events; each call returns an
    /// independent receiver on the broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<QueryEvent> {
        self.event_notifier.subscribe()
    }
880}
881
/// Point-in-time status snapshot of a single query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatus {
    /// Query id.
    pub id: String,
    /// Debug-formatted lifecycle state.
    pub state: String,
    /// Time elapsed since registration.
    pub created_at: Duration,
    /// Time elapsed since the last execution, if any.
    pub last_execution: Option<Duration>,
    /// Total executions so far.
    pub execution_count: u64,
    /// Successes divided by executions (0.0 when never executed).
    pub success_rate: f64,
    /// Total result rows produced.
    pub total_results: u64,
    /// Running average execution time.
    pub avg_execution_time: Duration,
}
894
/// Summary row returned by `list_queries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryInfo {
    /// Query id.
    pub id: String,
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Optional owner id.
    pub owner: Option<String>,
    /// Debug-formatted lifecycle state.
    pub state: String,
    /// Time elapsed since registration.
    pub created_at: Duration,
}
904
905impl QueryOptimizer {
906 fn new() -> Self {
908 let mut optimizer = Self {
909 rules: Vec::new(),
910 patterns: HashMap::new(),
911 };
912
913 optimizer.add_default_rules();
915 optimizer
916 }
917
918 fn add_default_rules(&mut self) {
920 self.rules.push(OptimizationRule {
922 name: "remove-redundant-distinct".to_string(),
923 condition: Box::new(|query| query.contains("DISTINCT") && !query.contains("ORDER BY")),
924 transform: Box::new(|query| {
925 query.to_string()
927 }),
928 });
929
930 self.rules.push(OptimizationRule {
932 name: "optimize-filter-placement".to_string(),
933 condition: Box::new(|query| query.contains("FILTER") && query.contains("OPTIONAL")),
934 transform: Box::new(|query| {
935 query.to_string()
937 }),
938 });
939 }
940
941 async fn optimize(&self, query: &str) -> String {
943 let mut optimized = query.to_string();
944
945 for rule in &self.rules {
947 if (rule.condition)(&optimized) {
948 optimized = (rule.transform)(&optimized);
949 debug!("Applied optimization rule: {}", rule.name);
950 }
951 }
952
953 optimized
954 }
955}
956
957impl ResultCache {
958 fn new(limit: usize) -> Self {
960 Self {
961 cache: HashMap::new(),
962 size: 0,
963 limit,
964 }
965 }
966
967 fn get(&self, query: &str, _ttl: Duration) -> Option<QueryResult> {
969 self.cache.get(query).and_then(|cached| {
970 if cached.cached_at.elapsed() < cached.ttl {
971 Some(cached.data.clone())
972 } else {
973 None
974 }
975 })
976 }
977
978 fn put(&mut self, query: String, result: QueryResult, ttl: Duration) {
980 let size = result.bindings.len();
981
982 while self.size + size > self.limit && !self.cache.is_empty() {
984 if let Some((oldest_key, _)) = self.cache.iter().min_by_key(|(_, v)| v.cached_at) {
986 let key = oldest_key.clone();
987 if let Some(removed) = self.cache.remove(&key) {
988 self.size -= removed.data.bindings.len();
989 }
990 }
991 }
992
993 self.cache.insert(
994 query,
995 CachedResult {
996 data: result,
997 cached_at: Instant::now(),
998 ttl,
999 access_count: 0,
1000 },
1001 );
1002
1003 self.size += size;
1004 }
1005}
1006
1007impl ResultDispatcher {
    /// Builds a transient in-memory stream producer for `topic`.
    ///
    /// Uses the memory backend (bounded to 10k events, non-persistent) with
    /// default retry/circuit-breaker/security/performance/monitoring config.
    /// NOTE(review): a new producer is created per publish — presumably cheap
    /// for the memory backend, but confirm before switching backends.
    async fn create_stream_producer_for_topic(&self, topic: &str) -> Result<crate::StreamProducer> {
        let config = crate::StreamConfig {
            backend: crate::StreamBackendType::Memory {
                max_size: Some(10000),
                persistence: false,
            },
            topic: topic.to_string(),
            batch_size: 100,
            flush_interval_ms: 100,
            max_connections: 10,
            connection_timeout: Duration::from_secs(30),
            enable_compression: false,
            compression_type: crate::CompressionType::None,
            retry_config: crate::RetryConfig::default(),
            circuit_breaker: crate::CircuitBreakerConfig::default(),
            security: crate::SecurityConfig::default(),
            performance: crate::StreamPerformanceConfig::default(),
            monitoring: crate::MonitoringConfig::default(),
        };

        crate::StreamProducer::new(config).await
    }
1033 async fn send_webhook(
1035 &self,
1036 url: &str,
1037 headers: &HashMap<String, String>,
1038 update: QueryResultUpdate,
1039 ) -> Result<()> {
1040 let mut request = self
1041 .webhook_client
1042 .post(url)
1043 .json(&update)
1044 .timeout(Duration::from_secs(30));
1045
1046 for (key, value) in headers {
1048 request = request.header(key, value);
1049 }
1050
1051 let mut attempts = 0;
1053 let mut delay = self.retry_config.initial_delay;
1054
1055 loop {
1056 attempts += 1;
1057
1058 match request
1059 .try_clone()
1060 .expect("request should be cloneable for retry")
1061 .send()
1062 .await
1063 {
1064 Ok(response) => {
1065 if response.status().is_success() {
1066 return Ok(());
1067 } else {
1068 let status = response.status();
1069 let body = response.text().await.unwrap_or_default();
1070
1071 if attempts >= self.retry_config.max_attempts {
1072 return Err(anyhow!("Webhook failed with status {}: {}", status, body));
1073 }
1074
1075 warn!("Webhook attempt {} failed with status {}", attempts, status);
1076 }
1077 }
1078 Err(e) => {
1079 if attempts >= self.retry_config.max_attempts {
1080 return Err(anyhow!("Webhook failed after {} attempts: {}", attempts, e));
1081 }
1082
1083 warn!("Webhook attempt {} failed: {}", attempts, e);
1084 }
1085 }
1086
1087 tokio::time::sleep(delay).await;
1089
1090 if self.retry_config.exponential_backoff {
1092 delay = (delay * 2).min(self.retry_config.max_delay);
1093 }
1094 }
1095 }
1096
1097 async fn send_stream(&self, topic: &str, update: QueryResultUpdate) -> Result<()> {
1099 let stream_event = match update.update_type {
1101 UpdateType::Added => StreamEvent::QueryResultAdded {
1102 query_id: update.query_id.clone(),
1103 result: crate::event::QueryResult {
1104 query_id: update.query_id.clone(),
1105 bindings: update.bindings.first().cloned().unwrap_or_default(),
1106 execution_time: Duration::from_millis(0),
1107 },
1108 metadata: EventMetadata {
1109 event_id: uuid::Uuid::new_v4().to_string(),
1110 timestamp: chrono::Utc::now(),
1111 source: "sparql-streaming".to_string(),
1112 user: Some("query-engine".to_string()),
1113 context: Some(update.query_id.clone()),
1114 caused_by: None,
1115 version: "1.0".to_string(),
1116 properties: {
1117 let mut props = std::collections::HashMap::new();
1118 props.insert("topic".to_string(), topic.to_string());
1119 props.insert("update_type".to_string(), "result_added".to_string());
1120 props
1121 },
1122 checksum: None,
1123 },
1124 },
1125 UpdateType::Removed => StreamEvent::QueryResultRemoved {
1126 query_id: update.query_id.clone(),
1127 result: crate::event::QueryResult {
1128 query_id: update.query_id.clone(),
1129 bindings: update.bindings.first().cloned().unwrap_or_default(),
1130 execution_time: Duration::from_millis(0),
1131 },
1132 metadata: EventMetadata {
1133 event_id: uuid::Uuid::new_v4().to_string(),
1134 timestamp: chrono::Utc::now(),
1135 source: "sparql-streaming".to_string(),
1136 user: Some("query-engine".to_string()),
1137 context: Some(update.query_id.clone()),
1138 caused_by: None,
1139 version: "1.0".to_string(),
1140 properties: {
1141 let mut props = std::collections::HashMap::new();
1142 props.insert("topic".to_string(), topic.to_string());
1143 props.insert("update_type".to_string(), "result_removed".to_string());
1144 props
1145 },
1146 checksum: None,
1147 },
1148 },
1149 UpdateType::Initial | UpdateType::Refresh => {
1150 StreamEvent::QueryResultAdded {
1152 query_id: update.query_id.clone(),
1153 result: crate::event::QueryResult {
1154 query_id: update.query_id.clone(),
1155 bindings: update.bindings.first().cloned().unwrap_or_default(),
1156 execution_time: Duration::from_millis(0), },
1158 metadata: EventMetadata {
1159 event_id: uuid::Uuid::new_v4().to_string(),
1160 timestamp: chrono::Utc::now(),
1161 source: "sparql-streaming".to_string(),
1162 user: Some("query-engine".to_string()),
1163 context: Some(update.query_id.clone()),
1164 caused_by: None,
1165 version: "1.0".to_string(),
1166 properties: {
1167 let mut props = std::collections::HashMap::new();
1168 props.insert("topic".to_string(), topic.to_string());
1169 props.insert(
1170 "update_type".to_string(),
1171 format!("{:?}", update.update_type).to_lowercase(),
1172 );
1173 props
1174 },
1175 checksum: None,
1176 },
1177 }
1178 }
1179 UpdateType::Error { message } => {
1180 warn!("Query error in stream: {}", message);
1182 return Ok(());
1183 }
1184 };
1185
1186 match self.create_stream_producer_for_topic(topic).await {
1188 Ok(mut producer) => match producer.publish(stream_event).await {
1189 Ok(_) => {
1190 info!(
1191 "Successfully published query result to stream topic '{}'",
1192 topic
1193 );
1194 }
1195 Err(e) => {
1196 error!("Failed to publish to stream topic '{}': {}", topic, e);
1197 return Err(anyhow!("Stream publishing failed: {}", e));
1198 }
1199 },
1200 Err(e) => {
1201 error!(
1202 "Failed to create stream producer for topic '{}': {}",
1203 topic, e
1204 );
1205 return Err(anyhow!("Stream producer creation failed: {}", e));
1206 }
1207 }
1208
1209 Ok(())
1210 }
1211}
1212
/// Creates a bounded (capacity 100) mpsc channel suitable for
/// `QueryResultChannel::Direct` delivery.
pub fn create_subscription_channel() -> (
    mpsc::Sender<QueryResultUpdate>,
    mpsc::Receiver<QueryResultUpdate>,
) {
    mpsc::channel(100)
}
1220
/// Creates a broadcast channel (capacity 100) suitable for
/// `QueryResultChannel::Broadcast` fan-out delivery.
pub fn create_broadcast_channel() -> (
    broadcast::Sender<QueryResultUpdate>,
    broadcast::Receiver<QueryResultUpdate>,
) {
    broadcast::channel(100)
}
1228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store_integration::tests::MockRdfStore;

    /// Registering a valid query yields a non-empty id and the query shows
    /// up in `list_queries`.
    #[tokio::test]
    async fn test_query_registration() {
        let store = Arc::new(MockRdfStore {
            log_position: Arc::new(RwLock::new(0)),
            changes: Arc::new(RwLock::new(vec![])),
        });

        let manager = ContinuousQueryManager::new(store, QueryManagerConfig::default())
            .await
            .unwrap();

        let query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10";
        let metadata = QueryMetadata::default();
        let (tx, _rx) = create_subscription_channel();
        let channel = QueryResultChannel::Direct(tx);

        let query_id = manager
            .register_query(query.to_string(), metadata, channel)
            .await
            .unwrap();

        assert!(!query_id.is_empty());

        let queries = manager.list_queries().await;
        assert_eq!(queries.len(), 1);
        assert_eq!(queries[0].id, query_id);
    }

    /// Update operations are rejected; a plain SELECT passes validation.
    #[tokio::test]
    async fn test_query_validation() {
        let store = Arc::new(MockRdfStore {
            log_position: Arc::new(RwLock::new(0)),
            changes: Arc::new(RwLock::new(vec![])),
        });

        let manager = ContinuousQueryManager::new(store, QueryManagerConfig::default())
            .await
            .unwrap();

        let invalid_query = "DELETE WHERE { ?s ?p ?o }";
        let result = manager.validate_query(invalid_query);
        assert!(result.is_err());

        let valid_query = "SELECT ?s WHERE { ?s ?p ?o }";
        let result = manager.validate_query(valid_query);
        assert!(result.is_ok());
    }
}