1use std::collections::HashMap;
12use std::time::{SystemTime, UNIX_EPOCH};
13
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports an instant before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` is a `u128`; the cast keeps the low 64 bits, which is
        // lossless for any realistic wall-clock value.
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
24
/// Static description of a view: its name, defining query, and dependency list.
#[derive(Debug, Clone)]
pub struct ViewDefinition {
    /// Name identifying the view.
    pub name: String,
    /// Text of the SPARQL query that defines the view.
    pub sparql_query: String,
    /// Whether the view is flagged as materialized.
    pub is_materialized: bool,
    /// Predicates this view depends on. An empty list makes
    /// [`ViewDefinition::depends_on`] match every predicate.
    pub dependencies: Vec<String>,
}

impl ViewDefinition {
    /// Creates a definition, converting `name` and `sparql_query` into owned strings.
    pub fn new(
        name: impl Into<String>,
        sparql_query: impl Into<String>,
        is_materialized: bool,
        dependencies: Vec<String>,
    ) -> Self {
        let name = name.into();
        let sparql_query = sparql_query.into();
        ViewDefinition {
            name,
            sparql_query,
            is_materialized,
            dependencies,
        }
    }

    /// Reports whether a change to `predicate` is relevant to this view.
    ///
    /// Returns `true` when the dependency list is empty (the view is treated
    /// as depending on everything) or when `predicate` is listed exactly.
    pub fn depends_on(&self, predicate: &str) -> bool {
        if self.dependencies.is_empty() {
            return true;
        }
        self.dependencies
            .iter()
            .map(String::as_str)
            .any(|dependency| dependency == predicate)
    }
}
72
/// A change to one `(subject, predicate, object)` statement: either an
/// insertion or a deletion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeltaChange {
    /// The statement was added.
    Insert {
        subject: String,
        predicate: String,
        object: String,
    },
    /// The statement was removed.
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
}

impl DeltaChange {
    /// Borrows `(subject, predicate, object)` from whichever variant this is.
    fn components(&self) -> (&str, &str, &str) {
        match self {
            Self::Insert {
                subject,
                predicate,
                object,
            }
            | Self::Delete {
                subject,
                predicate,
                object,
            } => (subject.as_str(), predicate.as_str(), object.as_str()),
        }
    }

    /// Returns the predicate of the changed statement.
    pub fn predicate(&self) -> &str {
        self.components().1
    }

    /// Returns the subject of the changed statement.
    pub fn subject(&self) -> &str {
        self.components().0
    }

    /// Returns the object of the changed statement.
    pub fn object(&self) -> &str {
        self.components().2
    }

    /// Returns `true` for [`DeltaChange::Insert`] and `false` for [`DeltaChange::Delete`].
    pub fn is_insert(&self) -> bool {
        match self {
            Self::Insert { .. } => true,
            Self::Delete { .. } => false,
        }
    }
}
130
/// One row of a view: a map from variable name to the value bound to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewRow(pub HashMap<String, String>);

impl ViewRow {
    /// Wraps an existing variable-to-value map as a row.
    pub fn new(map: HashMap<String, String>) -> Self {
        ViewRow(map)
    }

    /// Builds a row by copying each `(variable, value)` pair into owned strings.
    ///
    /// When a variable occurs more than once, the last pair wins.
    pub fn from_pairs(pairs: &[(&str, &str)]) -> Self {
        let mut bindings = HashMap::with_capacity(pairs.len());
        for &(variable, value) in pairs {
            bindings.insert(variable.to_owned(), value.to_owned());
        }
        ViewRow(bindings)
    }

    /// Looks up the value bound to `variable`, or `None` if the row has no such binding.
    pub fn get(&self, variable: &str) -> Option<&str> {
        self.0.get(variable).map(String::as_str)
    }
}
161
162pub struct MaterializedView {
171 pub definition: ViewDefinition,
173 pub rows: Vec<ViewRow>,
175 pub last_updated_ms: i64,
177 pub version: u64,
179 pub is_stale: bool,
181}
182
183impl MaterializedView {
184 pub fn new(definition: ViewDefinition, initial_rows: Vec<ViewRow>) -> Self {
189 Self {
190 definition,
191 rows: initial_rows,
192 last_updated_ms: now_ms(),
193 version: 0,
194 is_stale: false,
195 }
196 }
197
198 pub fn refresh(&mut self, new_rows: Vec<ViewRow>) {
201 self.rows = new_rows;
202 self.last_updated_ms = now_ms();
203 self.version += 1;
204 self.is_stale = false;
205 }
206
207 pub fn invalidate(&mut self) {
209 self.is_stale = true;
210 }
211}
212
213pub struct IncrementalViewMaintainer {
234 views: HashMap<String, MaterializedView>,
235 change_queue: Vec<DeltaChange>,
236}
237
238impl IncrementalViewMaintainer {
239 pub fn new() -> Self {
241 Self {
242 views: HashMap::new(),
243 change_queue: Vec::new(),
244 }
245 }
246
247 pub fn register_view(&mut self, def: ViewDefinition, initial_rows: Vec<ViewRow>) {
252 let name = def.name.clone();
253 let view = MaterializedView::new(def, initial_rows);
254 self.views.insert(name, view);
255 }
256
257 pub fn apply_delta(&mut self, change: DeltaChange) -> Vec<String> {
264 let predicate = change.predicate().to_string();
265 let mut invalidated = Vec::new();
266
267 for (name, view) in self.views.iter_mut() {
268 if !view.is_stale && view.definition.depends_on(&predicate) {
269 view.is_stale = true;
270 invalidated.push(name.clone());
271 }
272 }
273
274 invalidated
275 }
276
277 pub fn invalidate_view(&mut self, name: &str) {
281 if let Some(view) = self.views.get_mut(name) {
282 view.invalidate();
283 }
284 }
285
286 pub fn get_view(&self, name: &str) -> Option<&MaterializedView> {
288 self.views.get(name)
289 }
290
291 pub fn list_views(&self) -> Vec<&str> {
293 self.views.keys().map(|s| s.as_str()).collect()
294 }
295
296 pub fn affected_views(&self, predicate: &str) -> Vec<&str> {
301 self.views
302 .iter()
303 .filter(|(_, v)| v.definition.depends_on(predicate))
304 .map(|(name, _)| name.as_str())
305 .collect()
306 }
307
308 pub fn queue_change(&mut self, change: DeltaChange) {
310 self.change_queue.push(change);
311 }
312
313 pub fn flush_changes(&mut self) -> HashMap<String, usize> {
320 let changes: Vec<DeltaChange> = self.change_queue.drain(..).collect();
321 let mut result: HashMap<String, usize> = HashMap::new();
322
323 for change in changes {
324 let invalidated = self.apply_delta(change);
325 for name in invalidated {
326 result.entry(name).or_insert(0);
329 }
330 }
331
332 result
333 }
334
335 pub fn view_count(&self) -> usize {
337 self.views.len()
338 }
339
340 pub fn total_rows(&self) -> usize {
342 self.views
343 .values()
344 .filter(|v| !v.is_stale)
345 .map(|v| v.rows.len())
346 .sum()
347 }
348}
349
350impl Default for IncrementalViewMaintainer {
351 fn default() -> Self {
352 Self::new()
353 }
354}
355
356pub struct ViewStalenessDetector;
363
364impl ViewStalenessDetector {
365 pub fn is_stale(view: &MaterializedView, max_age_ms: i64) -> bool {
368 if view.is_stale {
369 return true;
370 }
371 let age = now_ms() - view.last_updated_ms;
372 age > max_age_ms
373 }
374
375 pub fn stale_views<'a>(
380 views: &[&'a MaterializedView],
381 max_age_ms: i64,
382 ) -> Vec<&'a MaterializedView> {
383 views
384 .iter()
385 .copied()
386 .filter(|v| Self::is_stale(v, max_age_ms))
387 .collect()
388 }
389}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a definition called `name` that depends on the predicates in `deps`.
    fn make_def(name: &str, deps: &[&str], materialized: bool) -> ViewDefinition {
        ViewDefinition::new(
            name,
            format!("SELECT * WHERE {{ ?s <http://p/{}> ?o }}", name),
            materialized,
            deps.iter().map(|s| s.to_string()).collect(),
        )
    }

    /// Produces `n` distinct rows, each binding the variables `s` and `o`.
    fn rows(n: usize) -> Vec<ViewRow> {
        (0..n)
            .map(|i| ViewRow::from_pairs(&[("s", &format!("s{}", i)), ("o", &format!("o{}", i))]))
            .collect()
    }

    // --- ViewDefinition ---

    #[test]
    fn test_view_definition_depends_on_listed_predicate() {
        let def = make_def("v", &["http://p/age"], true);
        assert!(def.depends_on("http://p/age"));
        assert!(!def.depends_on("http://p/name"));
    }

    #[test]
    fn test_view_definition_empty_deps_matches_all() {
        let def = make_def("v", &[], true);
        assert!(def.depends_on("http://anything"));
    }

    #[test]
    fn test_view_definition_multiple_deps() {
        let def = make_def("v", &["http://p/age", "http://p/name"], false);
        assert!(def.depends_on("http://p/age"));
        assert!(def.depends_on("http://p/name"));
        assert!(!def.depends_on("http://p/color"));
    }

    // --- DeltaChange ---

    #[test]
    fn test_delta_change_insert_accessors() {
        let d = DeltaChange::Insert {
            subject: "s".into(),
            predicate: "p".into(),
            object: "o".into(),
        };
        assert_eq!(d.subject(), "s");
        assert_eq!(d.predicate(), "p");
        assert_eq!(d.object(), "o");
        assert!(d.is_insert());
    }

    #[test]
    fn test_delta_change_delete_accessors() {
        let d = DeltaChange::Delete {
            subject: "s".into(),
            predicate: "p".into(),
            object: "o".into(),
        };
        assert_eq!(d.subject(), "s");
        assert_eq!(d.predicate(), "p");
        assert_eq!(d.object(), "o");
        assert!(!d.is_insert());
    }

    #[test]
    fn test_delta_change_equality() {
        let a = DeltaChange::Insert {
            subject: "s".into(),
            predicate: "p".into(),
            object: "o".into(),
        };
        let b = a.clone();
        assert_eq!(a, b);

        // Same triple, different variant: must not compare equal.
        let c = DeltaChange::Delete {
            subject: "s".into(),
            predicate: "p".into(),
            object: "o".into(),
        };
        assert_ne!(a, c);
    }

    // --- ViewRow ---

    #[test]
    fn test_view_row_get() {
        let row = ViewRow::from_pairs(&[("x", "Alice"), ("y", "30")]);
        assert_eq!(row.get("x"), Some("Alice"));
        assert_eq!(row.get("z"), None);
    }

    // --- MaterializedView ---

    #[test]
    fn test_materialized_view_initial_state() {
        let def = make_def("v", &["http://p/age"], true);
        let mv = MaterializedView::new(def, rows(3));
        assert_eq!(mv.rows.len(), 3);
        assert_eq!(mv.version, 0);
        assert!(!mv.is_stale);
    }

    #[test]
    fn test_materialized_view_refresh_bumps_version() {
        let def = make_def("v", &["http://p/age"], true);
        let mut mv = MaterializedView::new(def, rows(2));
        mv.refresh(rows(5));
        assert_eq!(mv.rows.len(), 5);
        assert_eq!(mv.version, 1);
        assert!(!mv.is_stale);
    }

    #[test]
    fn test_materialized_view_invalidate_sets_stale() {
        let def = make_def("v", &["http://p/age"], true);
        let mut mv = MaterializedView::new(def, rows(2));
        mv.invalidate();
        assert!(mv.is_stale);
    }

    // --- IncrementalViewMaintainer ---

    #[test]
    fn test_maintainer_register_and_count() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v1", &["http://p/age"], true), rows(1));
        m.register_view(make_def("v2", &["http://p/name"], true), rows(2));
        assert_eq!(m.view_count(), 2);
    }

    #[test]
    fn test_maintainer_apply_delta_invalidates_affected() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v_age", &["http://p/age"], true), rows(2));
        m.register_view(make_def("v_name", &["http://p/name"], true), rows(3));

        let changed = m.apply_delta(DeltaChange::Insert {
            subject: "s".into(),
            predicate: "http://p/age".into(),
            object: "42".into(),
        });

        assert_eq!(changed.len(), 1);
        assert_eq!(changed[0], "v_age");
        assert!(m.get_view("v_age").map(|v| v.is_stale).unwrap_or(false));
        assert!(!m.get_view("v_name").map(|v| v.is_stale).unwrap_or(true));
    }

    #[test]
    fn test_maintainer_apply_delta_already_stale_not_returned_again() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v", &["http://p/age"], true), rows(1));

        m.apply_delta(DeltaChange::Insert {
            subject: "s".into(),
            predicate: "http://p/age".into(),
            object: "1".into(),
        });
        let changed = m.apply_delta(DeltaChange::Insert {
            subject: "s2".into(),
            predicate: "http://p/age".into(),
            object: "2".into(),
        });
        assert!(
            changed.is_empty(),
            "Already stale, should not be re-reported"
        );
    }

    #[test]
    fn test_maintainer_apply_delta_universal_dependency() {
        let mut m = IncrementalViewMaintainer::new();
        // An empty dependency list means the view reacts to every predicate.
        m.register_view(make_def("v_all", &[], true), rows(0));

        let changed = m.apply_delta(DeltaChange::Delete {
            subject: "s".into(),
            predicate: "http://totally/unknown".into(),
            object: "o".into(),
        });
        assert_eq!(changed.len(), 1);
    }

    #[test]
    fn test_maintainer_invalidate_view_by_name() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v", &["http://p/x"], true), rows(1));
        m.invalidate_view("v");
        assert!(m.get_view("v").map(|v| v.is_stale).unwrap_or(false));
    }

    #[test]
    fn test_maintainer_invalidate_nonexistent_view_is_noop() {
        let mut m = IncrementalViewMaintainer::new();
        m.invalidate_view("does_not_exist");
        // Must neither panic nor create an entry for the unknown name.
        assert_eq!(m.view_count(), 0);
        assert!(m.get_view("does_not_exist").is_none());
    }

    #[test]
    fn test_maintainer_affected_views_returns_matching() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v_age", &["http://p/age"], true), rows(0));
        m.register_view(make_def("v_name", &["http://p/name"], true), rows(0));
        m.register_view(make_def("v_all", &[], true), rows(0));

        // Sort locally so the assertion does not rely on the returned order.
        let mut affected = m.affected_views("http://p/age");
        affected.sort_unstable();
        assert_eq!(affected, vec!["v_age", "v_all"]);
    }

    #[test]
    fn test_maintainer_queue_and_flush_changes() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v_age", &["http://p/age"], true), rows(3));
        m.register_view(make_def("v_name", &["http://p/name"], true), rows(2));

        m.queue_change(DeltaChange::Insert {
            subject: "s1".into(),
            predicate: "http://p/age".into(),
            object: "25".into(),
        });
        m.queue_change(DeltaChange::Delete {
            subject: "s2".into(),
            predicate: "http://p/name".into(),
            object: "Alice".into(),
        });

        let result = m.flush_changes();
        assert_eq!(result.len(), 2);
        assert!(result.contains_key("v_age"));
        assert!(result.contains_key("v_name"));
        // Flushing only invalidates, so the recorded count stays at zero.
        assert_eq!(result["v_age"], 0);
        assert_eq!(result["v_name"], 0);
    }

    #[test]
    fn test_maintainer_flush_clears_queue() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v", &["http://p/x"], true), rows(1));
        m.queue_change(DeltaChange::Insert {
            subject: "s".into(),
            predicate: "http://p/x".into(),
            object: "o".into(),
        });
        m.flush_changes();
        // The queue is empty now, so a second flush has nothing to report.
        let second = m.flush_changes();
        assert!(second.is_empty());
    }

    #[test]
    fn test_maintainer_total_rows_excludes_stale() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v1", &["http://p/age"], true), rows(5));
        m.register_view(make_def("v2", &["http://p/name"], true), rows(3));

        assert_eq!(m.total_rows(), 8);

        // Invalidates v1 only.
        m.apply_delta(DeltaChange::Insert {
            subject: "s".into(),
            predicate: "http://p/age".into(),
            object: "1".into(),
        });

        assert_eq!(m.total_rows(), 3);
    }

    #[test]
    fn test_maintainer_list_views() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("alpha", &[], true), rows(0));
        m.register_view(make_def("beta", &[], true), rows(0));

        let mut names = m.list_views();
        names.sort_unstable();
        assert_eq!(names, vec!["alpha", "beta"]);
    }

    #[test]
    fn test_maintainer_replace_existing_view() {
        let mut m = IncrementalViewMaintainer::new();
        m.register_view(make_def("v", &["http://p/x"], true), rows(2));
        m.register_view(make_def("v", &["http://p/x"], true), rows(10));
        // Re-registering the same name replaces the earlier view.
        assert_eq!(m.view_count(), 1);
        assert_eq!(m.get_view("v").map(|v| v.rows.len()), Some(10));
    }

    // --- ViewStalenessDetector ---

    #[test]
    fn test_staleness_detector_explicitly_stale() {
        let def = make_def("v", &[], true);
        let mut mv = MaterializedView::new(def, rows(1));
        mv.is_stale = true;
        // The explicit flag wins even with an unlimited age budget.
        assert!(ViewStalenessDetector::is_stale(&mv, i64::MAX));
    }

    #[test]
    fn test_staleness_detector_freshly_created_not_stale() {
        let def = make_def("v", &[], true);
        let mv = MaterializedView::new(def, rows(1));
        assert!(!ViewStalenessDetector::is_stale(&mv, 3_600_000));
    }

    #[test]
    fn test_staleness_detector_zero_max_age_always_stale() {
        let def = make_def("v", &[], true);
        let mut mv = MaterializedView::new(def, rows(0));
        // Backdate the view instead of sleeping: the age is then at least 5 ms
        // without slowing the test down.
        mv.last_updated_ms -= 5;
        assert!(ViewStalenessDetector::is_stale(&mv, 0));
    }

    #[test]
    fn test_staleness_detector_stale_views_filters_correctly() {
        let def1 = make_def("v1", &[], true);
        let def2 = make_def("v2", &[], true);
        let mv1 = MaterializedView::new(def1, rows(0));
        let mut mv2 = MaterializedView::new(def2, rows(0));
        mv2.is_stale = true;

        let stale = ViewStalenessDetector::stale_views(&[&mv1, &mv2], 3_600_000);
        assert_eq!(stale.len(), 1);
        assert!(stale[0].is_stale);
        assert_eq!(stale[0].definition.name, "v2");
    }
}