// oxirs_core/view/incremental.rs

//! Incremental view maintenance for materialized SPARQL views.
//!
//! This module provides machinery for keeping materialized views up-to-date as triples are
//! inserted into or deleted from the dataset, without requiring a full re-computation of the
//! underlying SPARQL query for every change.
//!
//! # Architecture
//!
//! The key insight of incremental view maintenance (IVM) is that most triple changes only affect
//! a small fraction of a materialized view.  The system works as follows:
//!
//! 1. A [`ViewDefinition`] captures the SPARQL query and the predicates it accesses.
//! 2. When triples change, the caller publishes a list of [`TripleDelta`] events to the
//!    [`ViewManager`].
//! 3. The manager checks each registered view's `accessed_predicates` against the deltas.
//! 4. Views that are affected are marked **stale**; the caller is responsible for refreshing them
//!    (by re-executing the query and calling [`ViewManager::refresh_view`]).
//!
//! # Staleness vs Re-computation
//!
//! Full incremental re-computation (propagating delta rows through relational operators) is
//! extremely complex to implement correctly for all SPARQL constructs, especially OPTIONAL, MINUS,
//! and aggregations.  This implementation therefore uses a **mark-stale** strategy: affected views
//! are invalidated and the caller must re-run the SPARQL query.  This is still far cheaper than
//! always re-running all queries because:
//!
//! - Only views that *actually* overlap with the changed predicates are re-evaluated.
//! - Views with no overlap remain fully valid and can serve cached results.
//!
//! Future versions may add true delta propagation for simple BGP-only views.

use std::collections::HashMap;
use std::fmt;
use std::time::Instant;

/// A change to the triple store.
///
/// `Insert` means a new triple `(subject, predicate, object)` was added.
/// `Delete` means an existing triple was removed.
///
/// Both variants carry their components positionally, in
/// `(subject, predicate, object)` order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TripleDelta {
    /// A triple was inserted.
    Insert(String, String, String),
    /// A triple was deleted.
    Delete(String, String, String),
}

impl TripleDelta {
    /// Return the predicate IRI of this delta (the second component).
    pub fn predicate(&self) -> &str {
        match self {
            TripleDelta::Insert(_, p, _) | TripleDelta::Delete(_, p, _) => p.as_str(),
        }
    }

    /// Return the subject IRI of this delta (the first component).
    pub fn subject(&self) -> &str {
        match self {
            TripleDelta::Insert(s, _, _) | TripleDelta::Delete(s, _, _) => s.as_str(),
        }
    }

    /// Return the object value of this delta (the third component).
    pub fn object(&self) -> &str {
        match self {
            TripleDelta::Insert(_, _, o) | TripleDelta::Delete(_, _, o) => o.as_str(),
        }
    }

    /// Return `true` if this is an insertion, `false` if it is a deletion.
    pub fn is_insert(&self) -> bool {
        matches!(self, TripleDelta::Insert(..))
    }
}

/// Definition of a materialized view.
///
/// A view is identified by a unique `id` and backed by a SPARQL SELECT query.  The
/// `accessed_predicates` list is used to decide which triple deltas can affect this view; if the
/// list is empty the view is considered to be affected by **all** changes.
#[derive(Debug, Clone)]
pub struct ViewDefinition {
    /// Unique identifier for this view (e.g. a UUID or user-supplied name).
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// SPARQL SELECT query whose result set defines this view.
    pub sparql_query: String,
    /// Predicate IRIs that this view's query reads.  Used for selective delta propagation.
    /// If empty, the view is assumed to depend on all predicates.
    pub accessed_predicates: Vec<String>,
    /// When this definition was created (set by [`ViewDefinition::new`]).
    pub created_at: Instant,
}

impl ViewDefinition {
    /// Create a new view definition, stamping `created_at` with the current instant.
    ///
    /// `accessed_predicates` should list every predicate IRI that appears in the SPARQL query's
    /// triple patterns.  An empty list means "depends on everything."  The list is stored as
    /// given; it is neither validated nor de-duplicated.
    pub fn new(
        id: impl Into<String>,
        name: impl Into<String>,
        sparql_query: impl Into<String>,
        accessed_predicates: Vec<String>,
    ) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            sparql_query: sparql_query.into(),
            accessed_predicates,
            created_at: Instant::now(),
        }
    }
}

116/// The current materialized state of a SPARQL view.
117///
118/// Each row is a variable binding map `{variable_name → value}`.
119pub struct MaterializedView {
120    /// Definition driving this view.
121    pub definition: ViewDefinition,
122    /// Cached result rows.
123    pub rows: Vec<HashMap<String, String>>,
124    /// Snapshot of `rows.len()` for quick access.
125    pub row_count: usize,
126    /// When the view data was last refreshed.
127    pub last_updated: Instant,
128    /// Cumulative number of times this view has been refreshed.
129    pub update_count: u64,
130    /// Whether the view needs to be re-evaluated before it can be used.
131    pub is_stale: bool,
132}
133
134impl MaterializedView {
135    /// Construct a new `MaterializedView` from a definition and an initial result set.
136    pub fn new(definition: ViewDefinition, initial_rows: Vec<HashMap<String, String>>) -> Self {
137        let row_count = initial_rows.len();
138        Self {
139            definition,
140            rows: initial_rows,
141            row_count,
142            last_updated: Instant::now(),
143            update_count: 0,
144            is_stale: false,
145        }
146    }
147
148    /// Apply a batch of deltas, marking this view stale if any delta affects it.
149    ///
150    /// Returns `true` if the view was newly marked stale by this call.
151    pub fn apply_deltas(&mut self, deltas: &[TripleDelta]) -> bool {
152        if self.is_stale {
153            // Already stale — no need to re-check.
154            return false;
155        }
156        for delta in deltas {
157            if self.is_affected_by(delta) {
158                self.is_stale = true;
159                return true;
160            }
161        }
162        false
163    }
164
165    /// Return `true` if `delta` could affect this view's result set.
166    ///
167    /// A view is considered affected when:
168    /// - Its `accessed_predicates` list is empty (depends on everything), **or**
169    /// - The delta's predicate appears in `accessed_predicates`.
170    pub fn is_affected_by(&self, delta: &TripleDelta) -> bool {
171        if self.definition.accessed_predicates.is_empty() {
172            return true;
173        }
174        let pred = delta.predicate();
175        self.definition
176            .accessed_predicates
177            .iter()
178            .any(|p| p.as_str() == pred)
179    }
180
181    /// Replace the cached rows with `new_rows` and clear the stale flag.
182    pub fn refresh(&mut self, new_rows: Vec<HashMap<String, String>>) {
183        self.row_count = new_rows.len();
184        self.rows = new_rows;
185        self.last_updated = Instant::now();
186        self.update_count += 1;
187        self.is_stale = false;
188    }
189}
190
191/// Manager for a collection of materialized views.
192///
193/// The manager owns all [`MaterializedView`] instances and handles delta propagation.
194pub struct ViewManager {
195    views: HashMap<String, MaterializedView>,
196}
197
198impl ViewManager {
199    /// Create a new, empty view manager.
200    pub fn new() -> Self {
201        Self {
202            views: HashMap::new(),
203        }
204    }
205
206    /// Register a new view with its initial result rows.
207    ///
208    /// If a view with the same `id` already exists it is replaced.
209    pub fn register_view(
210        &mut self,
211        definition: ViewDefinition,
212        initial_rows: Vec<HashMap<String, String>>,
213    ) {
214        let id = definition.id.clone();
215        let view = MaterializedView::new(definition, initial_rows);
216        self.views.insert(id, view);
217    }
218
219    /// Remove a view by its ID.
220    ///
221    /// Returns `true` if the view existed and was removed, `false` otherwise.
222    pub fn drop_view(&mut self, view_id: &str) -> bool {
223        self.views.remove(view_id).is_some()
224    }
225
226    /// Propagate a batch of deltas to all registered views.
227    ///
228    /// Views whose predicate sets overlap with the deltas are marked stale.
229    ///
230    /// Returns the IDs of all views that were newly marked stale by this call.
231    pub fn propagate_deltas(&mut self, deltas: &[TripleDelta]) -> Vec<String> {
232        let mut stale_ids: Vec<String> = Vec::new();
233        for (id, view) in self.views.iter_mut() {
234            if view.apply_deltas(deltas) {
235                stale_ids.push(id.clone());
236            }
237        }
238        stale_ids
239    }
240
241    /// Return the current rows of a view, or `None` if the view does not exist or is stale.
242    pub fn get_view_data(&self, view_id: &str) -> Option<&[HashMap<String, String>]> {
243        let view = self.views.get(view_id)?;
244        if view.is_stale {
245            None
246        } else {
247            Some(&view.rows)
248        }
249    }
250
251    /// Refresh a stale view with freshly-computed result rows.
252    ///
253    /// This clears the stale flag so the view is immediately queryable again.
254    /// Does nothing if the view does not exist.
255    pub fn refresh_view(&mut self, view_id: &str, new_rows: Vec<HashMap<String, String>>) {
256        if let Some(view) = self.views.get_mut(view_id) {
257            view.refresh(new_rows);
258        }
259    }
260
261    /// Return the IDs of all currently stale views.
262    pub fn stale_views(&self) -> Vec<&str> {
263        self.views
264            .iter()
265            .filter(|(_, v)| v.is_stale)
266            .map(|(id, _)| id.as_str())
267            .collect()
268    }
269
270    /// Return the total number of registered views.
271    pub fn view_count(&self) -> usize {
272        self.views.len()
273    }
274
275    /// Return a reference to a view, regardless of stale status.
276    pub fn get_view(&self, view_id: &str) -> Option<&MaterializedView> {
277        self.views.get(view_id)
278    }
279
280    /// Return an iterator over all view IDs.
281    pub fn view_ids(&self) -> impl Iterator<Item = &str> {
282        self.views.keys().map(|s| s.as_str())
283    }
284}
285
286impl Default for ViewManager {
287    fn default() -> Self {
288        Self::new()
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a binding row from `(variable, value)` pairs.
    fn make_row(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect()
    }

    /// Build a view definition with the given ID and accessed predicates; the name and query
    /// are derived from the ID.
    fn simple_view(id: &str, predicates: Vec<&str>) -> ViewDefinition {
        ViewDefinition::new(
            id,
            format!("View {}", id),
            format!("SELECT * WHERE {{ ?s <http://p/{}> ?o }}", id),
            predicates.into_iter().map(|s| s.to_string()).collect(),
        )
    }

    // --- TripleDelta tests ---

    #[test]
    fn test_triple_delta_predicate() {
        let d = TripleDelta::Insert("s".into(), "p".into(), "o".into());
        assert_eq!(d.predicate(), "p");
        assert_eq!(d.subject(), "s");
        assert_eq!(d.object(), "o");
        assert!(d.is_insert());
    }

    #[test]
    fn test_triple_delta_delete() {
        let d = TripleDelta::Delete("s".into(), "p".into(), "o".into());
        assert!(!d.is_insert());
    }

    #[test]
    fn test_triple_delta_equality() {
        let a = TripleDelta::Insert("s".into(), "p".into(), "o".into());
        let b = TripleDelta::Insert("s".into(), "p".into(), "o".into());
        let c = TripleDelta::Delete("s".into(), "p".into(), "o".into());
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    // --- ViewDefinition tests ---

    #[test]
    fn test_view_definition_creation() {
        let def = ViewDefinition::new(
            "v1",
            "My View",
            "SELECT ?s WHERE { ?s <http://p/name> ?o }",
            vec!["http://p/name".to_string()],
        );
        assert_eq!(def.id, "v1");
        assert_eq!(def.accessed_predicates.len(), 1);
    }

    // --- MaterializedView tests ---

    #[test]
    fn test_materialized_view_not_stale_initially() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let rows = vec![make_row(&[("s", "Alice"), ("o", "30")])];
        let view = MaterializedView::new(def, rows);
        assert!(!view.is_stale);
        assert_eq!(view.row_count, 1);
    }

    #[test]
    fn test_is_affected_by_matching_predicate() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let view = MaterializedView::new(def, vec![]);

        let delta = TripleDelta::Insert("s".into(), "http://p/age".into(), "25".into());
        assert!(view.is_affected_by(&delta));
    }

    #[test]
    fn test_is_affected_by_non_matching_predicate() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let view = MaterializedView::new(def, vec![]);

        let delta = TripleDelta::Insert("s".into(), "http://p/name".into(), "Alice".into());
        assert!(!view.is_affected_by(&delta));
    }

    #[test]
    fn test_is_affected_by_empty_predicates_matches_all() {
        let def = simple_view("v1", vec![]); // empty = depends on everything
        let view = MaterializedView::new(def, vec![]);

        let delta = TripleDelta::Insert("s".into(), "http://any/predicate".into(), "o".into());
        assert!(view.is_affected_by(&delta));
    }

    #[test]
    fn test_apply_deltas_marks_stale() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let mut view = MaterializedView::new(def, vec![]);

        let deltas = vec![TripleDelta::Insert(
            "Alice".into(),
            "http://p/age".into(),
            "30".into(),
        )];
        let newly_stale = view.apply_deltas(&deltas);
        assert!(newly_stale);
        assert!(view.is_stale);
    }

    #[test]
    fn test_apply_deltas_no_effect_different_predicate() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let mut view = MaterializedView::new(def, vec![]);

        let deltas = vec![TripleDelta::Insert(
            "Alice".into(),
            "http://p/name".into(),
            "Alice".into(),
        )];
        let newly_stale = view.apply_deltas(&deltas);
        assert!(!newly_stale);
        assert!(!view.is_stale);
    }

    #[test]
    fn test_apply_deltas_already_stale_returns_false() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let mut view = MaterializedView::new(def, vec![]);
        view.is_stale = true; // manually set stale

        let deltas = vec![TripleDelta::Insert(
            "Alice".into(),
            "http://p/age".into(),
            "30".into(),
        )];
        let newly_stale = view.apply_deltas(&deltas);
        assert!(
            !newly_stale,
            "Already stale, so apply_deltas should return false"
        );
    }

    #[test]
    fn test_refresh_clears_stale_flag() {
        let def = simple_view("v1", vec!["http://p/age"]);
        let mut view = MaterializedView::new(def, vec![]);
        view.is_stale = true;

        let new_rows = vec![make_row(&[("s", "Bob"), ("o", "42")])];
        view.refresh(new_rows);

        assert!(!view.is_stale);
        assert_eq!(view.row_count, 1);
        assert_eq!(view.update_count, 1);
    }

    // --- ViewManager tests ---

    #[test]
    fn test_view_manager_register_and_count() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        mgr.register_view(simple_view("v2", vec!["http://p/name"]), vec![]);
        assert_eq!(mgr.view_count(), 2);
    }

    #[test]
    fn test_view_manager_drop_view() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        assert!(mgr.drop_view("v1"));
        assert!(!mgr.drop_view("v1")); // already gone
        assert_eq!(mgr.view_count(), 0);
    }

    #[test]
    fn test_view_manager_propagate_deltas_selective() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v_age", vec!["http://p/age"]), vec![]);
        mgr.register_view(simple_view("v_name", vec!["http://p/name"]), vec![]);

        let deltas = vec![TripleDelta::Insert(
            "Alice".into(),
            "http://p/age".into(),
            "30".into(),
        )];
        let stale = mgr.propagate_deltas(&deltas);
        assert_eq!(stale.len(), 1);
        assert_eq!(stale[0], "v_age");

        // v_name should not be stale
        let all_stale = mgr.stale_views();
        assert!(all_stale.contains(&"v_age"));
        assert!(!all_stale.contains(&"v_name"));
    }

    #[test]
    fn test_view_manager_propagate_deltas_wildcard_view() {
        let mut mgr = ViewManager::new();
        // View with no accessed_predicates matches everything
        mgr.register_view(simple_view("v_all", vec![]), vec![]);

        let deltas = vec![TripleDelta::Delete(
            "s".into(),
            "http://any/pred".into(),
            "o".into(),
        )];
        let stale = mgr.propagate_deltas(&deltas);
        assert_eq!(stale.len(), 1);
    }

    #[test]
    fn test_view_manager_get_view_data_not_stale() {
        let mut mgr = ViewManager::new();
        let rows = vec![make_row(&[("s", "Alice")])];
        // `rows` is not used again, so it is moved rather than cloned.
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), rows);

        let data = mgr.get_view_data("v1");
        assert!(data.is_some());
        assert_eq!(data.unwrap().len(), 1);
    }

    #[test]
    fn test_view_manager_get_view_data_stale_returns_none() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);

        // Mark stale
        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "99".into(),
        )];
        mgr.propagate_deltas(&deltas);

        // get_view_data should return None for stale views
        assert!(mgr.get_view_data("v1").is_none());
    }

    #[test]
    fn test_view_manager_refresh_view() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);

        // Mark stale
        mgr.propagate_deltas(&[TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "10".into(),
        )]);

        // Refresh with new data
        let new_rows = vec![
            make_row(&[("s", "Alice"), ("o", "10")]),
            make_row(&[("s", "Bob"), ("o", "20")]),
        ];
        mgr.refresh_view("v1", new_rows);

        let data = mgr.get_view_data("v1").expect("should have fresh data");
        assert_eq!(data.len(), 2);
    }

    #[test]
    fn test_view_manager_stale_views_empty_initially() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        assert!(mgr.stale_views().is_empty());
    }

    #[test]
    fn test_view_manager_multiple_deltas_one_call() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v_age", vec!["http://p/age"]), vec![]);
        mgr.register_view(simple_view("v_name", vec!["http://p/name"]), vec![]);
        mgr.register_view(simple_view("v_color", vec!["http://p/color"]), vec![]);

        let deltas = vec![
            TripleDelta::Insert("s1".into(), "http://p/age".into(), "30".into()),
            TripleDelta::Insert("s2".into(), "http://p/name".into(), "Alice".into()),
        ];
        let stale = mgr.propagate_deltas(&deltas);
        assert_eq!(stale.len(), 2);
        assert!(mgr.stale_views().contains(&"v_age"));
        assert!(mgr.stale_views().contains(&"v_name"));
        assert!(!mgr.stale_views().contains(&"v_color"));
    }

    #[test]
    fn test_view_manager_refresh_nonexistent_view_is_noop() {
        let mut mgr = ViewManager::new();
        // Should not panic or error
        mgr.refresh_view("nonexistent", vec![]);
    }

    #[test]
    fn test_view_definition_empty_predicates_semantics() {
        // A view with no accessed_predicates must be affected by ANY delta.
        let def = ViewDefinition::new("v", "All", "SELECT * WHERE { ?s ?p ?o }", vec![]);
        let view = MaterializedView::new(def, vec![]);
        let d = TripleDelta::Delete("s".into(), "http://totally/random".into(), "o".into());
        assert!(view.is_affected_by(&d));
    }
}