rsp_rs/
lib.rs

//! # RSP-RS
//!
//! A high-performance RDF Stream Processing engine in Rust, supporting RSP-QL queries
//! with sliding windows and real-time analytics.
//!
//! This library provides:
//! - RSP-QL syntax support for continuous queries
//! - Sliding and tumbling window semantics
//! - SPARQL aggregation functions (COUNT, AVG, MIN, MAX, SUM)
//! - Real-time stream processing with multi-threading
//! - Integration with static background knowledge
//!
//! ## When Are Results Emitted?
//!
//! **Important:** Results are emitted when windows **close**, which happens when:
//! 1. A new event arrives with a **timestamp** > window end time
//! 2. The window's STEP interval is reached **based on event timestamps**
//!
//! **Key Concept:** Window closure is driven by **event timestamps**, NOT wall-clock time!
//! The system doesn't use timers - it only processes events when you call `add_quads()`.
//!
//! ### Example with RANGE 10000 STEP 2000:
//!
//! ```text
//! - Events at t=0, 500, 1000, 1500 are added to windows
//! - No results yet (windows still open)
//! - Event with timestamp=2000 arrives -> closes window [-8000, 2000) -> results emitted
//! - Event with timestamp=4000 arrives -> closes window [-6000, 4000) -> results emitted
//!
//! Note: Wall-clock time doesn't matter! You could add all these events instantly,
//! but results only emit when an event's TIMESTAMP triggers window closure.
//! ```
//!
//! **Important:** If your last event has timestamp=1500, NO results will be emitted
//! because no subsequent event with a higher timestamp triggered window closure.
//! Use `close_stream()` to add a "sentinel" event with a future timestamp to trigger
//! remaining window closures.
//!
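//! The closure rule can be modelled in a few lines of plain Rust. This is only a
//! simplified sketch, not the engine's implementation: the helper `newly_closed`
//! is hypothetical, and it assumes half-open windows `[end - RANGE, end)` whose
//! ends fall on multiples of STEP and which close once an event timestamp reaches
//! `end`, as in the worked example above. It also reports windows that hold no
//! events, which the real engine may or may not do.
//!
//! ```rust
//! /// Windows closed when event time advances from `prev` to `now` (assumed model).
//! fn newly_closed(range: i64, step: i64, prev: i64, now: i64) -> Vec<(i64, i64)> {
//!     // First window end strictly after the previous event timestamp.
//!     let mut end = (prev.div_euclid(step) + 1) * step;
//!     let mut closed = Vec::new();
//!     while end <= now {
//!         closed.push((end - range, end));
//!         end += step;
//!     }
//!     closed
//! }
//!
//! fn main() {
//!     let (range, step) = (10_000, 2_000);
//!     let mut prev = 0;
//!     // Only the timestamps matter; no timer or sleep is involved.
//!     for ts in [0, 500, 1_000, 1_500, 2_000, 4_000] {
//!         for (open, close) in newly_closed(range, step, prev, ts) {
//!             println!("event t={ts} closes [{open}, {close})");
//!         }
//!         prev = ts;
//!     }
//! }
//! ```
//!
//! Under these assumptions nothing is printed for the events up to t=1500, and the
//! events at t=2000 and t=4000 each close one window.
//!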
//! ### Complete Example with Stream Closure:
//!
//! ```rust,no_run
//! use rsp_rs::RSPEngine;
//! use oxigraph::model::*;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let query = r#"
//!         PREFIX ex: <https://rsp.rs/>
//!         REGISTER RStream <output> AS
//!         SELECT *
//!         FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10000 STEP 2000]
//!         WHERE {
//!             WINDOW ex:w1 { ?s ?p ?o }
//!         }
//!     "#;
//!
//!     let mut rsp_engine = RSPEngine::new(query.to_string());
//!     rsp_engine.initialize()?;
//!
//!     // Get a cloned stream (can be stored and reused)
//!     let stream = rsp_engine.get_stream("https://rsp.rs/stream1").unwrap();
//!
//!     // Start processing results
//!     let result_receiver = rsp_engine.start_processing();
//!
//!     // Add events with TIMESTAMPS (not wall-clock time!)
//!     // These could be added instantly or over hours - doesn't matter
//!     let quad1 = Quad::new(
//!         NamedNode::new("https://rsp.rs/subject1")?,
//!         NamedNode::new("https://rsp.rs/predicate")?,
//!         NamedNode::new("https://rsp.rs/object")?,
//!         GraphName::DefaultGraph,
//!     );
//!     stream.add_quads(vec![quad1], 1000)?;  // timestamp = 1000
//!
//!     let quad2 = Quad::new(
//!         NamedNode::new("https://rsp.rs/subject2")?,
//!         NamedNode::new("https://rsp.rs/predicate")?,
//!         NamedNode::new("https://rsp.rs/object")?,
//!         GraphName::DefaultGraph,
//!     );
//!     stream.add_quads(vec![quad2], 1500)?;  // timestamp = 1500
//!
//!     // IMPORTANT: Close the stream to emit final results
//!     // This adds a sentinel event with timestamp=10000 to trigger window closures
//!     rsp_engine.close_stream("https://rsp.rs/stream1", 10000)?;
//!
//!     // Collect results
//!     while let Ok(result) = result_receiver.recv() {
//!         println!("Result: {} (window: {} to {})",
//!                  result.bindings,
//!                  result.timestamp_from,
//!                  result.timestamp_to);
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Understanding Window Lifecycle
//!
//! Windows do not emit results as events arrive; they emit when a later event **closes** them.
//!
//! **Critical:** The timeline below shows EVENT TIMESTAMPS (not wall-clock time):
//!
//! ### Timeline Example (RANGE 10000 STEP 2000, i.e. 10 s / 2 s with millisecond timestamps):
//!
//! ```text
//! Event with timestamp=0:     Added to window
//! Event with timestamp=1000:  More events added to window
//! Event with timestamp=2000:  -> window [-8000, 2000) closes -> RESULTS EMITTED
//! Event with timestamp=4000:  -> window [-6000, 4000) closes -> RESULTS EMITTED
//! Event with timestamp=6000:  -> window [-4000, 6000) closes -> RESULTS EMITTED
//! ...
//! Event with timestamp=15000: Last event added to stream
//!                             NO MORE RESULTS (no event to trigger closure!)
//!
//! Solution: Call close_stream("stream_uri", 20000) to emit final results
//!
//! Note: You can add ALL these events in 1 millisecond of real time! The system only
//! cares about the timestamps you provide, not how fast you send events.
//! ```
//!
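//! A sentinel timestamp for `close_stream()` can be derived from the last event
//! timestamp. The helper below is hypothetical (it is not part of this crate's
//! API) and rests on an assumed, simplified model: half-open windows
//! `[end - RANGE, end)` whose ends fall on multiples of STEP and which close once
//! an event timestamp reaches `end`. Treat it as a sketch for reasoning about
//! which value to pass, not as the engine's own logic.
//!
//! ```rust
//! /// End of the last window that still contains `last_ts` (assumed model).
//! fn flush_timestamp(range: i64, step: i64, last_ts: i64) -> i64 {
//!     // Largest multiple of `step` whose window start (end - range) is <= last_ts.
//!     (last_ts + range).div_euclid(step) * step
//! }
//!
//! fn main() {
//!     // Last event at t=1500 with RANGE 10000 STEP 2000.
//!     assert_eq!(flush_timestamp(10_000, 2_000, 1_500), 10_000);
//!     // Last event at t=15000 with RANGE 10000 STEP 2000.
//!     assert_eq!(flush_timestamp(10_000, 2_000, 15_000), 24_000);
//! }
//! ```
//!
//! Under this model a larger sentinel closes at least the same windows, while a
//! smaller one closes only the windows that end at or before it.
//!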
//! ## Debugging Window Behavior
//!
//! You can inspect window state for debugging:
//!
//! ```rust,no_run
//! # use rsp_rs::RSPEngine;
//! # let mut engine = RSPEngine::new("".to_string());
//! # engine.initialize().unwrap();
//! if let Some(window) = engine.get_window("window_name") {
//!     let window_lock = window.lock().unwrap();
//!
//!     // Check how many windows are active
//!     println!("Active windows: {}", window_lock.get_active_window_count());
//!
//!     // See the time ranges of active windows
//!     for (start, end) in window_lock.get_active_window_ranges() {
//!         println!("Window: [{}, {})", start, end);
//!     }
//!
//!     // Enable verbose debug logging
//!     // window_lock.set_debug_mode(true);
//! }
//! ```

mod engine;
mod parsing;
mod quad_container;
mod windowing;

// Re-export modules for easier access
pub use engine::*;
pub use parsing::*;
pub use windowing::*;

// Public API exports
pub use engine::r2r::R2ROperator;
pub use engine::rsp_engine::{BindingWithTimestamp, RDFStream, RSPEngine};
pub use parsing::parsed_query::{Operator, ParsedQuery, WindowDefinition};
pub use parsing::rspql_parser::RSPQLParser;
pub use quad_container::QuadContainer;
pub use windowing::csparql_window::{
    CSPARQLWindow, ReportStrategy, StreamType, Tick, execute_query,
};
pub use windowing::window_instance::WindowInstance;