//! # RSP-RS
//!
//! A high-performance RDF Stream Processing engine in Rust, supporting RSP-QL queries
//! with sliding windows and real-time analytics.
//!
//! This library provides:
//! - RSP-QL syntax support for continuous queries
//! - Sliding and tumbling window semantics
//! - SPARQL aggregation functions (COUNT, AVG, MIN, MAX, SUM)
//! - Real-time stream processing with multi-threading
//! - Integration with static background knowledge
//!
//! ## When Are Results Emitted?
//!
//! **Important:** Results are emitted when windows **close**, which happens when:
//! 1. A new event arrives with a **timestamp** >= the window's end time (windows are
//!    half-open intervals `[start, end)`)
//! 2. The window's STEP interval is reached **based on event timestamps**
//!
//! **Key Concept:** Window closure is driven by **event timestamps**, NOT wall-clock time!
//! The engine uses no timers: it only processes events when you call `add_quads()`.
//!
//! ### Example with RANGE 10000 STEP 2000:
//!
//! ```text
//! - Events at t=0, 500, 1000, 1500 are added to windows
//! - No results yet (windows still open)
//! - Event with timestamp=2000 arrives -> closes window [-8000, 2000) -> results emitted
//! - Event with timestamp=4000 arrives -> closes window [-6000, 4000) -> results emitted
//!
//! Note: Wall-clock time doesn't matter! You could add all these events instantly,
//! but results only emit when an event's TIMESTAMP triggers window closure.
//! ```
//!
//! **Important:** If your last event has timestamp=1500, NO results will be emitted
//! because no subsequent event with a higher timestamp triggered window closure.
//! Use `close_stream()` to add a "sentinel" event with a future timestamp to trigger
//! remaining window closures.
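//!
//! The closure rule can be modelled in a few lines. The sketch below is a simplified,
//! hypothetical model, not this crate's implementation: `SlidingWindows`, `add`, and
//! `advance_to` are illustrative names. It keeps one event counter per window
//! `[end - range, end)` with window ends on multiples of STEP, and assumes, as in the
//! example above, that a window closes once an event with timestamp >= its end arrives.
//! `advance_to` plays the role of the sentinel timestamp passed to `close_stream()`.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical model of one sliding-window operator (not the crate's `CSPARQLWindow`).
//! struct SlidingWindows {
//!     range: i64,
//!     step: i64,
//!     /// Open windows keyed by end time -> number of events in `[end - range, end)`.
//!     open: BTreeMap<i64, usize>,
//! }
//!
//! impl SlidingWindows {
//!     fn new(range: i64, step: i64) -> Self {
//!         Self { range, step, open: BTreeMap::new() }
//!     }
//!
//!     /// Close every window whose end is <= `t`, returning `(start, end, count)`.
//!     fn advance_to(&mut self, t: i64) -> Vec<(i64, i64, usize)> {
//!         // `split_off` leaves ends <= t in `self.open` and returns the rest,
//!         // which then become the new set of open windows.
//!         let still_open = self.open.split_off(&(t + 1));
//!         let closed = std::mem::replace(&mut self.open, still_open);
//!         closed
//!             .into_iter()
//!             .map(|(end, n)| (end - self.range, end, n))
//!             .collect()
//!     }
//!
//!     /// Record an event at `t`: first close expired windows (driven purely by the
//!     /// timestamp), then count the event in every window that contains `t`.
//!     fn add(&mut self, t: i64) -> Vec<(i64, i64, usize)> {
//!         let emitted = self.advance_to(t);
//!         // Smallest window end strictly greater than `t`.
//!         let mut end = (t.div_euclid(self.step) + 1) * self.step;
//!         while end <= t + self.range {
//!             *self.open.entry(end).or_insert(0) += 1;
//!             end += self.step;
//!         }
//!         emitted
//!     }
//! }
//!
//! fn main() {
//!     let mut w = SlidingWindows::new(10_000, 2_000);
//!     for t in [0, 500, 1_000, 1_500] {
//!         assert!(w.add(t).is_empty()); // windows still open, nothing emitted
//!     }
//!     assert_eq!(w.add(2_000), vec![(-8_000, 2_000, 4)]);
//!     assert_eq!(w.add(4_000), vec![(-6_000, 4_000, 5)]);
//!
//!     // No later event arrives, so the remaining windows stay open until a
//!     // sentinel timestamp closes them.
//!     let flushed = w.advance_to(20_000);
//!     assert_eq!(flushed.len(), 5);
//!     assert!(w.open.is_empty());
//! }
//! ```
//!
//! Because closure depends only on the timestamps passed in, this model covers 20 seconds
//! of event time without any waiting.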
//!
//! ### Complete Example with Stream Closure:
//!
//! ```rust,no_run
//! use rsp_rs::RSPEngine;
//! use oxigraph::model::*;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let query = r#"
//! PREFIX ex: <https://rsp.rs/>
//! REGISTER RStream <output> AS
//! SELECT *
//! FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10000 STEP 2000]
//! WHERE {
//! WINDOW ex:w1 { ?s ?p ?o }
//! }
//! "#;
//!
//!     let mut rsp_engine = RSPEngine::new(query.to_string());
//!     rsp_engine.initialize()?;
//!
//!     // Get a cloned stream handle (can be stored and reused)
//!     let stream = rsp_engine.get_stream("https://rsp.rs/stream1").unwrap();
//!
//!     // Start processing results
//!     let result_receiver = rsp_engine.start_processing();
//!
//!     // Add events with TIMESTAMPS (not wall-clock time!)
//!     // These could be added instantly or over hours; it doesn't matter
//!     let quad1 = Quad::new(
//!         NamedNode::new("https://rsp.rs/subject1")?,
//!         NamedNode::new("https://rsp.rs/predicate")?,
//!         NamedNode::new("https://rsp.rs/object")?,
//!         GraphName::DefaultGraph,
//!     );
//!     stream.add_quads(vec![quad1], 1000)?; // timestamp = 1000
//!
//!     let quad2 = Quad::new(
//!         NamedNode::new("https://rsp.rs/subject2")?,
//!         NamedNode::new("https://rsp.rs/predicate")?,
//!         NamedNode::new("https://rsp.rs/object")?,
//!         GraphName::DefaultGraph,
//!     );
//!     stream.add_quads(vec![quad2], 1500)?; // timestamp = 1500
//!
//!     // IMPORTANT: Close the stream to emit final results.
//!     // This adds a sentinel event with timestamp=10000 to trigger window closures.
//!     rsp_engine.close_stream("https://rsp.rs/stream1", 10000)?;
//!
//!     // Collect results. `recv()` blocks while the channel is open, so this loop
//!     // ends only after the sending side of the channel has been dropped.
//!     while let Ok(result) = result_receiver.recv() {
//!         println!(
//!             "Result: {} (window: {} to {})",
//!             result.bindings, result.timestamp_from, result.timestamp_to
//!         );
//!     }
//!
//!     Ok(())
//! }
//! ```
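//!
//! Assuming the receiver returned by `start_processing()` behaves like a
//! `std::sync::mpsc::Receiver` (check its actual type), the `recv()` loop above ends only
//! once every sender is dropped. If you are unsure when that happens, one option is to
//! stop after a quiet period. In this sketch `drain_with_timeout` is a hypothetical
//! helper and a plain thread stands in for the engine's result producer.
//!
//! ```rust
//! use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
//! use std::thread;
//! use std::time::Duration;
//!
//! /// Hypothetical helper: collect messages until none arrives within `idle`.
//! /// Returns the messages and whether the channel was disconnected.
//! fn drain_with_timeout<T>(rx: &Receiver<T>, idle: Duration) -> (Vec<T>, bool) {
//!     let mut out = Vec::new();
//!     loop {
//!         match rx.recv_timeout(idle) {
//!             Ok(msg) => out.push(msg),
//!             // Sender still alive but quiet: a plain `recv()` would block here.
//!             Err(RecvTimeoutError::Timeout) => return (out, false),
//!             // Every sender dropped: a `recv()` loop would also end here.
//!             Err(RecvTimeoutError::Disconnected) => return (out, true),
//!         }
//!     }
//! }
//!
//! fn main() {
//!     let (tx, rx) = mpsc::channel::<u32>();
//!
//!     // Stand-in producer: sends three results, then hands the sender back
//!     // so the channel stays open (as a long-running engine might).
//!     let producer = thread::spawn(move || {
//!         for i in 0..3 {
//!             tx.send(i).unwrap();
//!         }
//!         tx
//!     });
//!     let tx = producer.join().unwrap();
//!
//!     let (results, disconnected) = drain_with_timeout(&rx, Duration::from_millis(50));
//!     assert_eq!(results, vec![0, 1, 2]);
//!     assert!(!disconnected);
//!
//!     // Once the last sender is dropped, draining returns immediately.
//!     drop(tx);
//!     let (rest, disconnected) = drain_with_timeout(&rx, Duration::from_millis(50));
//!     assert!(rest.is_empty());
//!     assert!(disconnected);
//! }
//! ```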
//!
//! ## Understanding Window Lifecycle
//!
//! Windows don't emit results when events arrive - they emit when **closed** by future events.
//!
//! **Critical:** The timeline below shows EVENT TIMESTAMPS (not wall-clock time):
//!
//! ### Timeline Example (RANGE 10s, STEP 2s):
//!
//! ```text
//! Event with timestamp=0: Added to window
//! Event with timestamp=1000: More events added to window
//! Event with timestamp=2000: -> window [-8000, 2000) closes -> RESULTS EMITTED
//! Event with timestamp=4000: -> window [-6000, 4000) closes -> RESULTS EMITTED
//! Event with timestamp=6000: -> window [-4000, 6000) closes -> RESULTS EMITTED
//! ...
//! Event with timestamp=15000: Last event added to stream
//! NO MORE RESULTS (no event to trigger closure!)
//!
//! Solution: Call close_stream("stream_uri", 20000) to emit final results
//!
//! Note: You can add ALL these events in 1 millisecond of real time! The system only
//! cares about the timestamps you provide, not how fast you send events.
//! ```
//!
//! ## Debugging Window Behavior
//!
//! You can inspect window state for debugging:
//!
//! ```rust,no_run
//! # use rsp_rs::RSPEngine;
//! # let mut engine = RSPEngine::new("".to_string());
//! # engine.initialize().unwrap();
//! if let Some(window) = engine.get_window("window_name") {
//!     let window_lock = window.lock().unwrap();
//!
//!     // Check how many windows are active
//!     println!("Active windows: {}", window_lock.get_active_window_count());
//!
//!     // See the time ranges of active windows
//!     for (start, end) in window_lock.get_active_window_ranges() {
//!         println!("Window: [{}, {})", start, end);
//!     }
//!
//!     // Enable verbose debug logging
//!     // window_lock.set_debug_mode(true);
//! }
//! ```
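//!
//! To sanity-check the ranges printed above, it helps to know which windows an event
//! should fall into. The hypothetical helper below (not part of this crate) assumes,
//! as the examples in this documentation do, that window ends fall on multiples of
//! STEP and each window covers the half-open interval `[end - RANGE, end)`; when STEP
//! divides RANGE, every event then belongs to RANGE / STEP overlapping windows. The
//! engine's own bookkeeping may differ, so treat this as a model only.
//!
//! ```rust
//! /// Hypothetical helper: every window `[end - range, end)` that contains `t`,
//! /// assuming window ends are multiples of `step`.
//! fn windows_containing(t: i64, range: i64, step: i64) -> Vec<(i64, i64)> {
//!     // Smallest window end strictly greater than `t` (`div_euclid` also
//!     // rounds correctly for negative timestamps).
//!     let first_end = (t.div_euclid(step) + 1) * step;
//!     (0..)
//!         .map(|k| first_end + k * step)
//!         .take_while(|&end| end <= t + range)
//!         .map(|end| (end - range, end))
//!         .collect()
//! }
//!
//! fn main() {
//!     // RANGE 10000, STEP 2000: an event at t=1000 falls into five windows.
//!     let ws = windows_containing(1_000, 10_000, 2_000);
//!     assert_eq!(ws.len(), 5);
//!     assert_eq!(ws[0], (-8_000, 2_000));
//!     assert_eq!(ws[4], (0, 10_000));
//!
//!     // Half-open intervals: t=2000 is no longer in [-8000, 2000).
//!     assert_eq!(windows_containing(2_000, 10_000, 2_000)[0], (-6_000, 4_000));
//!
//!     for (start, end) in ws {
//!         println!("Window: [{}, {})", start, end);
//!     }
//! }
//! ```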

mod engine;
mod parsing;
mod quad_container;
mod windowing;

// Re-export modules for easier access
pub use engine::*;
pub use parsing::*;
pub use windowing::*;

// Public API exports
pub use engine::r2r::R2ROperator;
pub use engine::rsp_engine::{BindingWithTimestamp, RDFStream, RSPEngine};
pub use parsing::parsed_query::{Operator, ParsedQuery, WindowDefinition};
pub use parsing::rspql_parser::RSPQLParser;
pub use quad_container::QuadContainer;
pub use windowing::csparql_window::{
    CSPARQLWindow, ReportStrategy, StreamType, Tick, execute_query,
};
pub use windowing::window_instance::WindowInstance;