rsp_rs/engine/
rsp_engine.rs

1use crate::parsed_query::WindowDefinition;
2use crate::rspql_parser::RSPQLParser;
3use crate::{CSPARQLWindow, QuadContainer, R2ROperator};
4use oxigraph::model::Quad;
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8
/// One query solution together with the time interval it was reported for.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BindingWithTimestamp {
    /// The solution's variable bindings rendered as text
    /// (`RSPEngine::register` stores the solution's `Debug` formatting here).
    pub bindings: String,
    /// Start of the reported interval.
    pub timestamp_from: i64,
    /// End of the reported interval.
    pub timestamp_to: i64,
}
16
17/// Represents an RDF stream that feeds data into a window
18#[derive(Clone)]
19pub struct RDFStream {
20    pub name: String,
21    pub(crate) window_sender: mpsc::Sender<(QuadContainer, String)>,
22}
23
24impl RDFStream {
25    pub fn new(name: String, window_sender: mpsc::Sender<(QuadContainer, String)>) -> Self {
26        Self {
27            name,
28            window_sender,
29        }
30    }
31
32    /// Add a quad container to the stream
33    pub fn add(&self, container: QuadContainer) -> Result<(), String> {
34        self.window_sender
35            .send((container, self.name.clone()))
36            .map_err(|e| format!("Failed to send data to window: {}", e))
37    }
38
39    /// Add a set of quads with a timestamp to the stream
40    pub fn add_quads(&self, quads: Vec<Quad>, timestamp: i64) -> Result<(), String> {
41        let mut elements = std::collections::HashSet::new();
42        for quad in quads {
43            elements.insert(quad);
44        }
45        let container = QuadContainer::new(elements, timestamp);
46        self.add(container)
47    }
48}
49
50/// The main RSP (RDF Stream Processing) Engine
51pub struct RSPEngine {
52    windows: HashMap<String, Arc<Mutex<CSPARQLWindow>>>,
53    streams: HashMap<String, RDFStream>,
54    r2r: R2ROperator,
55    parsed_query: crate::parsed_query::ParsedQuery,
56}
57
58impl RSPEngine {
59    /// Create a new RSP Engine from an RSPQL query
60    pub fn new(query: String) -> Self {
61        let parser = RSPQLParser::new(query);
62        let parsed_query = parser.parse();
63
64        #[cfg(debug_assertions)]
65        {
66            println!("[RSPEngine] Parsed SPARQL query:");
67            println!("{}", parsed_query.sparql_query);
68            println!();
69        }
70
71        let windows = HashMap::new();
72        let streams = HashMap::new();
73        let r2r = R2ROperator::new(parsed_query.sparql_query.clone());
74
75        Self {
76            windows,
77            streams,
78            r2r,
79            parsed_query,
80        }
81    }
82
83    /// Initialize the engine by creating windows and streams
84    pub fn initialize(&mut self) -> Result<(), String> {
85        // Create windows and streams based on parsed query
86        for window_def in &self.parsed_query.s2r {
87            let (tx, rx) = mpsc::channel::<(QuadContainer, String)>();
88
89            // Create window with full parameters
90            let window = Arc::new(Mutex::new(CSPARQLWindow::new(
91                window_def.window_name.clone(),
92                window_def.width,
93                window_def.slide,
94                crate::ReportStrategy::OnWindowClose,
95                crate::Tick::TimeDriven,
96                0,
97            )));
98
99            // Create stream
100            let stream = RDFStream::new(window_def.stream_name.clone(), tx);
101
102            // Store window and stream
103            self.windows
104                .insert(window_def.window_name.clone(), window.clone());
105            self.streams.insert(window_def.stream_name.clone(), stream);
106
107            // Spawn thread to handle incoming data
108            let window_clone = window.clone();
109            thread::spawn(move || {
110                while let Ok((container, _stream_name)) = rx.recv() {
111                    let mut win = window_clone.lock().unwrap();
112                    // Add all quads from the container to the window
113                    for quad in &container.elements {
114                        win.add(quad.clone(), container.last_timestamp_changed);
115                    }
116                }
117            });
118        }
119
120        Ok(())
121    }
122
123    /// Register a callback for processing window content
124    /// Returns a receiver for binding results
125    pub fn register(
126        windows: HashMap<String, Arc<Mutex<CSPARQLWindow>>>,
127        r2r: R2ROperator,
128        window_defs: Vec<WindowDefinition>,
129    ) -> mpsc::Receiver<BindingWithTimestamp> {
130        let (tx, rx) = mpsc::channel();
131
132        // For each window, subscribe to its RStream output
133        for (window_name, window_arc) in windows.iter() {
134            let r2r_clone = r2r.clone();
135            let tx_clone = tx.clone();
136            let all_windows = windows.clone();
137            let window_def = window_defs
138                .iter()
139                .find(|w| w.window_name == *window_name)
140                .cloned();
141            let window_name_owned = window_name.clone();
142
143            // Subscribe to window emissions using the callback system
144            {
145                let mut window = window_arc.lock().unwrap();
146                window.subscribe(crate::StreamType::RStream, move |mut container| {
147                    let timestamp = container.last_timestamp_changed;
148
149                    // Merge content from other windows
150                    for (other_name, other_window_arc) in &all_windows {
151                        if other_name != &window_name_owned {
152                            if let Ok(other_window) = other_window_arc.lock() {
153                                if let Some(other_container) =
154                                    other_window.get_content_from_window(timestamp)
155                                {
156                                    for quad in &other_container.elements {
157                                        container.add(quad.clone(), timestamp);
158                                    }
159                                }
160                            }
161                        }
162                    }
163
164                    // Execute R2R query
165                    if let Ok(results) = r2r_clone.execute(&container) {
166                        if let Some(def) = &window_def {
167                            if let oxigraph::sparql::QueryResults::Solutions(solutions) = results {
168                                for solution in solutions {
169                                    if let Ok(binding) = solution {
170                                        let binding_str = format!("{:?}", binding);
171                                        let result = BindingWithTimestamp {
172                                            bindings: binding_str,
173                                            timestamp_from: timestamp,
174                                            timestamp_to: timestamp + def.width,
175                                        };
176                                        let _ = tx_clone.send(result);
177                                    }
178                                }
179                            }
180                        }
181                    }
182                });
183            }
184        }
185
186        rx
187    }
188
189    /// Convenience method to register using the engine's own data
190    pub fn start_processing(&self) -> mpsc::Receiver<BindingWithTimestamp> {
191        Self::register(
192            self.windows.clone(),
193            self.r2r.clone(),
194            self.parsed_query.s2r.clone(),
195        )
196    }
197
198    /// Get a stream by name (returns a clone for easier usage)
199    pub fn get_stream(&self, stream_name: &str) -> Option<RDFStream> {
200        self.streams.get(stream_name).cloned()
201    }
202
203    /// Add static data to the R2R operator
204    pub fn add_static_data(&mut self, quad: Quad) {
205        self.r2r.add_static_data(quad);
206    }
207
208    /// Get all stream names
209    pub fn get_all_streams(&self) -> Vec<String> {
210        self.streams.keys().cloned().collect()
211    }
212
213    /// Add a sentinel event to trigger closure of all open windows
214    /// This should be called when the stream ends to emit final results
215    pub fn close_stream(&self, stream_uri: &str, final_timestamp: i64) -> Result<(), String> {
216        if let Some(stream) = self.get_stream(stream_uri) {
217            // Add a dummy quad with timestamp far in the future
218            let sentinel = oxigraph::model::Quad::new(
219                oxigraph::model::NamedNode::new("urn:rsp:sentinel")
220                    .map_err(|e| format!("Failed to create sentinel node: {}", e))?,
221                oxigraph::model::NamedNode::new("urn:rsp:type")
222                    .map_err(|e| format!("Failed to create sentinel node: {}", e))?,
223                oxigraph::model::Literal::new_simple_literal("end"),
224                oxigraph::model::GraphName::DefaultGraph,
225            );
226            stream.add_quads(vec![sentinel], final_timestamp)?;
227            Ok(())
228        } else {
229            Err(format!("Stream {} not found", stream_uri))
230        }
231    }
232
233    /// Get the parsed query
234    pub fn parsed_query(&self) -> &crate::parsed_query::ParsedQuery {
235        &self.parsed_query
236    }
237
238    /// Get a window by name
239    pub fn get_window(&self, window_name: &str) -> Option<Arc<Mutex<CSPARQLWindow>>> {
240        self.windows.get(window_name).cloned()
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-window RSPQL query shared by the tests below.
    fn single_window_query() -> String {
        r#"
            REGISTER RStream <http://example.org/output> AS
            PREFIX ex: <http://example.org/>
            SELECT ?s ?p ?o
            FROM NAMED WINDOW :win1 ON STREAM :stream1 [RANGE 10 STEP 5]
            WHERE {
                WINDOW :win1 { ?s ?p ?o }
            }
        "#
        .to_string()
    }

    #[test]
    fn test_rsp_engine_creation() {
        let engine = RSPEngine::new(single_window_query());
        assert_eq!(engine.parsed_query().s2r.len(), 1);
    }

    #[test]
    fn test_initialize_engine() {
        let mut engine = RSPEngine::new(single_window_query());
        assert!(engine.initialize().is_ok());
        assert_eq!(engine.get_all_streams().len(), 1);

        // The window and stream must be reachable under the names the parser produced.
        let def = engine
            .parsed_query()
            .s2r
            .first()
            .expect("query defines exactly one window");
        assert!(engine.get_window(&def.window_name).is_some());
        assert!(engine.get_stream(&def.stream_name).is_some());
    }

    #[test]
    fn test_close_unknown_stream_fails() {
        let mut engine = RSPEngine::new(single_window_query());
        engine.initialize().unwrap();
        let err = engine
            .close_stream("urn:does-not-exist", 100)
            .unwrap_err();
        assert_eq!(err, "Stream urn:does-not-exist not found");
    }
}