Skip to main content

oxirs_stream/
rsp.rs

1//! # RDF Stream Processing (RSP) Module
2//!
3//! This module implements C-SPARQL and CQELS extensions for streaming SPARQL queries.
4//!
5//! ## Supported Features
6//!
7//! ### C-SPARQL (Continuous SPARQL)
8//! - FROM STREAM clause for stream registration
9//! - WINDOW operators (RANGE, TRIPLES, TUMBLING, SLIDING)
10//! - Time-based windows with configurable ranges
11//! - Aggregations over windows
12//! - Stream-to-relation operators
13//!
14//! ### CQELS (Continuous Query Evaluation over Linked Streams)
15//! - Native stream processing semantics
16//! - Incremental query evaluation
17//! - Sliding window processing
18//! - Multi-stream joins with window constraints
19//!
20//! ## Example Queries
21//!
22//! ```sparql
23//! # C-SPARQL: Temperature average over 5-minute sliding window
24//! SELECT ?sensor (AVG(?temp) AS ?avgTemp)
25//! FROM STREAM <http://sensors.example/stream> [RANGE 5m STEP 1m]
26//! WHERE {
27//!     ?sensor rdf:type :TemperatureSensor .
28//!     ?sensor :temperature ?temp .
29//! }
30//! GROUP BY ?sensor
31//! ```
32//!
33//! ```sparql
34//! # CQELS: Pattern detection with windowing
35//! SELECT ?user ?action1 ?action2
36//! FROM STREAM <http://events.example/stream> [NOW-5m TO NOW]
37//! WHERE {
38//!     ?event1 :user ?user ; :action ?action1 ; :timestamp ?t1 .
39//!     ?event2 :user ?user ; :action ?action2 ; :timestamp ?t2 .
40//!     FILTER(?t2 > ?t1 && ?t2 - ?t1 < 60)
41//! }
42//! ```
43
44use anyhow::{anyhow, Result};
45use chrono::{DateTime, Duration as ChronoDuration, Utc};
46use serde::{Deserialize, Serialize};
47use std::collections::{HashMap, VecDeque};
48use std::sync::Arc;
49use std::time::Duration;
50use tokio::sync::RwLock;
51use tracing::{debug, info, warn};
52
53use crate::{
54    sparql_streaming::{ContinuousQueryManager, QueryMetadata, QueryResultChannel},
55    store_integration::{RdfStore, Triple},
56    StreamEvent,
57};
58
59/// RSP query language
60#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
61pub enum RspLanguage {
62    /// C-SPARQL (Continuous SPARQL)
63    CSparql,
64    /// CQELS (Continuous Query Evaluation over Linked Streams)
65    Cqels,
66    /// Standard SPARQL with streaming extensions
67    SparqlStream,
68}
69
70impl std::fmt::Display for RspLanguage {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            RspLanguage::CSparql => write!(f, "C-SPARQL"),
74            RspLanguage::Cqels => write!(f, "CQELS"),
75            RspLanguage::SparqlStream => write!(f, "SPARQL-Stream"),
76        }
77    }
78}
79
80/// RSP query parser and processor
81pub struct RspProcessor {
82    /// Registered streams
83    streams: Arc<RwLock<HashMap<String, StreamDescriptor>>>,
84    /// Active windows
85    windows: Arc<RwLock<HashMap<String, WindowManager>>>,
86    /// Query manager for execution
87    query_manager: Arc<ContinuousQueryManager>,
88    /// RDF store
89    store: Arc<dyn RdfStore>,
90    /// Configuration
91    config: RspConfig,
92}
93
94/// RSP configuration
95#[derive(Debug, Clone)]
96pub struct RspConfig {
97    /// Default window size
98    pub default_window_size: ChronoDuration,
99    /// Default window slide
100    pub default_window_slide: ChronoDuration,
101    /// Maximum window size
102    pub max_window_size: ChronoDuration,
103    /// Enable incremental evaluation
104    pub enable_incremental_eval: bool,
105    /// Maximum concurrent windows
106    pub max_concurrent_windows: usize,
107    /// Window cleanup interval
108    pub cleanup_interval: Duration,
109}
110
111impl Default for RspConfig {
112    fn default() -> Self {
113        Self {
114            default_window_size: ChronoDuration::minutes(5),
115            default_window_slide: ChronoDuration::minutes(1),
116            max_window_size: ChronoDuration::hours(24),
117            enable_incremental_eval: true,
118            max_concurrent_windows: 1000,
119            cleanup_interval: Duration::from_secs(60),
120        }
121    }
122}
123
124/// Stream descriptor
125#[derive(Debug, Clone)]
126pub struct StreamDescriptor {
127    /// Stream URI
128    pub uri: String,
129    /// Stream name
130    pub name: String,
131    /// Schema (optional)
132    pub schema: Option<String>,
133    /// Window configuration
134    pub window: Option<WindowConfig>,
135    /// Stream metadata
136    pub metadata: HashMap<String, String>,
137    /// Creation timestamp
138    pub created_at: DateTime<Utc>,
139}
140
141/// Window configuration
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct WindowConfig {
144    /// Window type
145    pub window_type: WindowType,
146    /// Window size
147    pub size: WindowSize,
148    /// Window slide (for sliding windows)
149    pub slide: Option<WindowSize>,
150    /// Start time (for time-based windows)
151    pub start_time: Option<DateTime<Utc>>,
152    /// End time (for time-based windows)
153    pub end_time: Option<DateTime<Utc>>,
154}
155
156/// Window type
157#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
158pub enum WindowType {
159    /// Tumbling window (non-overlapping)
160    Tumbling,
161    /// Sliding window (overlapping)
162    Sliding,
163    /// Landmark window (from start to now)
164    Landmark,
165    /// Session window (activity-based)
166    Session { gap: ChronoDuration },
167}
168
169/// Window size specification
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub enum WindowSize {
172    /// Time-based (e.g., 5 minutes)
173    Time(ChronoDuration),
174    /// Triple count-based (e.g., 1000 triples)
175    Triples(usize),
176    /// Logical time-based (e.g., 100 events)
177    Logical(usize),
178}
179
180/// Window manager for a specific stream
181pub struct WindowManager {
182    /// Stream URI
183    stream_uri: String,
184    /// Window configuration
185    config: WindowConfig,
186    /// Current window contents
187    windows: VecDeque<Window>,
188    /// Statistics
189    stats: WindowStats,
190}
191
192/// Active window
193#[derive(Debug, Clone)]
194pub struct Window {
195    /// Window ID
196    pub id: String,
197    /// Window start time
198    pub start: DateTime<Utc>,
199    /// Window end time
200    pub end: DateTime<Utc>,
201    /// Triples in window
202    pub triples: Vec<Triple>,
203    /// Materialized as graph (for optimization)
204    pub materialized: bool,
205}
206
207/// Window statistics
208#[derive(Debug, Clone, Default)]
209pub struct WindowStats {
210    /// Total windows created
211    pub windows_created: u64,
212    /// Total windows closed
213    pub windows_closed: u64,
214    /// Active windows
215    pub active_windows: usize,
216    /// Total triples processed
217    pub triples_processed: u64,
218    /// Average window size
219    pub avg_window_size: f64,
220}
221
222/// Parsed RSP query
223#[derive(Debug, Clone)]
224pub struct RspQuery {
225    /// Query language
226    pub language: RspLanguage,
227    /// Original query string
228    pub original: String,
229    /// Parsed stream clauses
230    pub streams: Vec<StreamClause>,
231    /// Window specifications
232    pub windows: Vec<WindowConfig>,
233    /// Base SPARQL query (after stream/window extraction)
234    pub base_query: String,
235    /// Query metadata
236    pub metadata: QueryMetadata,
237}
238
239/// Stream clause (FROM STREAM)
240#[derive(Debug, Clone)]
241pub struct StreamClause {
242    /// Stream URI
243    pub uri: String,
244    /// Window specification
245    pub window: Option<WindowConfig>,
246    /// Named graph (if any)
247    pub graph: Option<String>,
248}
249
250impl RspProcessor {
251    /// Create a new RSP processor
252    pub async fn new(
253        store: Arc<dyn RdfStore>,
254        query_manager: Arc<ContinuousQueryManager>,
255        config: RspConfig,
256    ) -> Result<Self> {
257        Ok(Self {
258            streams: Arc::new(RwLock::new(HashMap::new())),
259            windows: Arc::new(RwLock::new(HashMap::new())),
260            query_manager,
261            store,
262            config,
263        })
264    }
265
266    /// Register a stream
267    pub async fn register_stream(&self, descriptor: StreamDescriptor) -> Result<()> {
268        let uri = descriptor.uri.clone();
269
270        // Create window manager if window config provided
271        if let Some(window_config) = &descriptor.window {
272            let manager = WindowManager {
273                stream_uri: uri.clone(),
274                config: window_config.clone(),
275                windows: VecDeque::new(),
276                stats: WindowStats::default(),
277            };
278
279            self.windows.write().await.insert(uri.clone(), manager);
280        }
281
282        self.streams.write().await.insert(uri.clone(), descriptor);
283
284        info!("Registered RSP stream: {}", uri);
285        Ok(())
286    }
287
288    /// Unregister a stream
289    pub async fn unregister_stream(&self, uri: &str) -> Result<()> {
290        self.streams
291            .write()
292            .await
293            .remove(uri)
294            .ok_or_else(|| anyhow!("Stream not found: {}", uri))?;
295
296        self.windows.write().await.remove(uri);
297
298        info!("Unregistered RSP stream: {}", uri);
299        Ok(())
300    }
301
302    /// Parse an RSP query
303    pub fn parse_query(&self, query: &str) -> Result<RspQuery> {
304        // Detect query language
305        let language = self.detect_language(query)?;
306
307        // Extract stream clauses
308        let streams = self.extract_stream_clauses(query)?;
309
310        // Extract window specifications
311        let windows = self.extract_windows(query)?;
312
313        // Generate base SPARQL query (remove RSP extensions)
314        let base_query = self.generate_base_query(query, &streams, &windows)?;
315
316        Ok(RspQuery {
317            language,
318            original: query.to_string(),
319            streams,
320            windows,
321            base_query,
322            metadata: QueryMetadata::default(),
323        })
324    }
325
326    /// Register and execute an RSP query
327    pub async fn execute_query(&self, query: &str, channel: QueryResultChannel) -> Result<String> {
328        // Parse RSP query
329        let rsp_query = self.parse_query(query)?;
330
331        // Validate streams exist
332        let streams_guard = self.streams.read().await;
333        for stream_clause in &rsp_query.streams {
334            if !streams_guard.contains_key(&stream_clause.uri) {
335                return Err(anyhow!("Stream not registered: {}", stream_clause.uri));
336            }
337        }
338        drop(streams_guard);
339
340        // Register with continuous query manager
341        let query_id = self
342            .query_manager
343            .register_query(rsp_query.base_query.clone(), rsp_query.metadata, channel)
344            .await?;
345
346        info!(
347            "Registered RSP query ({}): {}",
348            rsp_query.language, query_id
349        );
350
351        Ok(query_id)
352    }
353
354    /// Process a stream event (add to windows)
355    pub async fn process_event(&self, stream_uri: &str, event: &StreamEvent) -> Result<()> {
356        // Extract triples from event
357        let triples = self.extract_triples_from_event(event)?;
358
359        if triples.is_empty() {
360            return Ok(());
361        }
362
363        // Get window manager for stream
364        let mut windows_guard = self.windows.write().await;
365
366        if let Some(manager) = windows_guard.get_mut(stream_uri) {
367            // Add triples to appropriate windows
368            self.add_to_windows(manager, triples).await?;
369
370            // Update statistics
371            manager.stats.triples_processed += 1;
372
373            debug!(
374                "Processed event for stream {}: {} active windows",
375                stream_uri, manager.stats.active_windows
376            );
377        }
378
379        Ok(())
380    }
381
382    /// Detect RSP query language
383    pub fn detect_language(&self, query: &str) -> Result<RspLanguage> {
384        Self::detect_language_static(query)
385    }
386
387    /// Detect RSP query language (static version for testing)
388    fn detect_language_static(query: &str) -> Result<RspLanguage> {
389        let query_upper = query.to_uppercase();
390
391        if query_upper.contains("FROM STREAM") || query_upper.contains("[RANGE") {
392            Ok(RspLanguage::CSparql)
393        } else if query_upper.contains("[NOW-") || query_upper.contains("TO NOW") {
394            Ok(RspLanguage::Cqels)
395        } else if query_upper.contains("STREAM") || query_upper.contains("WINDOW") {
396            Ok(RspLanguage::SparqlStream)
397        } else {
398            Err(anyhow!("Query does not appear to contain RSP extensions"))
399        }
400    }
401
402    /// Extract FROM STREAM clauses
403    fn extract_stream_clauses(&self, query: &str) -> Result<Vec<StreamClause>> {
404        let mut clauses = Vec::new();
405
406        // Simple regex-based extraction (in production, use proper SPARQL parser)
407        let lines = query.lines();
408
409        for line in lines {
410            if line.to_uppercase().contains("FROM STREAM") {
411                // Extract stream URI
412                if let Some(start) = line.find('<') {
413                    if let Some(end) = line[start..].find('>') {
414                        let uri = line[start + 1..start + end].to_string();
415
416                        // Extract window if present
417                        let window = if line.contains('[') {
418                            self.parse_window_spec(line).ok()
419                        } else {
420                            None
421                        };
422
423                        clauses.push(StreamClause {
424                            uri,
425                            window,
426                            graph: None,
427                        });
428                    }
429                }
430            }
431        }
432
433        if clauses.is_empty() {
434            // Check for implicit streams in WHERE clause
435            warn!("No explicit FROM STREAM clauses found");
436        }
437
438        Ok(clauses)
439    }
440
441    /// Extract window specifications
442    fn extract_windows(&self, query: &str) -> Result<Vec<WindowConfig>> {
443        let mut windows = Vec::new();
444
445        // Look for window specifications
446        for line in query.lines() {
447            if line.contains('[') && line.contains(']') {
448                if let Ok(window) = self.parse_window_spec(line) {
449                    windows.push(window);
450                }
451            }
452        }
453
454        Ok(windows)
455    }
456
457    /// Parse window specification
458    fn parse_window_spec(&self, spec: &str) -> Result<WindowConfig> {
459        let spec_upper = spec.to_uppercase();
460
461        // C-SPARQL: [RANGE 5m STEP 1m]
462        if spec_upper.contains("RANGE") {
463            let size = self.parse_duration(spec, "RANGE")?;
464            let slide = if spec_upper.contains("STEP") {
465                Some(self.parse_duration(spec, "STEP")?)
466            } else {
467                None
468            };
469
470            return Ok(WindowConfig {
471                window_type: if slide.is_some() {
472                    WindowType::Sliding
473                } else {
474                    WindowType::Tumbling
475                },
476                size: WindowSize::Time(size),
477                slide: slide.map(WindowSize::Time),
478                start_time: None,
479                end_time: None,
480            });
481        }
482
483        // CQELS: [NOW-5m TO NOW]
484        if spec_upper.contains("NOW-") && spec_upper.contains("TO NOW") {
485            if let Some(start_idx) = spec.find("NOW-") {
486                let duration_str = &spec[start_idx + 4..];
487                if let Some(end_idx) = duration_str.find("TO") {
488                    let duration_str = duration_str[..end_idx].trim();
489                    let size = self.parse_duration_string(duration_str)?;
490
491                    return Ok(WindowConfig {
492                        window_type: WindowType::Sliding,
493                        size: WindowSize::Time(size),
494                        slide: Some(WindowSize::Time(ChronoDuration::seconds(1))),
495                        start_time: Some(Utc::now() - size),
496                        end_time: Some(Utc::now()),
497                    });
498                }
499            }
500        }
501
502        // Triples-based: [TRIPLES 1000]
503        if spec_upper.contains("TRIPLES") {
504            if let Some(start) = spec.find("TRIPLES") {
505                let num_str = &spec[start + 7..].trim();
506                if let Some(end) = num_str.find(|c: char| !c.is_numeric()) {
507                    let count: usize = num_str[..end].parse()?;
508                    return Ok(WindowConfig {
509                        window_type: WindowType::Tumbling,
510                        size: WindowSize::Triples(count),
511                        slide: None,
512                        start_time: None,
513                        end_time: None,
514                    });
515                }
516            }
517        }
518
519        Err(anyhow!("Unable to parse window specification: {}", spec))
520    }
521
522    /// Parse duration from spec
523    fn parse_duration(&self, spec: &str, keyword: &str) -> Result<ChronoDuration> {
524        if let Some(start) = spec.to_uppercase().find(keyword) {
525            let duration_str = &spec[start + keyword.len()..].trim();
526
527            // Extract duration string until next keyword or bracket
528            let duration_str = duration_str
529                .split_whitespace()
530                .next()
531                .unwrap_or("")
532                .replace(']', "");
533
534            self.parse_duration_string(&duration_str)
535        } else {
536            Err(anyhow!("Keyword not found: {}", keyword))
537        }
538    }
539
540    /// Parse duration string (e.g., "5m", "1h", "30s")
541    pub fn parse_duration_string(&self, s: &str) -> Result<ChronoDuration> {
542        Self::parse_duration_string_static(s)
543    }
544
545    /// Parse duration string (static version for testing)
546    fn parse_duration_string_static(s: &str) -> Result<ChronoDuration> {
547        let s = s.trim().to_lowercase();
548
549        if s.is_empty() {
550            return Err(anyhow!("Empty duration string"));
551        }
552
553        // Extract number and unit
554        let num_end = s.chars().position(|c| !c.is_numeric()).unwrap_or(s.len());
555        let num: i64 = s[..num_end].parse()?;
556        let unit = &s[num_end..];
557
558        match unit {
559            "s" | "sec" | "second" | "seconds" => Ok(ChronoDuration::seconds(num)),
560            "m" | "min" | "minute" | "minutes" => Ok(ChronoDuration::minutes(num)),
561            "h" | "hr" | "hour" | "hours" => Ok(ChronoDuration::hours(num)),
562            "d" | "day" | "days" => Ok(ChronoDuration::days(num)),
563            _ => Err(anyhow!("Unknown duration unit: {}", unit)),
564        }
565    }
566
567    /// Generate base SPARQL query (remove RSP extensions)
568    fn generate_base_query(
569        &self,
570        original: &str,
571        streams: &[StreamClause],
572        _windows: &[WindowConfig],
573    ) -> Result<String> {
574        let mut base_query = original.to_string();
575
576        // Remove FROM STREAM clauses
577        for stream in streams {
578            // Simple replacement (in production, use proper SPARQL rewriting)
579            base_query = base_query.replace(&format!("FROM STREAM <{}>", stream.uri), "");
580
581            // Remove window specifications
582            if let Some(start) = base_query.find('[') {
583                if let Some(end) = base_query[start..].find(']') {
584                    base_query.replace_range(start..start + end + 1, "");
585                }
586            }
587        }
588
589        // Clean up extra whitespace
590        base_query = base_query
591            .lines()
592            .filter(|line| !line.trim().is_empty())
593            .collect::<Vec<_>>()
594            .join("\n");
595
596        Ok(base_query)
597    }
598
599    /// Extract triples from stream event
600    fn extract_triples_from_event(&self, event: &StreamEvent) -> Result<Vec<Triple>> {
601        match event {
602            StreamEvent::TripleAdded {
603                subject,
604                predicate,
605                object,
606                graph,
607                metadata: _,
608            } => Ok(vec![Triple {
609                subject: subject.clone(),
610                predicate: predicate.clone(),
611                object: object.clone(),
612                graph: graph.clone(),
613            }]),
614            StreamEvent::QuadAdded {
615                subject,
616                predicate,
617                object,
618                graph,
619                metadata: _,
620            } => Ok(vec![Triple {
621                subject: subject.clone(),
622                predicate: predicate.clone(),
623                object: object.clone(),
624                graph: Some(graph.clone()),
625            }]),
626            _ => Ok(vec![]),
627        }
628    }
629
630    /// Add triples to appropriate windows
631    async fn add_to_windows(
632        &self,
633        manager: &mut WindowManager,
634        triples: Vec<Triple>,
635    ) -> Result<()> {
636        let now = Utc::now();
637
638        match &manager.config.window_type {
639            WindowType::Tumbling => {
640                // Create new window if needed
641                if manager.windows.is_empty()
642                    || self.window_is_full(
643                        manager
644                            .windows
645                            .back()
646                            .expect("windows validated to be non-empty via is_empty check"),
647                        &manager.config,
648                    )
649                {
650                    self.create_new_window(manager, now).await?;
651                }
652
653                // Add to current window
654                if let Some(window) = manager.windows.back_mut() {
655                    window.triples.extend(triples);
656                }
657            }
658            WindowType::Sliding => {
659                // For sliding windows, may need to add to multiple windows
660                // and create new windows as time progresses
661
662                // Remove expired windows
663                self.cleanup_expired_windows(manager, now);
664
665                // Ensure we have at least one active window
666                if manager.windows.is_empty() {
667                    self.create_new_window(manager, now).await?;
668                }
669
670                // Add triples to all active windows
671                for window in &mut manager.windows {
672                    if now >= window.start && now <= window.end {
673                        window.triples.extend(triples.clone());
674                    }
675                }
676
677                // Check if we need a new sliding window
678                if let Some(WindowSize::Time(slide)) = &manager.config.slide {
679                    if let Some(last_window) = manager.windows.back() {
680                        let next_start = last_window.start + *slide;
681                        if now >= next_start {
682                            self.create_new_window(manager, next_start).await?;
683                        }
684                    }
685                }
686            }
687            WindowType::Landmark => {
688                // Landmark windows grow from start to now
689                if manager.windows.is_empty() {
690                    self.create_new_window(manager, now).await?;
691                }
692
693                if let Some(window) = manager.windows.front_mut() {
694                    window.triples.extend(triples);
695                    window.end = now;
696                }
697            }
698            WindowType::Session { gap } => {
699                // Session windows based on activity gaps
700                if manager.windows.is_empty() {
701                    self.create_new_window(manager, now).await?;
702                } else if let Some(last_window) = manager.windows.back_mut() {
703                    // Check if within session gap
704                    if now - last_window.end <= *gap {
705                        // Extend existing session
706                        last_window.triples.extend(triples);
707                        last_window.end = now;
708                    } else {
709                        // Start new session
710                        self.create_new_window(manager, now).await?;
711                    }
712                }
713            }
714        }
715
716        manager.stats.active_windows = manager.windows.len();
717        Ok(())
718    }
719
720    /// Check if window is full
721    fn window_is_full(&self, window: &Window, config: &WindowConfig) -> bool {
722        match &config.size {
723            WindowSize::Time(duration) => {
724                let window_duration = window.end - window.start;
725                window_duration >= *duration
726            }
727            WindowSize::Triples(count) => window.triples.len() >= *count,
728            WindowSize::Logical(count) => window.triples.len() >= *count,
729        }
730    }
731
732    /// Create a new window
733    async fn create_new_window(
734        &self,
735        manager: &mut WindowManager,
736        start: DateTime<Utc>,
737    ) -> Result<()> {
738        let end = match &manager.config.size {
739            WindowSize::Time(duration) => start + *duration,
740            _ => start, // For count-based, will be updated as triples arrive
741        };
742
743        let window = Window {
744            id: uuid::Uuid::new_v4().to_string(),
745            start,
746            end,
747            triples: Vec::new(),
748            materialized: false,
749        };
750
751        manager.windows.push_back(window);
752        manager.stats.windows_created += 1;
753
754        Ok(())
755    }
756
757    /// Cleanup expired windows
758    fn cleanup_expired_windows(&self, manager: &mut WindowManager, now: DateTime<Utc>) {
759        while let Some(window) = manager.windows.front() {
760            if window.end < now {
761                manager.windows.pop_front();
762                manager.stats.windows_closed += 1;
763            } else {
764                break;
765            }
766        }
767    }
768
769    /// Get window statistics
770    pub async fn get_window_stats(&self, stream_uri: &str) -> Option<WindowStats> {
771        self.windows
772            .read()
773            .await
774            .get(stream_uri)
775            .map(|m| m.stats.clone())
776    }
777
778    /// List registered streams
779    pub async fn list_streams(&self) -> Vec<String> {
780        self.streams.read().await.keys().cloned().collect()
781    }
782}
783
784#[cfg(test)]
785mod tests {
786    use super::*;
787
788    #[test]
789    fn test_parse_duration() {
790        assert_eq!(
791            RspProcessor::parse_duration_string_static("5m").unwrap(),
792            ChronoDuration::minutes(5)
793        );
794        assert_eq!(
795            RspProcessor::parse_duration_string_static("1h").unwrap(),
796            ChronoDuration::hours(1)
797        );
798        assert_eq!(
799            RspProcessor::parse_duration_string_static("30s").unwrap(),
800            ChronoDuration::seconds(30)
801        );
802    }
803
804    #[test]
805    fn test_detect_language() {
806        let csparql_query = "SELECT * FROM STREAM <http://stream> [RANGE 5m]";
807        assert_eq!(
808            RspProcessor::detect_language_static(csparql_query).unwrap(),
809            RspLanguage::CSparql
810        );
811
812        let cqels_query = "SELECT * WHERE { ?s ?p ?o } [NOW-5m TO NOW]";
813        assert_eq!(
814            RspProcessor::detect_language_static(cqels_query).unwrap(),
815            RspLanguage::Cqels
816        );
817    }
818}