rsp_rs/windowing/
csparql_window.rs

1use crate::{QuadContainer, WindowInstance};
2use oxigraph::model::Quad;
3use oxigraph::store::Store;
4use std::collections::{HashMap, HashSet};
5use std::sync::Arc;
6
/// Policy deciding whether an active window is reported after an element is added.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReportStrategy {
    /// Report a window whenever its content is not empty.
    NonEmptyContent,
    /// Meant to report when content changes; change tracking is not implemented yet,
    /// so every window is currently considered reportable.
    OnContentChange,
    /// Report a window once the event timestamp is strictly greater than its close bound.
    OnWindowClose,
    /// Meant for periodic reporting; not implemented yet, so every window is currently
    /// considered reportable.
    Periodic,
}
15
/// Mechanism that drives window progression.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Tick {
    /// Progress follows event timestamps. This is the only variant for which
    /// `CSPARQLWindow::add` emits window content.
    TimeDriven,
    /// Declared for tuple-driven progression; `CSPARQLWindow::add` has no emission path for it.
    TupleDriven,
    /// Declared for batch-driven progression; `CSPARQLWindow::add` has no emission path for it.
    BatchDriven,
}
23
/// Kind of output stream a callback can subscribe to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StreamType {
    /// Stream on which `CSPARQLWindow::add` emits the content of the reporting window.
    RStream,
    /// Available for subscription; `CSPARQLWindow::add` does not emit on it.
    IStream,
    /// Available for subscription; `CSPARQLWindow::add` does not emit on it.
    DStream,
}
31
32/// Callback type for window content emission
33pub type WindowCallback = Arc<dyn Fn(QuadContainer) + Send + Sync>;
34
35/// CSPARQL Window implementation
36pub struct CSPARQLWindow {
37    pub name: String,
38    pub width: i64,
39    pub slide: i64,
40    pub time: i64,
41    pub t0: i64,
42    pub active_windows: HashMap<WindowInstance, QuadContainer>,
43    pub report: ReportStrategy,
44    pub tick: Tick,
45    callbacks: HashMap<StreamType, Vec<WindowCallback>>,
46    pub debug_mode: bool,
47}
48
49impl CSPARQLWindow {
50    pub fn new(
51        name: String,
52        width: i64,
53        slide: i64,
54        report: ReportStrategy,
55        tick: Tick,
56        start_time: i64,
57    ) -> Self {
58        Self {
59            name,
60            width,
61            slide,
62            report,
63            tick,
64            time: start_time,
65            t0: start_time,
66            active_windows: HashMap::new(),
67            callbacks: HashMap::new(),
68            debug_mode: false,
69        }
70    }
71
72    /// Get window content at a specific timestamp
73    /// Returns the window with the smallest close time that contains the timestamp
74    pub fn get_content(&self, timestamp: i64) -> Option<&QuadContainer> {
75        let mut max_window: Option<&WindowInstance> = None;
76        let mut max_time = i64::MAX;
77
78        for (window, _container) in &self.active_windows {
79            if window.open <= timestamp && timestamp <= window.close {
80                if window.close < max_time {
81                    max_time = window.close;
82                    max_window = Some(window);
83                }
84            }
85        }
86
87        max_window.and_then(|w| self.active_windows.get(w))
88    }
89
90    /// Add a quad to the window at the given timestamp
91    pub fn add(&mut self, quad: Quad, timestamp: i64) {
92        if self.debug_mode {
93            eprintln!(
94                "[WINDOW {}] Received element ({:?},{}) ",
95                self.name, quad, timestamp
96            );
97        }
98
99        // Create a new quad with the window's graph name
100        // This ensures the quad's graph matches the SPARQL query's GRAPH clause
101        let quad_with_window_graph = oxigraph::model::Quad::new(
102            quad.subject.clone(),
103            quad.predicate.clone(),
104            quad.object.clone(),
105            oxigraph::model::GraphName::NamedNode(
106                oxigraph::model::NamedNode::new(&self.name).unwrap_or_else(|_| {
107                    // Fallback if window name isn't a valid IRI
108                    oxigraph::model::NamedNode::new("http://default-window").unwrap()
109                }),
110            ),
111        );
112
113        let mut to_evict = Vec::new();
114        let t_e = timestamp;
115
116        if self.time > t_e {
117            eprintln!("OUT OF ORDER NOT HANDLED");
118        }
119
120        self.scope(t_e);
121
122        // Add element to appropriate windows
123        for (window, container) in &mut self.active_windows {
124            if self.debug_mode {
125                eprintln!(
126                    "[WINDOW {}] Processing Window [{},{}) for element ({:?},{})",
127                    self.name, window.open, window.close, quad_with_window_graph, timestamp
128                );
129            }
130
131            if window.open <= t_e && t_e < window.close {
132                if self.debug_mode {
133                    eprintln!(
134                        "[WINDOW {}] Adding element to Window [{},{})",
135                        self.name, window.open, window.close
136                    );
137                }
138                container.add(quad_with_window_graph.clone(), timestamp);
139                if self.debug_mode {
140                    eprintln!(
141                        "[WINDOW {}] Window [{},{}) now has {} quads",
142                        self.name,
143                        window.open,
144                        window.close,
145                        container.len()
146                    );
147                }
148            } else if t_e >= window.close {
149                if self.debug_mode {
150                    eprintln!(
151                        "[WINDOW {}] Scheduling for Eviction [{},{})",
152                        self.name, window.open, window.close
153                    );
154                }
155                // Don't add to eviction list yet - windows need to report before being evicted
156                // to_evict.push(window.clone());
157            }
158        }
159
160        // Find the window to report
161        if self.debug_mode {
162            eprintln!(
163                "[WINDOW {}] Active windows before reporting check: {}",
164                self.name,
165                self.active_windows.len()
166            );
167        }
168
169        let mut max_window: Option<WindowInstance> = None;
170        let mut max_time = 0i64;
171
172        for (window, container) in &self.active_windows {
173            if self.compute_report(window, container, timestamp) {
174                if self.debug_mode {
175                    eprintln!(
176                        "[WINDOW {}] Window [{},{}) should report (has {} quads)",
177                        self.name,
178                        window.open,
179                        window.close,
180                        container.len()
181                    );
182                }
183                if window.close > max_time {
184                    max_time = window.close;
185                    max_window = Some(window.clone());
186                }
187                // Mark window for eviction after it reports
188                to_evict.push(window.clone());
189            }
190        }
191
192        // Emit window content if conditions are met
193        if let Some(window) = max_window {
194            if self.debug_mode {
195                eprintln!(
196                    "[WINDOW {}] Max window selected for reporting: [{},{})",
197                    self.name, window.open, window.close
198                );
199            }
200            if self.tick == Tick::TimeDriven {
201                if timestamp > self.time {
202                    self.time = timestamp;
203                    if let Some(content) = self.active_windows.get(&window) {
204                        if self.debug_mode {
205                            eprintln!(
206                                "[WINDOW {}] Emitting {} quads at t={} for window [{},{})",
207                                self.name,
208                                content.len(),
209                                timestamp,
210                                window.open,
211                                window.close
212                            );
213                        }
214                        self.emit(StreamType::RStream, content.clone());
215                    } else {
216                        if self.debug_mode {
217                            eprintln!(
218                                "[WINDOW {}] ERROR: Window [{},{}) not found in active_windows!",
219                                self.name, window.open, window.close
220                            );
221                        }
222                    }
223                }
224            }
225        }
226
227        // Evict old windows
228        for window in to_evict {
229            if self.debug_mode {
230                eprintln!(
231                    "[WINDOW {}] Evicting [{},{})",
232                    self.name, window.open, window.close
233                );
234            }
235            self.active_windows.remove(&window);
236        }
237    }
238
239    /// Compute whether to report this window based on the report strategy
240    fn compute_report(
241        &self,
242        window: &WindowInstance,
243        _content: &QuadContainer,
244        timestamp: i64,
245    ) -> bool {
246        match self.report {
247            ReportStrategy::OnWindowClose => window.close < timestamp,
248            ReportStrategy::NonEmptyContent => !_content.is_empty(),
249            ReportStrategy::OnContentChange => true, // TODO : Tracking content changes needed here but for now always true as a placeholder for future implementation
250            ReportStrategy::Periodic => true, // TODO : Implement periodic reporting logic here as content is always true for now
251        }
252    }
253
254    /// Calculate and create windows based on the event time
255    pub fn scope(&mut self, t_e: i64) {
256        if self.t0 == 0 {
257            self.t0 = t_e;
258        }
259
260        // Use integer arithmetic to avoid precision loss with large timestamps
261        // This computes ceiling division: ceil(dividend / divisor) = (dividend + divisor - 1) / divisor
262        let delta = (t_e - self.t0).abs();
263        let c_sup = self.t0 + ((delta + self.slide - 1) / self.slide) * self.slide;
264        let mut o_i = c_sup - self.width;
265
266        if self.debug_mode {
267            eprintln!(
268                "[WINDOW {}] Calculating the Windows to Open. First one opens at [{}] and closes at [{}]",
269                self.name, o_i, c_sup
270            );
271        }
272
273        while o_i <= t_e {
274            if self.debug_mode {
275                eprintln!(
276                    "[WINDOW {}] Computing Window [{},{}) if absent",
277                    self.name,
278                    o_i,
279                    o_i + self.width
280                );
281            }
282
283            let window = WindowInstance::new(o_i, o_i + self.width);
284            self.compute_window_if_absent(window);
285            o_i += self.slide;
286        }
287    }
288
289    /// Add window if it doesn't already exist
290    fn compute_window_if_absent(&mut self, key: WindowInstance) {
291        self.active_windows
292            .entry(key)
293            .or_insert_with(|| QuadContainer::new(HashSet::new(), 0));
294    }
295
296    /// Subscribe a callback to window emissions
297    pub fn subscribe<F>(&mut self, stream_type: StreamType, callback: F)
298    where
299        F: Fn(QuadContainer) + Send + Sync + 'static,
300    {
301        let callbacks = self.callbacks.entry(stream_type).or_insert_with(Vec::new);
302        callbacks.push(Arc::new(callback));
303    }
304
305    /// Emit window content to subscribers
306    fn emit(&self, stream_type: StreamType, content: QuadContainer) {
307        if let Some(callbacks) = self.callbacks.get(&stream_type) {
308            for callback in callbacks {
309                callback(content.clone());
310            }
311        }
312    }
313
314    /// Get content from window at specific timestamp (alternative method name for compatibility)
315    pub fn get_content_from_window(&self, timestamp: i64) -> Option<&QuadContainer> {
316        self.get_content(timestamp)
317    }
318
319    /// Get the current number of active windows
320    pub fn get_active_window_count(&self) -> usize {
321        self.active_windows.len()
322    }
323
324    /// Get the timestamp range of active windows
325    pub fn get_active_window_ranges(&self) -> Vec<(i64, i64)> {
326        self.active_windows
327            .iter()
328            .map(|(window, _)| (window.open, window.close))
329            .collect()
330    }
331
332    /// Enable or disable debug mode for verbose logging
333    pub fn set_debug_mode(&mut self, enabled: bool) {
334        self.debug_mode = enabled;
335    }
336}
337
338use oxigraph::sparql::QueryResults;
339
340pub fn execute_query<'a>(
341    container: &'a QuadContainer,
342    query: &str,
343) -> Result<QueryResults<'a>, Box<dyn std::error::Error>> {
344    let store = Store::new()?;
345    for quad in &container.elements {
346        store.insert(quad)?;
347    }
348    use oxigraph::sparql::SparqlEvaluator;
349    let results = SparqlEvaluator::new()
350        .parse_query(query)?
351        .on_store(&store)
352        .execute()
353        .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
354    Ok(results)
355}