1use anyhow::{anyhow, Result};
45use chrono::{DateTime, Duration as ChronoDuration, Utc};
46use serde::{Deserialize, Serialize};
47use std::collections::{HashMap, VecDeque};
48use std::sync::Arc;
49use std::time::Duration;
50use tokio::sync::RwLock;
51use tracing::{debug, info, warn};
52
53use crate::{
54 sparql_streaming::{ContinuousQueryManager, QueryMetadata, QueryResultChannel},
55 store_integration::{RdfStore, Triple},
56 StreamEvent,
57};
58
/// RDF Stream Processing (RSP) query dialects recognised by [`RspProcessor`].
///
/// The dialect of a query is chosen by keyword heuristics in `RspProcessor::detect_language`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RspLanguage {
    /// C-SPARQL (displayed as `C-SPARQL`).
    CSparql,
    /// CQELS (displayed as `CQELS`).
    Cqels,
    /// SPARQLStream (displayed as `SPARQL-Stream`).
    SparqlStream,
}
69
70impl std::fmt::Display for RspLanguage {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 RspLanguage::CSparql => write!(f, "C-SPARQL"),
74 RspLanguage::Cqels => write!(f, "CQELS"),
75 RspLanguage::SparqlStream => write!(f, "SPARQL-Stream"),
76 }
77 }
78}
79
/// Processor for RSP queries over registered RDF streams.
///
/// Keeps the registered stream descriptors and per-stream window state. Queries are rewritten
/// into window-free base queries and registered with a [`ContinuousQueryManager`].
pub struct RspProcessor {
    /// Registered streams, keyed by stream URI.
    streams: Arc<RwLock<HashMap<String, StreamDescriptor>>>,
    /// Window managers for streams registered with a window configuration, keyed by stream URI.
    windows: Arc<RwLock<HashMap<String, WindowManager>>>,
    /// Receives the base queries produced by `parse_query` (see `execute_query`).
    query_manager: Arc<ContinuousQueryManager>,
    /// Backing RDF store.
    /// NOTE(review): no method shown here reads it; presumably reserved for static-data joins.
    store: Arc<dyn RdfStore>,
    /// Processor tunables.
    config: RspConfig,
}
93
/// Tunables for [`RspProcessor`]; see the `Default` impl for the default values.
#[derive(Debug, Clone)]
pub struct RspConfig {
    /// Default window size (5 minutes by default).
    pub default_window_size: ChronoDuration,
    /// Default window slide (1 minute by default).
    pub default_window_slide: ChronoDuration,
    /// Maximum window size (24 hours by default).
    pub max_window_size: ChronoDuration,
    /// Incremental-evaluation switch (`true` by default).
    pub enable_incremental_eval: bool,
    /// Maximum number of concurrent windows (1000 by default).
    pub max_concurrent_windows: usize,
    /// Cleanup interval (60 seconds by default).
    pub cleanup_interval: Duration,
}
110
111impl Default for RspConfig {
112 fn default() -> Self {
113 Self {
114 default_window_size: ChronoDuration::minutes(5),
115 default_window_slide: ChronoDuration::minutes(1),
116 max_window_size: ChronoDuration::hours(24),
117 enable_incremental_eval: true,
118 max_concurrent_windows: 1000,
119 cleanup_interval: Duration::from_secs(60),
120 }
121 }
122}
123
/// Description of an RDF stream passed to `RspProcessor::register_stream`.
#[derive(Debug, Clone)]
pub struct StreamDescriptor {
    /// Stream URI; used as the registration key.
    pub uri: String,
    /// Stream name.
    pub name: String,
    /// Optional schema reference. NOTE(review): its format is not defined in this file.
    pub schema: Option<String>,
    /// Window configuration; when present, `register_stream` creates a window manager for
    /// the stream.
    pub window: Option<WindowConfig>,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
140
/// Window definition attached to a stream or parsed from an RSP query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    /// How windows open and close.
    pub window_type: WindowType,
    /// Window extent, by time or by triple count.
    pub size: WindowSize,
    /// Slide step. The `RANGE … STEP …` parser sets it only for sliding windows, and only a
    /// time-based slide is acted on when sliding windows advance.
    pub slide: Option<WindowSize>,
    /// Optional explicit start; the CQELS `[NOW-… TO NOW]` parser fills it from the current time.
    pub start_time: Option<DateTime<Utc>>,
    /// Optional explicit end; the CQELS `[NOW-… TO NOW]` parser fills it with the current time.
    pub end_time: Option<DateTime<Utc>>,
}
155
/// Strategy for opening and closing windows over a stream.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WindowType {
    /// Non-overlapping windows; a new window is opened once the current one is full.
    Tumbling,
    /// Overlapping windows that advance by the configured slide.
    Sliding,
    /// A single window that keeps growing, with its end moved to the latest event.
    Landmark,
    /// Activity-based windows: an event arriving within `gap` of the current session's end
    /// extends that session; otherwise a new session is opened.
    Session { gap: ChronoDuration },
}
168
/// Extent of a window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowSize {
    /// Time span covered by a window.
    Time(ChronoDuration),
    /// Number of triples at which a window counts as full.
    Triples(usize),
    /// Logical count; treated exactly like `Triples` when deciding whether a window is full.
    Logical(usize),
}
179
/// Per-stream window state maintained by [`RspProcessor`].
pub struct WindowManager {
    /// URI of the stream these windows belong to.
    stream_uri: String,
    /// Window configuration copied from the stream descriptor at registration.
    config: WindowConfig,
    /// Windows in creation order (new windows are pushed at the back, expired ones popped
    /// from the front).
    windows: VecDeque<Window>,
    /// Counters returned by `get_window_stats`.
    stats: WindowStats,
}
191
/// A single window instance and the triples collected into it.
#[derive(Debug, Clone)]
pub struct Window {
    /// Identifier generated from a random UUID (v4) on creation.
    pub id: String,
    /// Window start time.
    pub start: DateTime<Utc>,
    /// Window end time. Landmark and session windows move it forward as events arrive.
    pub end: DateTime<Utc>,
    /// Triples assigned to this window.
    pub triples: Vec<Triple>,
    /// Materialization flag; `false` on creation.
    /// NOTE(review): nothing in this file sets it to `true`.
    pub materialized: bool,
}
206
/// Counters describing one stream's windows (all zero by default).
#[derive(Debug, Clone, Default)]
pub struct WindowStats {
    /// Number of windows created.
    pub windows_created: u64,
    /// Number of windows removed from the manager.
    pub windows_closed: u64,
    /// Number of windows currently held, refreshed after each processed event.
    pub active_windows: usize,
    /// Processing counter maintained by `RspProcessor::process_event`.
    pub triples_processed: u64,
    /// Average window size.
    pub avg_window_size: f64,
}
221
/// Result of parsing an RSP query with `RspProcessor::parse_query`.
#[derive(Debug, Clone)]
pub struct RspQuery {
    /// Detected dialect.
    pub language: RspLanguage,
    /// Query text exactly as supplied.
    pub original: String,
    /// `FROM STREAM` clauses found in the query.
    pub streams: Vec<StreamClause>,
    /// Window specifications found in the query.
    pub windows: Vec<WindowConfig>,
    /// Query with stream clauses and window specs removed; this text is what `execute_query`
    /// registers with the continuous query manager.
    pub base_query: String,
    /// Metadata forwarded to the continuous query manager.
    pub metadata: QueryMetadata,
}
238
/// A `FROM STREAM <uri>` clause extracted from an RSP query.
#[derive(Debug, Clone)]
pub struct StreamClause {
    /// Stream URI named by the clause.
    pub uri: String,
    /// Window attached to the clause, if one was parsed.
    pub window: Option<WindowConfig>,
    /// Named graph for the clause; the parser currently always leaves this `None`.
    pub graph: Option<String>,
}
249
250impl RspProcessor {
251 pub async fn new(
253 store: Arc<dyn RdfStore>,
254 query_manager: Arc<ContinuousQueryManager>,
255 config: RspConfig,
256 ) -> Result<Self> {
257 Ok(Self {
258 streams: Arc::new(RwLock::new(HashMap::new())),
259 windows: Arc::new(RwLock::new(HashMap::new())),
260 query_manager,
261 store,
262 config,
263 })
264 }
265
266 pub async fn register_stream(&self, descriptor: StreamDescriptor) -> Result<()> {
268 let uri = descriptor.uri.clone();
269
270 if let Some(window_config) = &descriptor.window {
272 let manager = WindowManager {
273 stream_uri: uri.clone(),
274 config: window_config.clone(),
275 windows: VecDeque::new(),
276 stats: WindowStats::default(),
277 };
278
279 self.windows.write().await.insert(uri.clone(), manager);
280 }
281
282 self.streams.write().await.insert(uri.clone(), descriptor);
283
284 info!("Registered RSP stream: {}", uri);
285 Ok(())
286 }
287
288 pub async fn unregister_stream(&self, uri: &str) -> Result<()> {
290 self.streams
291 .write()
292 .await
293 .remove(uri)
294 .ok_or_else(|| anyhow!("Stream not found: {}", uri))?;
295
296 self.windows.write().await.remove(uri);
297
298 info!("Unregistered RSP stream: {}", uri);
299 Ok(())
300 }
301
302 pub fn parse_query(&self, query: &str) -> Result<RspQuery> {
304 let language = self.detect_language(query)?;
306
307 let streams = self.extract_stream_clauses(query)?;
309
310 let windows = self.extract_windows(query)?;
312
313 let base_query = self.generate_base_query(query, &streams, &windows)?;
315
316 Ok(RspQuery {
317 language,
318 original: query.to_string(),
319 streams,
320 windows,
321 base_query,
322 metadata: QueryMetadata::default(),
323 })
324 }
325
326 pub async fn execute_query(&self, query: &str, channel: QueryResultChannel) -> Result<String> {
328 let rsp_query = self.parse_query(query)?;
330
331 let streams_guard = self.streams.read().await;
333 for stream_clause in &rsp_query.streams {
334 if !streams_guard.contains_key(&stream_clause.uri) {
335 return Err(anyhow!("Stream not registered: {}", stream_clause.uri));
336 }
337 }
338 drop(streams_guard);
339
340 let query_id = self
342 .query_manager
343 .register_query(rsp_query.base_query.clone(), rsp_query.metadata, channel)
344 .await?;
345
346 info!(
347 "Registered RSP query ({}): {}",
348 rsp_query.language, query_id
349 );
350
351 Ok(query_id)
352 }
353
354 pub async fn process_event(&self, stream_uri: &str, event: &StreamEvent) -> Result<()> {
356 let triples = self.extract_triples_from_event(event)?;
358
359 if triples.is_empty() {
360 return Ok(());
361 }
362
363 let mut windows_guard = self.windows.write().await;
365
366 if let Some(manager) = windows_guard.get_mut(stream_uri) {
367 self.add_to_windows(manager, triples).await?;
369
370 manager.stats.triples_processed += 1;
372
373 debug!(
374 "Processed event for stream {}: {} active windows",
375 stream_uri, manager.stats.active_windows
376 );
377 }
378
379 Ok(())
380 }
381
382 pub fn detect_language(&self, query: &str) -> Result<RspLanguage> {
384 Self::detect_language_static(query)
385 }
386
387 fn detect_language_static(query: &str) -> Result<RspLanguage> {
389 let query_upper = query.to_uppercase();
390
391 if query_upper.contains("FROM STREAM") || query_upper.contains("[RANGE") {
392 Ok(RspLanguage::CSparql)
393 } else if query_upper.contains("[NOW-") || query_upper.contains("TO NOW") {
394 Ok(RspLanguage::Cqels)
395 } else if query_upper.contains("STREAM") || query_upper.contains("WINDOW") {
396 Ok(RspLanguage::SparqlStream)
397 } else {
398 Err(anyhow!("Query does not appear to contain RSP extensions"))
399 }
400 }
401
402 fn extract_stream_clauses(&self, query: &str) -> Result<Vec<StreamClause>> {
404 let mut clauses = Vec::new();
405
406 let lines = query.lines();
408
409 for line in lines {
410 if line.to_uppercase().contains("FROM STREAM") {
411 if let Some(start) = line.find('<') {
413 if let Some(end) = line[start..].find('>') {
414 let uri = line[start + 1..start + end].to_string();
415
416 let window = if line.contains('[') {
418 self.parse_window_spec(line).ok()
419 } else {
420 None
421 };
422
423 clauses.push(StreamClause {
424 uri,
425 window,
426 graph: None,
427 });
428 }
429 }
430 }
431 }
432
433 if clauses.is_empty() {
434 warn!("No explicit FROM STREAM clauses found");
436 }
437
438 Ok(clauses)
439 }
440
441 fn extract_windows(&self, query: &str) -> Result<Vec<WindowConfig>> {
443 let mut windows = Vec::new();
444
445 for line in query.lines() {
447 if line.contains('[') && line.contains(']') {
448 if let Ok(window) = self.parse_window_spec(line) {
449 windows.push(window);
450 }
451 }
452 }
453
454 Ok(windows)
455 }
456
457 fn parse_window_spec(&self, spec: &str) -> Result<WindowConfig> {
459 let spec_upper = spec.to_uppercase();
460
461 if spec_upper.contains("RANGE") {
463 let size = self.parse_duration(spec, "RANGE")?;
464 let slide = if spec_upper.contains("STEP") {
465 Some(self.parse_duration(spec, "STEP")?)
466 } else {
467 None
468 };
469
470 return Ok(WindowConfig {
471 window_type: if slide.is_some() {
472 WindowType::Sliding
473 } else {
474 WindowType::Tumbling
475 },
476 size: WindowSize::Time(size),
477 slide: slide.map(WindowSize::Time),
478 start_time: None,
479 end_time: None,
480 });
481 }
482
483 if spec_upper.contains("NOW-") && spec_upper.contains("TO NOW") {
485 if let Some(start_idx) = spec.find("NOW-") {
486 let duration_str = &spec[start_idx + 4..];
487 if let Some(end_idx) = duration_str.find("TO") {
488 let duration_str = duration_str[..end_idx].trim();
489 let size = self.parse_duration_string(duration_str)?;
490
491 return Ok(WindowConfig {
492 window_type: WindowType::Sliding,
493 size: WindowSize::Time(size),
494 slide: Some(WindowSize::Time(ChronoDuration::seconds(1))),
495 start_time: Some(Utc::now() - size),
496 end_time: Some(Utc::now()),
497 });
498 }
499 }
500 }
501
502 if spec_upper.contains("TRIPLES") {
504 if let Some(start) = spec.find("TRIPLES") {
505 let num_str = &spec[start + 7..].trim();
506 if let Some(end) = num_str.find(|c: char| !c.is_numeric()) {
507 let count: usize = num_str[..end].parse()?;
508 return Ok(WindowConfig {
509 window_type: WindowType::Tumbling,
510 size: WindowSize::Triples(count),
511 slide: None,
512 start_time: None,
513 end_time: None,
514 });
515 }
516 }
517 }
518
519 Err(anyhow!("Unable to parse window specification: {}", spec))
520 }
521
522 fn parse_duration(&self, spec: &str, keyword: &str) -> Result<ChronoDuration> {
524 if let Some(start) = spec.to_uppercase().find(keyword) {
525 let duration_str = &spec[start + keyword.len()..].trim();
526
527 let duration_str = duration_str
529 .split_whitespace()
530 .next()
531 .unwrap_or("")
532 .replace(']', "");
533
534 self.parse_duration_string(&duration_str)
535 } else {
536 Err(anyhow!("Keyword not found: {}", keyword))
537 }
538 }
539
540 pub fn parse_duration_string(&self, s: &str) -> Result<ChronoDuration> {
542 Self::parse_duration_string_static(s)
543 }
544
545 fn parse_duration_string_static(s: &str) -> Result<ChronoDuration> {
547 let s = s.trim().to_lowercase();
548
549 if s.is_empty() {
550 return Err(anyhow!("Empty duration string"));
551 }
552
553 let num_end = s.chars().position(|c| !c.is_numeric()).unwrap_or(s.len());
555 let num: i64 = s[..num_end].parse()?;
556 let unit = &s[num_end..];
557
558 match unit {
559 "s" | "sec" | "second" | "seconds" => Ok(ChronoDuration::seconds(num)),
560 "m" | "min" | "minute" | "minutes" => Ok(ChronoDuration::minutes(num)),
561 "h" | "hr" | "hour" | "hours" => Ok(ChronoDuration::hours(num)),
562 "d" | "day" | "days" => Ok(ChronoDuration::days(num)),
563 _ => Err(anyhow!("Unknown duration unit: {}", unit)),
564 }
565 }
566
567 fn generate_base_query(
569 &self,
570 original: &str,
571 streams: &[StreamClause],
572 _windows: &[WindowConfig],
573 ) -> Result<String> {
574 let mut base_query = original.to_string();
575
576 for stream in streams {
578 base_query = base_query.replace(&format!("FROM STREAM <{}>", stream.uri), "");
580
581 if let Some(start) = base_query.find('[') {
583 if let Some(end) = base_query[start..].find(']') {
584 base_query.replace_range(start..start + end + 1, "");
585 }
586 }
587 }
588
589 base_query = base_query
591 .lines()
592 .filter(|line| !line.trim().is_empty())
593 .collect::<Vec<_>>()
594 .join("\n");
595
596 Ok(base_query)
597 }
598
599 fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
601 match event {
602 StreamEvent::TripleAdded {
603 subject,
604 predicate,
605 object,
606 graph,
607 metadata: _,
608 } => Ok(vec![Triple {
609 subject: subject.clone(),
610 predicate: predicate.clone(),
611 object: object.clone(),
612 graph: graph.clone(),
613 }]),
614 StreamEvent::QuadAdded {
615 subject,
616 predicate,
617 object,
618 graph,
619 metadata: _,
620 } => Ok(vec![Triple {
621 subject: subject.clone(),
622 predicate: predicate.clone(),
623 object: object.clone(),
624 graph: Some(graph.clone()),
625 }]),
626 _ => Ok(vec![]),
627 }
628 }
629
630 async fn add_to_windows(
632 &self,
633 manager: &mut WindowManager,
634 triples: Vec<Triple>,
635 ) -> Result<()> {
636 let now = Utc::now();
637
638 match &manager.config.window_type {
639 WindowType::Tumbling => {
640 if manager.windows.is_empty()
642 || self.window_is_full(
643 manager
644 .windows
645 .back()
646 .expect("windows validated to be non-empty via is_empty check"),
647 &manager.config,
648 )
649 {
650 self.create_new_window(manager, now).await?;
651 }
652
653 if let Some(window) = manager.windows.back_mut() {
655 window.triples.extend(triples);
656 }
657 }
658 WindowType::Sliding => {
659 self.cleanup_expired_windows(manager, now);
664
665 if manager.windows.is_empty() {
667 self.create_new_window(manager, now).await?;
668 }
669
670 for window in &mut manager.windows {
672 if now >= window.start && now <= window.end {
673 window.triples.extend(triples.clone());
674 }
675 }
676
677 if let Some(WindowSize::Time(slide)) = &manager.config.slide {
679 if let Some(last_window) = manager.windows.back() {
680 let next_start = last_window.start + *slide;
681 if now >= next_start {
682 self.create_new_window(manager, next_start).await?;
683 }
684 }
685 }
686 }
687 WindowType::Landmark => {
688 if manager.windows.is_empty() {
690 self.create_new_window(manager, now).await?;
691 }
692
693 if let Some(window) = manager.windows.front_mut() {
694 window.triples.extend(triples);
695 window.end = now;
696 }
697 }
698 WindowType::Session { gap } => {
699 if manager.windows.is_empty() {
701 self.create_new_window(manager, now).await?;
702 } else if let Some(last_window) = manager.windows.back_mut() {
703 if now - last_window.end <= *gap {
705 last_window.triples.extend(triples);
707 last_window.end = now;
708 } else {
709 self.create_new_window(manager, now).await?;
711 }
712 }
713 }
714 }
715
716 manager.stats.active_windows = manager.windows.len();
717 Ok(())
718 }
719
720 fn window_is_full(&self, window: &Window, config: &WindowConfig) -> bool {
722 match &config.size {
723 WindowSize::Time(duration) => {
724 let window_duration = window.end - window.start;
725 window_duration >= *duration
726 }
727 WindowSize::Triples(count) => window.triples.len() >= *count,
728 WindowSize::Logical(count) => window.triples.len() >= *count,
729 }
730 }
731
732 async fn create_new_window(
734 &self,
735 manager: &mut WindowManager,
736 start: DateTime<Utc>,
737 ) -> Result<()> {
738 let end = match &manager.config.size {
739 WindowSize::Time(duration) => start + *duration,
740 _ => start, };
742
743 let window = Window {
744 id: uuid::Uuid::new_v4().to_string(),
745 start,
746 end,
747 triples: Vec::new(),
748 materialized: false,
749 };
750
751 manager.windows.push_back(window);
752 manager.stats.windows_created += 1;
753
754 Ok(())
755 }
756
757 fn cleanup_expired_windows(&self, manager: &mut WindowManager, now: DateTime<Utc>) {
759 while let Some(window) = manager.windows.front() {
760 if window.end < now {
761 manager.windows.pop_front();
762 manager.stats.windows_closed += 1;
763 } else {
764 break;
765 }
766 }
767 }
768
769 pub async fn get_window_stats(&self, stream_uri: &str) -> Option<WindowStats> {
771 self.windows
772 .read()
773 .await
774 .get(stream_uri)
775 .map(|m| m.stats.clone())
776 }
777
778 pub async fn list_streams(&self) -> Vec<String> {
780 self.streams.read().await.keys().cloned().collect()
781 }
782}
783
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_duration() {
        assert_eq!(
            RspProcessor::parse_duration_string_static("5m").unwrap(),
            ChronoDuration::minutes(5)
        );
        assert_eq!(
            RspProcessor::parse_duration_string_static("1h").unwrap(),
            ChronoDuration::hours(1)
        );
        assert_eq!(
            RspProcessor::parse_duration_string_static("30s").unwrap(),
            ChronoDuration::seconds(30)
        );
    }

    #[test]
    fn test_parse_duration_unit_aliases() {
        assert_eq!(
            RspProcessor::parse_duration_string_static("2d").unwrap(),
            ChronoDuration::days(2)
        );
        assert_eq!(
            RspProcessor::parse_duration_string_static("10sec").unwrap(),
            ChronoDuration::seconds(10)
        );
        assert_eq!(
            RspProcessor::parse_duration_string_static("3hours").unwrap(),
            ChronoDuration::hours(3)
        );
        // Units are case-insensitive and surrounding whitespace is ignored.
        assert_eq!(
            RspProcessor::parse_duration_string_static(" 45MIN ").unwrap(),
            ChronoDuration::minutes(45)
        );
    }

    #[test]
    fn test_parse_duration_rejects_invalid_input() {
        for input in ["", "   ", "m", "5x", "-5m"] {
            assert!(
                RspProcessor::parse_duration_string_static(input).is_err(),
                "expected an error for {:?}",
                input
            );
        }
    }

    #[test]
    fn test_detect_language() {
        let csparql_query = "SELECT * FROM STREAM <http://stream> [RANGE 5m]";
        assert_eq!(
            RspProcessor::detect_language_static(csparql_query).unwrap(),
            RspLanguage::CSparql
        );

        let cqels_query = "SELECT * WHERE { ?s ?p ?o } [NOW-5m TO NOW]";
        assert_eq!(
            RspProcessor::detect_language_static(cqels_query).unwrap(),
            RspLanguage::Cqels
        );

        let sparql_stream_query = "SELECT ?s FROM NAMED STREAM <http://stream> WHERE { ?s ?p ?o }";
        assert_eq!(
            RspProcessor::detect_language_static(sparql_stream_query).unwrap(),
            RspLanguage::SparqlStream
        );

        // Keyword matching is case-insensitive.
        let lowercase_query = "select * from stream <http://stream> [range 5m]";
        assert_eq!(
            RspProcessor::detect_language_static(lowercase_query).unwrap(),
            RspLanguage::CSparql
        );
    }

    #[test]
    fn test_detect_language_rejects_plain_sparql() {
        assert!(RspProcessor::detect_language_static("SELECT * WHERE { ?s ?p ?o }").is_err());
    }
}