use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Static description of a view: its name, query text and predicate dependencies.
#[derive(Debug, Clone)]
pub struct ViewDefinition {
    /// Name identifying the view.
    pub name: String,
    /// Query text associated with the view.
    pub sparql_query: String,
    /// Caller-supplied flag; no method of this type consults it.
    pub is_materialized: bool,
    /// Predicates this view depends on. An empty list means "every predicate"
    /// as far as [`ViewDefinition::depends_on`] is concerned.
    pub dependencies: Vec<String>,
}

impl ViewDefinition {
    /// Builds a definition, converting `name` and `sparql_query` into owned strings.
    pub fn new(
        name: impl Into<String>,
        sparql_query: impl Into<String>,
        is_materialized: bool,
        dependencies: Vec<String>,
    ) -> Self {
        let name = name.into();
        let sparql_query = sparql_query.into();
        ViewDefinition {
            name,
            sparql_query,
            is_materialized,
            dependencies,
        }
    }

    /// Reports whether a change to `predicate` is relevant to this view.
    ///
    /// Returns `true` when the dependency list is empty (universal dependency)
    /// or when it contains a string exactly equal to `predicate`.
    pub fn depends_on(&self, predicate: &str) -> bool {
        if self.dependencies.is_empty() {
            return true;
        }
        self.dependencies
            .iter()
            .map(String::as_str)
            .any(|dependency| dependency == predicate)
    }
}
/// A single change to a (subject, predicate, object) statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeltaChange {
    /// The statement was added.
    Insert {
        subject: String,
        predicate: String,
        object: String,
    },
    /// The statement was removed.
    Delete {
        subject: String,
        predicate: String,
        object: String,
    },
}

impl DeltaChange {
    /// Returns the predicate of the changed statement, regardless of variant.
    pub fn predicate(&self) -> &str {
        match self {
            Self::Insert { predicate: p, .. } => p.as_str(),
            Self::Delete { predicate: p, .. } => p.as_str(),
        }
    }

    /// Returns the subject of the changed statement, regardless of variant.
    pub fn subject(&self) -> &str {
        match self {
            Self::Insert { subject: s, .. } => s.as_str(),
            Self::Delete { subject: s, .. } => s.as_str(),
        }
    }

    /// Returns the object of the changed statement, regardless of variant.
    pub fn object(&self) -> &str {
        match self {
            Self::Insert { object: o, .. } => o.as_str(),
            Self::Delete { object: o, .. } => o.as_str(),
        }
    }

    /// Returns `true` for [`DeltaChange::Insert`] and `false` for [`DeltaChange::Delete`].
    pub fn is_insert(&self) -> bool {
        match self {
            Self::Insert { .. } => true,
            Self::Delete { .. } => false,
        }
    }
}
/// One row of a view: string values keyed by variable name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewRow(pub HashMap<String, String>);

impl ViewRow {
    /// Wraps an existing map as a row.
    pub fn new(map: HashMap<String, String>) -> Self {
        ViewRow(map)
    }

    /// Builds a row from borrowed `(key, value)` pairs, copying each string.
    ///
    /// When a key occurs more than once, the last pair wins.
    pub fn from_pairs(pairs: &[(&str, &str)]) -> Self {
        let mut bindings = HashMap::with_capacity(pairs.len());
        for &(key, value) in pairs {
            bindings.insert(key.to_owned(), value.to_owned());
        }
        ViewRow(bindings)
    }

    /// Looks up the value stored under `variable`, or `None` when it is absent.
    pub fn get(&self, variable: &str) -> Option<&str> {
        let value = self.0.get(variable)?;
        Some(value.as_str())
    }
}
/// Cached rows of a view together with freshness bookkeeping.
pub struct MaterializedView {
    /// The definition this view was built from.
    pub definition: ViewDefinition,
    /// Rows currently held for the view.
    pub rows: Vec<ViewRow>,
    /// `now_ms()` reading taken at construction or at the latest refresh.
    pub last_updated_ms: i64,
    /// Starts at 0 and is incremented by every call to `refresh`.
    pub version: u64,
    /// Set by `invalidate`, cleared by `refresh`.
    pub is_stale: bool,
}

impl MaterializedView {
    /// Creates a fresh (non-stale) view at version 0, timestamped with the current time.
    pub fn new(definition: ViewDefinition, initial_rows: Vec<ViewRow>) -> Self {
        let created_at = now_ms();
        MaterializedView {
            definition,
            rows: initial_rows,
            last_updated_ms: created_at,
            version: 0,
            is_stale: false,
        }
    }

    /// Replaces the rows, updates the timestamp, bumps the version and clears the stale flag.
    pub fn refresh(&mut self, new_rows: Vec<ViewRow>) {
        let Self {
            rows,
            last_updated_ms,
            version,
            is_stale,
            ..
        } = self;
        *rows = new_rows;
        *last_updated_ms = now_ms();
        *version += 1;
        *is_stale = false;
    }

    /// Flags the view as stale; rows, timestamp and version are left untouched.
    pub fn invalidate(&mut self) {
        self.is_stale = true;
    }
}
/// Registry of materialized views plus a queue of pending changes.
///
/// Changes never update rows here; they only mark dependent views as stale.
#[derive(Default)]
pub struct IncrementalViewMaintainer {
    /// Registered views keyed by view name.
    views: HashMap<String, MaterializedView>,
    /// Changes queued by `queue_change` and consumed by `flush_changes`.
    change_queue: Vec<DeltaChange>,
}

impl IncrementalViewMaintainer {
    /// Creates a maintainer with no views and an empty change queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a view under `def.name`, replacing any view already stored under that name.
    pub fn register_view(&mut self, def: ViewDefinition, initial_rows: Vec<ViewRow>) {
        let key = def.name.clone();
        self.views
            .insert(key, MaterializedView::new(def, initial_rows));
    }

    /// Marks every currently fresh view that depends on the change's predicate as stale.
    ///
    /// Only the predicate of `change` is inspected. Returns the names of the views
    /// invalidated by this call; views that were already stale are not reported.
    /// The order of the returned names follows map iteration and is unspecified.
    pub fn apply_delta(&mut self, change: DeltaChange) -> Vec<String> {
        let predicate = change.predicate();
        self.views
            .iter_mut()
            .filter(|(_, view)| !view.is_stale && view.definition.depends_on(predicate))
            .map(|(view_name, view)| {
                view.is_stale = true;
                view_name.clone()
            })
            .collect()
    }

    /// Marks the named view as stale; does nothing when no such view exists.
    pub fn invalidate_view(&mut self, name: &str) {
        if let Some(target) = self.views.get_mut(name) {
            target.invalidate();
        }
    }

    /// Returns the view registered under `name`, if any.
    pub fn get_view(&self, name: &str) -> Option<&MaterializedView> {
        self.views.get(name)
    }

    /// Returns the names of all registered views, in unspecified order.
    pub fn list_views(&self) -> Vec<&str> {
        self.views.keys().map(String::as_str).collect()
    }

    /// Returns the names of all views (stale or not) whose definition depends on `predicate`.
    pub fn affected_views(&self, predicate: &str) -> Vec<&str> {
        let mut names = Vec::new();
        for (view_name, view) in &self.views {
            if view.definition.depends_on(predicate) {
                names.push(view_name.as_str());
            }
        }
        names
    }

    /// Appends a change to the pending queue without applying it.
    pub fn queue_change(&mut self, change: DeltaChange) {
        self.change_queue.push(change);
    }

    /// Applies every queued change in order and empties the queue.
    ///
    /// The returned map has one entry per view invalidated during the flush;
    /// the associated value is always `0`.
    pub fn flush_changes(&mut self) -> HashMap<String, usize> {
        let pending: Vec<DeltaChange> = self.change_queue.drain(..).collect();
        pending
            .into_iter()
            .flat_map(|change| self.apply_delta(change))
            .map(|view_name| (view_name, 0))
            .collect()
    }

    /// Returns the number of registered views.
    pub fn view_count(&self) -> usize {
        self.views.len()
    }

    /// Returns the total number of rows held by views that are not stale.
    pub fn total_rows(&self) -> usize {
        let mut total = 0;
        for view in self.views.values() {
            if !view.is_stale {
                total += view.rows.len();
            }
        }
        total
    }
}
/// Stateless helper that decides whether materialized views need a refresh.
pub struct ViewStalenessDetector;

impl ViewStalenessDetector {
    /// Reports whether `view` should be considered stale.
    ///
    /// A view is stale when its `is_stale` flag is set, or when the time elapsed
    /// since `last_updated_ms` is strictly greater than `max_age_ms`.
    pub fn is_stale(view: &MaterializedView, max_age_ms: i64) -> bool {
        if view.is_stale {
            return true;
        }
        // `last_updated_ms` is a public field and may hold any value; a plain
        // subtraction could overflow for extreme negative timestamps, so the
        // age is clamped to the `i64` range instead.
        let age_ms = now_ms().saturating_sub(view.last_updated_ms);
        age_ms > max_age_ms
    }

    /// Returns the subset of `views` for which [`ViewStalenessDetector::is_stale`]
    /// holds, preserving the input order.
    pub fn stale_views<'a>(
        views: &[&'a MaterializedView],
        max_age_ms: i64,
    ) -> Vec<&'a MaterializedView> {
        views
            .iter()
            .copied()
            .filter(|view| Self::is_stale(view, max_age_ms))
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Builds a definition named `name` whose query text embeds that name.
    fn make_def(name: &str, deps: &[&str], materialized: bool) -> ViewDefinition {
        let dependencies: Vec<String> = deps.iter().map(|dep| (*dep).to_owned()).collect();
        let query = format!("SELECT * WHERE {{ ?s <http://p/{}> ?o }}", name);
        ViewDefinition::new(name, query, materialized, dependencies)
    }

    /// Produces `n` rows binding `s` to `s<i>` and `o` to `o<i>`.
    fn rows(n: usize) -> Vec<ViewRow> {
        let mut out = Vec::with_capacity(n);
        for i in 0..n {
            let subject = format!("s{}", i);
            let object = format!("o{}", i);
            out.push(ViewRow::from_pairs(&[
                ("s", subject.as_str()),
                ("o", object.as_str()),
            ]));
        }
        out
    }

    /// Shorthand for an `Insert` change.
    fn insert(subject: &str, predicate: &str, object: &str) -> DeltaChange {
        DeltaChange::Insert {
            subject: subject.into(),
            predicate: predicate.into(),
            object: object.into(),
        }
    }

    /// Shorthand for a `Delete` change.
    fn delete(subject: &str, predicate: &str, object: &str) -> DeltaChange {
        DeltaChange::Delete {
            subject: subject.into(),
            predicate: predicate.into(),
            object: object.into(),
        }
    }

    /// Stale flag of the named view, or `None` when it is not registered.
    fn stale_flag(maintainer: &IncrementalViewMaintainer, name: &str) -> Option<bool> {
        maintainer.get_view(name).map(|view| view.is_stale)
    }

    #[test]
    fn test_view_definition_depends_on_listed_predicate() {
        let def = make_def("v", &["http://p/age"], true);
        assert!(def.depends_on("http://p/age"));
        assert!(!def.depends_on("http://p/name"));
    }

    #[test]
    fn test_view_definition_empty_deps_matches_all() {
        let def = make_def("v", &[], true);
        assert!(def.depends_on("http://anything"));
    }

    #[test]
    fn test_view_definition_multiple_deps() {
        let def = make_def("v", &["http://p/age", "http://p/name"], false);
        assert!(def.depends_on("http://p/age"));
        assert!(def.depends_on("http://p/name"));
        assert!(!def.depends_on("http://p/color"));
    }

    #[test]
    fn test_delta_change_insert_accessors() {
        let change = insert("s", "p", "o");
        assert_eq!(change.subject(), "s");
        assert_eq!(change.predicate(), "p");
        assert_eq!(change.object(), "o");
        assert!(change.is_insert());
    }

    #[test]
    fn test_delta_change_delete_accessors() {
        let change = delete("s", "p", "o");
        assert!(!change.is_insert());
    }

    #[test]
    fn test_delta_change_equality() {
        let original = insert("s", "p", "o");
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }

    #[test]
    fn test_view_row_get() {
        let row = ViewRow::from_pairs(&[("x", "Alice"), ("y", "30")]);
        assert_eq!(row.get("x"), Some("Alice"));
        assert_eq!(row.get("z"), None);
    }

    #[test]
    fn test_materialized_view_initial_state() {
        let view = MaterializedView::new(make_def("v", &["http://p/age"], true), rows(3));
        assert_eq!(view.rows.len(), 3);
        assert_eq!(view.version, 0);
        assert!(!view.is_stale);
    }

    #[test]
    fn test_materialized_view_refresh_bumps_version() {
        let mut view = MaterializedView::new(make_def("v", &["http://p/age"], true), rows(2));
        view.refresh(rows(5));
        assert_eq!(view.rows.len(), 5);
        assert_eq!(view.version, 1);
        assert!(!view.is_stale);
    }

    #[test]
    fn test_materialized_view_invalidate_sets_stale() {
        let mut view = MaterializedView::new(make_def("v", &["http://p/age"], true), rows(2));
        view.invalidate();
        assert!(view.is_stale);
    }

    #[test]
    fn test_maintainer_register_and_count() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v1", &["http://p/age"], true), rows(1));
        maintainer.register_view(make_def("v2", &["http://p/name"], true), rows(2));
        assert_eq!(maintainer.view_count(), 2);
    }

    #[test]
    fn test_maintainer_apply_delta_invalidates_affected() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v_age", &["http://p/age"], true), rows(2));
        maintainer.register_view(make_def("v_name", &["http://p/name"], true), rows(3));
        let changed = maintainer.apply_delta(insert("s", "http://p/age", "42"));
        assert_eq!(changed, vec!["v_age".to_string()]);
        assert_eq!(stale_flag(&maintainer, "v_age"), Some(true));
        assert_eq!(stale_flag(&maintainer, "v_name"), Some(false));
    }

    #[test]
    fn test_maintainer_apply_delta_already_stale_not_returned_again() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v", &["http://p/age"], true), rows(1));
        maintainer.apply_delta(insert("s", "http://p/age", "1"));
        let changed = maintainer.apply_delta(insert("s2", "http://p/age", "2"));
        assert!(
            changed.is_empty(),
            "Already stale, should not be re-reported"
        );
    }

    #[test]
    fn test_maintainer_apply_delta_universal_dependency() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v_all", &[], true), rows(0));
        let changed = maintainer.apply_delta(delete("s", "http://totally/unknown", "o"));
        assert_eq!(changed.len(), 1);
    }

    #[test]
    fn test_maintainer_invalidate_view_by_name() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v", &["http://p/x"], true), rows(1));
        maintainer.invalidate_view("v");
        assert_eq!(stale_flag(&maintainer, "v"), Some(true));
    }

    #[test]
    fn test_maintainer_invalidate_nonexistent_view_is_noop() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.invalidate_view("does_not_exist");
    }

    #[test]
    fn test_maintainer_affected_views_returns_matching() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v_age", &["http://p/age"], true), rows(0));
        maintainer.register_view(make_def("v_name", &["http://p/name"], true), rows(0));
        maintainer.register_view(make_def("v_all", &[], true), rows(0));
        let affected = maintainer.affected_views("http://p/age");
        assert!(affected.contains(&"v_age"));
        assert!(affected.contains(&"v_all"));
        assert!(!affected.contains(&"v_name"));
    }

    #[test]
    fn test_maintainer_queue_and_flush_changes() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v_age", &["http://p/age"], true), rows(3));
        maintainer.register_view(make_def("v_name", &["http://p/name"], true), rows(2));
        maintainer.queue_change(insert("s1", "http://p/age", "25"));
        maintainer.queue_change(delete("s2", "http://p/name", "Alice"));
        let result = maintainer.flush_changes();
        assert_eq!(result.len(), 2);
        assert_eq!(result.get("v_age"), Some(&0));
        assert_eq!(result.get("v_name"), Some(&0));
    }

    #[test]
    fn test_maintainer_flush_clears_queue() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v", &["http://p/x"], true), rows(1));
        maintainer.queue_change(insert("s", "http://p/x", "o"));
        maintainer.flush_changes();
        let second = maintainer.flush_changes();
        assert!(second.is_empty());
    }

    #[test]
    fn test_maintainer_total_rows_excludes_stale() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v1", &["http://p/age"], true), rows(5));
        maintainer.register_view(make_def("v2", &["http://p/name"], true), rows(3));
        assert_eq!(maintainer.total_rows(), 8);
        maintainer.apply_delta(insert("s", "http://p/age", "1"));
        assert_eq!(maintainer.total_rows(), 3);
    }

    #[test]
    fn test_maintainer_list_views() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("alpha", &[], true), rows(0));
        maintainer.register_view(make_def("beta", &[], true), rows(0));
        let mut names = maintainer.list_views();
        names.sort_unstable();
        assert_eq!(names, vec!["alpha", "beta"]);
    }

    #[test]
    fn test_maintainer_replace_existing_view() {
        let mut maintainer = IncrementalViewMaintainer::new();
        maintainer.register_view(make_def("v", &["http://p/x"], true), rows(2));
        maintainer.register_view(make_def("v", &["http://p/x"], true), rows(10));
        assert_eq!(maintainer.view_count(), 1);
        assert_eq!(
            maintainer.get_view("v").map(|view| view.rows.len()),
            Some(10)
        );
    }

    #[test]
    fn test_staleness_detector_explicitly_stale() {
        let mut view = MaterializedView::new(make_def("v", &[], true), rows(1));
        view.is_stale = true;
        assert!(ViewStalenessDetector::is_stale(&view, i64::MAX));
    }

    #[test]
    fn test_staleness_detector_freshly_created_not_stale() {
        let view = MaterializedView::new(make_def("v", &[], true), rows(1));
        assert!(!ViewStalenessDetector::is_stale(&view, 3_600_000));
    }

    #[test]
    fn test_staleness_detector_zero_max_age_always_stale() {
        let view = MaterializedView::new(make_def("v", &[], true), rows(0));
        // Let a few milliseconds pass so the measured age is strictly positive.
        thread::sleep(Duration::from_millis(5));
        assert!(ViewStalenessDetector::is_stale(&view, 0));
    }

    #[test]
    fn test_staleness_detector_stale_views_filters_correctly() {
        let fresh = MaterializedView::new(make_def("v1", &[], true), rows(0));
        let mut flagged = MaterializedView::new(make_def("v2", &[], true), rows(0));
        flagged.is_stale = true;
        let stale = ViewStalenessDetector::stale_views(&[&fresh, &flagged], 3_600_000);
        assert_eq!(stale.len(), 1);
        assert!(stale[0].is_stale);
    }
}