1use crate::{QuadContainer, WindowInstance};
2use oxigraph::model::Quad;
3use oxigraph::store::Store;
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum ReportStrategy {
10 NonEmptyContent,
11 OnContentChange,
12 OnWindowClose,
13 Periodic,
14}
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum Tick {
19 TimeDriven,
20 TupleDriven,
21 BatchDriven,
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
26pub enum StreamType {
27 RStream,
28 IStream,
29 DStream,
30}
31
32pub type WindowCallback = Arc<dyn Fn(QuadContainer) + Send + Sync>;
34
35pub struct CSPARQLWindow {
37 pub name: String,
38 pub width: i64,
39 pub slide: i64,
40 pub time: i64,
41 pub t0: i64,
42 pub active_windows: HashMap<WindowInstance, QuadContainer>,
43 pub report: ReportStrategy,
44 pub tick: Tick,
45 callbacks: HashMap<StreamType, Vec<WindowCallback>>,
46 pub debug_mode: bool,
47}
48
49impl CSPARQLWindow {
50 pub fn new(
51 name: String,
52 width: i64,
53 slide: i64,
54 report: ReportStrategy,
55 tick: Tick,
56 start_time: i64,
57 ) -> Self {
58 Self {
59 name,
60 width,
61 slide,
62 report,
63 tick,
64 time: start_time,
65 t0: start_time,
66 active_windows: HashMap::new(),
67 callbacks: HashMap::new(),
68 debug_mode: false,
69 }
70 }
71
72 pub fn get_content(&self, timestamp: i64) -> Option<&QuadContainer> {
75 let mut max_window: Option<&WindowInstance> = None;
76 let mut max_time = i64::MAX;
77
78 for (window, _container) in &self.active_windows {
79 if window.open <= timestamp && timestamp <= window.close {
80 if window.close < max_time {
81 max_time = window.close;
82 max_window = Some(window);
83 }
84 }
85 }
86
87 max_window.and_then(|w| self.active_windows.get(w))
88 }
89
90 pub fn add(&mut self, quad: Quad, timestamp: i64) {
92 if self.debug_mode {
93 eprintln!(
94 "[WINDOW {}] Received element ({:?},{}) ",
95 self.name, quad, timestamp
96 );
97 }
98
99 let quad_with_window_graph = oxigraph::model::Quad::new(
102 quad.subject.clone(),
103 quad.predicate.clone(),
104 quad.object.clone(),
105 oxigraph::model::GraphName::NamedNode(
106 oxigraph::model::NamedNode::new(&self.name).unwrap_or_else(|_| {
107 oxigraph::model::NamedNode::new("http://default-window").unwrap()
109 }),
110 ),
111 );
112
113 let mut to_evict = Vec::new();
114 let t_e = timestamp;
115
116 if self.time > t_e {
117 eprintln!("OUT OF ORDER NOT HANDLED");
118 }
119
120 self.scope(t_e);
121
122 for (window, container) in &mut self.active_windows {
124 if self.debug_mode {
125 eprintln!(
126 "[WINDOW {}] Processing Window [{},{}) for element ({:?},{})",
127 self.name, window.open, window.close, quad_with_window_graph, timestamp
128 );
129 }
130
131 if window.open <= t_e && t_e < window.close {
132 if self.debug_mode {
133 eprintln!(
134 "[WINDOW {}] Adding element to Window [{},{})",
135 self.name, window.open, window.close
136 );
137 }
138 container.add(quad_with_window_graph.clone(), timestamp);
139 if self.debug_mode {
140 eprintln!(
141 "[WINDOW {}] Window [{},{}) now has {} quads",
142 self.name,
143 window.open,
144 window.close,
145 container.len()
146 );
147 }
148 } else if t_e >= window.close {
149 if self.debug_mode {
150 eprintln!(
151 "[WINDOW {}] Scheduling for Eviction [{},{})",
152 self.name, window.open, window.close
153 );
154 }
155 }
158 }
159
160 if self.debug_mode {
162 eprintln!(
163 "[WINDOW {}] Active windows before reporting check: {}",
164 self.name,
165 self.active_windows.len()
166 );
167 }
168
169 let mut max_window: Option<WindowInstance> = None;
170 let mut max_time = 0i64;
171
172 for (window, container) in &self.active_windows {
173 if self.compute_report(window, container, timestamp) {
174 if self.debug_mode {
175 eprintln!(
176 "[WINDOW {}] Window [{},{}) should report (has {} quads)",
177 self.name,
178 window.open,
179 window.close,
180 container.len()
181 );
182 }
183 if window.close > max_time {
184 max_time = window.close;
185 max_window = Some(window.clone());
186 }
187 to_evict.push(window.clone());
189 }
190 }
191
192 if let Some(window) = max_window {
194 if self.debug_mode {
195 eprintln!(
196 "[WINDOW {}] Max window selected for reporting: [{},{})",
197 self.name, window.open, window.close
198 );
199 }
200 if self.tick == Tick::TimeDriven {
201 if timestamp > self.time {
202 self.time = timestamp;
203 if let Some(content) = self.active_windows.get(&window) {
204 if self.debug_mode {
205 eprintln!(
206 "[WINDOW {}] Emitting {} quads at t={} for window [{},{})",
207 self.name,
208 content.len(),
209 timestamp,
210 window.open,
211 window.close
212 );
213 }
214 self.emit(StreamType::RStream, content.clone());
215 } else {
216 if self.debug_mode {
217 eprintln!(
218 "[WINDOW {}] ERROR: Window [{},{}) not found in active_windows!",
219 self.name, window.open, window.close
220 );
221 }
222 }
223 }
224 }
225 }
226
227 for window in to_evict {
229 if self.debug_mode {
230 eprintln!(
231 "[WINDOW {}] Evicting [{},{})",
232 self.name, window.open, window.close
233 );
234 }
235 self.active_windows.remove(&window);
236 }
237 }
238
239 fn compute_report(
241 &self,
242 window: &WindowInstance,
243 _content: &QuadContainer,
244 timestamp: i64,
245 ) -> bool {
246 match self.report {
247 ReportStrategy::OnWindowClose => window.close < timestamp,
248 ReportStrategy::NonEmptyContent => !_content.is_empty(),
249 ReportStrategy::OnContentChange => true, ReportStrategy::Periodic => true, }
252 }
253
254 pub fn scope(&mut self, t_e: i64) {
256 if self.t0 == 0 {
257 self.t0 = t_e;
258 }
259
260 let delta = (t_e - self.t0).abs();
263 let c_sup = self.t0 + ((delta + self.slide - 1) / self.slide) * self.slide;
264 let mut o_i = c_sup - self.width;
265
266 if self.debug_mode {
267 eprintln!(
268 "[WINDOW {}] Calculating the Windows to Open. First one opens at [{}] and closes at [{}]",
269 self.name, o_i, c_sup
270 );
271 }
272
273 while o_i <= t_e {
274 if self.debug_mode {
275 eprintln!(
276 "[WINDOW {}] Computing Window [{},{}) if absent",
277 self.name,
278 o_i,
279 o_i + self.width
280 );
281 }
282
283 let window = WindowInstance::new(o_i, o_i + self.width);
284 self.compute_window_if_absent(window);
285 o_i += self.slide;
286 }
287 }
288
289 fn compute_window_if_absent(&mut self, key: WindowInstance) {
291 self.active_windows
292 .entry(key)
293 .or_insert_with(|| QuadContainer::new(HashSet::new(), 0));
294 }
295
296 pub fn subscribe<F>(&mut self, stream_type: StreamType, callback: F)
298 where
299 F: Fn(QuadContainer) + Send + Sync + 'static,
300 {
301 let callbacks = self.callbacks.entry(stream_type).or_insert_with(Vec::new);
302 callbacks.push(Arc::new(callback));
303 }
304
305 fn emit(&self, stream_type: StreamType, content: QuadContainer) {
307 if let Some(callbacks) = self.callbacks.get(&stream_type) {
308 for callback in callbacks {
309 callback(content.clone());
310 }
311 }
312 }
313
314 pub fn get_content_from_window(&self, timestamp: i64) -> Option<&QuadContainer> {
316 self.get_content(timestamp)
317 }
318
319 pub fn get_active_window_count(&self) -> usize {
321 self.active_windows.len()
322 }
323
324 pub fn get_active_window_ranges(&self) -> Vec<(i64, i64)> {
326 self.active_windows
327 .iter()
328 .map(|(window, _)| (window.open, window.close))
329 .collect()
330 }
331
332 pub fn set_debug_mode(&mut self, enabled: bool) {
334 self.debug_mode = enabled;
335 }
336}
337
338use oxigraph::sparql::QueryResults;
339
340pub fn execute_query<'a>(
341 container: &'a QuadContainer,
342 query: &str,
343) -> Result<QueryResults<'a>, Box<dyn std::error::Error>> {
344 let store = Store::new()?;
345 for quad in &container.elements {
346 store.insert(quad)?;
347 }
348 use oxigraph::sparql::SparqlEvaluator;
349 let results = SparqlEvaluator::new()
350 .parse_query(query)?
351 .on_store(&store)
352 .execute()
353 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
354 Ok(results)
355}