rsp_rs/engine/
rsp_engine.rs1use crate::parsed_query::WindowDefinition;
2use crate::rspql_parser::RSPQLParser;
3use crate::{CSPARQLWindow, QuadContainer, R2ROperator};
4use oxigraph::model::Quad;
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8
/// One query solution emitted by the engine, tagged with a time interval.
///
/// `PartialEq`/`Eq` are derived so that results can be compared directly,
/// for example with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BindingWithTimestamp {
    /// Textual rendering of the solution's variable bindings.
    pub bindings: String,
    /// Start of the interval the result is reported for.
    pub timestamp_from: i64,
    /// End of the interval the result is reported for.
    pub timestamp_to: i64,
}
16
17#[derive(Clone)]
19pub struct RDFStream {
20 pub name: String,
21 pub(crate) window_sender: mpsc::Sender<(QuadContainer, String)>,
22}
23
24impl RDFStream {
25 pub fn new(name: String, window_sender: mpsc::Sender<(QuadContainer, String)>) -> Self {
26 Self {
27 name,
28 window_sender,
29 }
30 }
31
32 pub fn add(&self, container: QuadContainer) -> Result<(), String> {
34 self.window_sender
35 .send((container, self.name.clone()))
36 .map_err(|e| format!("Failed to send data to window: {}", e))
37 }
38
39 pub fn add_quads(&self, quads: Vec<Quad>, timestamp: i64) -> Result<(), String> {
41 let mut elements = std::collections::HashSet::new();
42 for quad in quads {
43 elements.insert(quad);
44 }
45 let container = QuadContainer::new(elements, timestamp);
46 self.add(container)
47 }
48}
49
50pub struct RSPEngine {
52 windows: HashMap<String, Arc<Mutex<CSPARQLWindow>>>,
53 streams: HashMap<String, RDFStream>,
54 r2r: R2ROperator,
55 parsed_query: crate::parsed_query::ParsedQuery,
56}
57
58impl RSPEngine {
59 pub fn new(query: String) -> Self {
61 let parser = RSPQLParser::new(query);
62 let parsed_query = parser.parse();
63
64 #[cfg(debug_assertions)]
65 {
66 println!("[RSPEngine] Parsed SPARQL query:");
67 println!("{}", parsed_query.sparql_query);
68 println!();
69 }
70
71 let windows = HashMap::new();
72 let streams = HashMap::new();
73 let r2r = R2ROperator::new(parsed_query.sparql_query.clone());
74
75 Self {
76 windows,
77 streams,
78 r2r,
79 parsed_query,
80 }
81 }
82
83 pub fn initialize(&mut self) -> Result<(), String> {
85 for window_def in &self.parsed_query.s2r {
87 let (tx, rx) = mpsc::channel::<(QuadContainer, String)>();
88
89 let window = Arc::new(Mutex::new(CSPARQLWindow::new(
91 window_def.window_name.clone(),
92 window_def.width,
93 window_def.slide,
94 crate::ReportStrategy::OnWindowClose,
95 crate::Tick::TimeDriven,
96 0,
97 )));
98
99 let stream = RDFStream::new(window_def.stream_name.clone(), tx);
101
102 self.windows
104 .insert(window_def.window_name.clone(), window.clone());
105 self.streams.insert(window_def.stream_name.clone(), stream);
106
107 let window_clone = window.clone();
109 thread::spawn(move || {
110 while let Ok((container, _stream_name)) = rx.recv() {
111 let mut win = window_clone.lock().unwrap();
112 for quad in &container.elements {
114 win.add(quad.clone(), container.last_timestamp_changed);
115 }
116 }
117 });
118 }
119
120 Ok(())
121 }
122
123 pub fn register(
126 windows: HashMap<String, Arc<Mutex<CSPARQLWindow>>>,
127 r2r: R2ROperator,
128 window_defs: Vec<WindowDefinition>,
129 ) -> mpsc::Receiver<BindingWithTimestamp> {
130 let (tx, rx) = mpsc::channel();
131
132 for (window_name, window_arc) in windows.iter() {
134 let r2r_clone = r2r.clone();
135 let tx_clone = tx.clone();
136 let all_windows = windows.clone();
137 let window_def = window_defs
138 .iter()
139 .find(|w| w.window_name == *window_name)
140 .cloned();
141 let window_name_owned = window_name.clone();
142
143 {
145 let mut window = window_arc.lock().unwrap();
146 window.subscribe(crate::StreamType::RStream, move |mut container| {
147 let timestamp = container.last_timestamp_changed;
148
149 for (other_name, other_window_arc) in &all_windows {
151 if other_name != &window_name_owned {
152 if let Ok(other_window) = other_window_arc.lock() {
153 if let Some(other_container) =
154 other_window.get_content_from_window(timestamp)
155 {
156 for quad in &other_container.elements {
157 container.add(quad.clone(), timestamp);
158 }
159 }
160 }
161 }
162 }
163
164 if let Ok(results) = r2r_clone.execute(&container) {
166 if let Some(def) = &window_def {
167 if let oxigraph::sparql::QueryResults::Solutions(solutions) = results {
168 for solution in solutions {
169 if let Ok(binding) = solution {
170 let binding_str = format!("{:?}", binding);
171 let result = BindingWithTimestamp {
172 bindings: binding_str,
173 timestamp_from: timestamp,
174 timestamp_to: timestamp + def.width,
175 };
176 let _ = tx_clone.send(result);
177 }
178 }
179 }
180 }
181 }
182 });
183 }
184 }
185
186 rx
187 }
188
189 pub fn start_processing(&self) -> mpsc::Receiver<BindingWithTimestamp> {
191 Self::register(
192 self.windows.clone(),
193 self.r2r.clone(),
194 self.parsed_query.s2r.clone(),
195 )
196 }
197
198 pub fn get_stream(&self, stream_name: &str) -> Option<RDFStream> {
200 self.streams.get(stream_name).cloned()
201 }
202
203 pub fn add_static_data(&mut self, quad: Quad) {
205 self.r2r.add_static_data(quad);
206 }
207
208 pub fn get_all_streams(&self) -> Vec<String> {
210 self.streams.keys().cloned().collect()
211 }
212
213 pub fn close_stream(&self, stream_uri: &str, final_timestamp: i64) -> Result<(), String> {
216 if let Some(stream) = self.get_stream(stream_uri) {
217 let sentinel = oxigraph::model::Quad::new(
219 oxigraph::model::NamedNode::new("urn:rsp:sentinel")
220 .map_err(|e| format!("Failed to create sentinel node: {}", e))?,
221 oxigraph::model::NamedNode::new("urn:rsp:type")
222 .map_err(|e| format!("Failed to create sentinel node: {}", e))?,
223 oxigraph::model::Literal::new_simple_literal("end"),
224 oxigraph::model::GraphName::DefaultGraph,
225 );
226 stream.add_quads(vec![sentinel], final_timestamp)?;
227 Ok(())
228 } else {
229 Err(format!("Stream {} not found", stream_uri))
230 }
231 }
232
233 pub fn parsed_query(&self) -> &crate::parsed_query::ParsedQuery {
235 &self.parsed_query
236 }
237
238 pub fn get_window(&self, window_name: &str) -> Option<Arc<Mutex<CSPARQLWindow>>> {
240 self.windows.get(window_name).cloned()
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the query used by every test: one window (`:win1`) over one
    /// stream (`:stream1`).
    fn single_window_query() -> String {
        r#"
            REGISTER RStream <http://example.org/output> AS
            PREFIX ex: <http://example.org/>
            SELECT ?s ?p ?o
            FROM NAMED WINDOW :win1 ON STREAM :stream1 [RANGE 10 STEP 5]
            WHERE {
                WINDOW :win1 { ?s ?p ?o }
            }
        "#
        .to_string()
    }

    /// Parsing the query yields exactly one window definition.
    #[test]
    fn test_rsp_engine_creation() {
        let engine = RSPEngine::new(single_window_query());
        assert_eq!(engine.parsed_query.s2r.len(), 1);
    }

    /// Initialization succeeds and creates exactly one stream.
    #[test]
    fn test_initialize_engine() {
        let mut engine = RSPEngine::new(single_window_query());
        let result = engine.initialize();
        assert!(result.is_ok());
        assert_eq!(engine.get_all_streams().len(), 1);
    }
}