use std::collections::HashMap;
use std::time::Instant;
/// A single triple-level change: one `(subject, predicate, object)` triple
/// that was either inserted or deleted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TripleDelta {
    /// The triple `(subject, predicate, object)` was inserted.
    Insert(String, String, String),
    /// The triple `(subject, predicate, object)` was deleted.
    Delete(String, String, String),
}

impl TripleDelta {
    /// Borrows the `(subject, predicate, object)` components regardless of
    /// which variant carries them.
    fn parts(&self) -> (&str, &str, &str) {
        match self {
            Self::Insert(s, p, o) | Self::Delete(s, p, o) => {
                (s.as_str(), p.as_str(), o.as_str())
            }
        }
    }

    /// Returns the predicate (second component) of the triple.
    pub fn predicate(&self) -> &str {
        let (_, predicate, _) = self.parts();
        predicate
    }

    /// Returns the subject (first component) of the triple.
    pub fn subject(&self) -> &str {
        let (subject, _, _) = self.parts();
        subject
    }

    /// Returns the object (third component) of the triple.
    pub fn object(&self) -> &str {
        let (_, _, object) = self.parts();
        object
    }

    /// Returns `true` for [`TripleDelta::Insert`] and `false` for
    /// [`TripleDelta::Delete`].
    pub fn is_insert(&self) -> bool {
        match self {
            Self::Insert(..) => true,
            Self::Delete(..) => false,
        }
    }
}
/// Static description of a view: its identity, its query text, and the
/// predicates used to decide whether a delta can affect it.
#[derive(Debug, Clone)]
pub struct ViewDefinition {
    /// Identifier of the view; `ViewManager` uses it as the registry key.
    pub id: String,
    /// Display name of the view.
    pub name: String,
    /// Query text. It is stored verbatim; nothing in this type parses it.
    pub sparql_query: String,
    /// Predicates compared against incoming deltas. An empty list is treated
    /// by `MaterializedView::is_affected_by` as "affected by every delta".
    pub accessed_predicates: Vec<String>,
    /// Moment at which [`ViewDefinition::new`] built this definition.
    pub created_at: Instant,
}

impl ViewDefinition {
    /// Builds a definition from its parts and stamps `created_at` with the
    /// current instant.
    ///
    /// `id`, `name` and `sparql_query` accept anything convertible into a
    /// `String`; `accessed_predicates` is stored as given.
    pub fn new(
        id: impl Into<String>,
        name: impl Into<String>,
        sparql_query: impl Into<String>,
        accessed_predicates: Vec<String>,
    ) -> Self {
        let id = id.into();
        let name = name.into();
        let sparql_query = sparql_query.into();
        let created_at = Instant::now();
        Self {
            id,
            name,
            sparql_query,
            accessed_predicates,
            created_at,
        }
    }
}
/// Cached result rows for one view, plus the bookkeeping needed to know
/// whether those rows can still be trusted.
///
/// Derives `Debug` and `Clone` so a view can be logged and snapshotted like
/// the other public types in this file.
#[derive(Debug, Clone)]
pub struct MaterializedView {
    /// The definition this view was built from.
    pub definition: ViewDefinition,
    /// Cached rows; each row is a map of string keys to string values.
    pub rows: Vec<HashMap<String, String>>,
    /// Length of `rows` as of construction or the most recent `refresh`.
    pub row_count: usize,
    /// When the rows were last supplied (by `new` or `refresh`).
    pub last_updated: Instant,
    /// Number of completed `refresh` calls; starts at 0.
    pub update_count: u64,
    /// `true` once an affecting delta has been seen and no refresh has
    /// happened since.
    pub is_stale: bool,
}

impl MaterializedView {
    /// Creates a fresh (non-stale) view holding `initial_rows`.
    pub fn new(definition: ViewDefinition, initial_rows: Vec<HashMap<String, String>>) -> Self {
        let row_count = initial_rows.len();
        Self {
            definition,
            rows: initial_rows,
            row_count,
            last_updated: Instant::now(),
            update_count: 0,
            is_stale: false,
        }
    }

    /// Marks the view stale if any delta in `deltas` affects it.
    ///
    /// Despite the name, the cached rows are never modified here; the view is
    /// only flagged so that it can be refreshed later.
    ///
    /// Returns `true` only when this call flipped the view from fresh to
    /// stale. A view that is already stale, or a batch with no affecting
    /// delta (including an empty batch), yields `false`.
    pub fn apply_deltas(&mut self, deltas: &[TripleDelta]) -> bool {
        if self.is_stale {
            // Already flagged: nothing newly invalidated by this batch.
            return false;
        }
        let affected = deltas.iter().any(|delta| self.is_affected_by(delta));
        if affected {
            self.is_stale = true;
        }
        affected
    }

    /// Reports whether `delta` could change this view's result.
    ///
    /// An empty `accessed_predicates` list acts as a wildcard: every delta is
    /// considered affecting. Otherwise the delta's predicate must equal one
    /// of the listed predicates exactly.
    pub fn is_affected_by(&self, delta: &TripleDelta) -> bool {
        let predicates = &self.definition.accessed_predicates;
        if predicates.is_empty() {
            return true;
        }
        let pred = delta.predicate();
        predicates.iter().any(|p| p.as_str() == pred)
    }

    /// Replaces the cached rows with `new_rows`, clears the stale flag, and
    /// updates `row_count`, `last_updated` and `update_count`.
    pub fn refresh(&mut self, new_rows: Vec<HashMap<String, String>>) {
        self.row_count = new_rows.len();
        self.rows = new_rows;
        self.last_updated = Instant::now();
        self.update_count += 1;
        self.is_stale = false;
    }
}
/// Registry of [`MaterializedView`]s keyed by view id, responsible for
/// fanning triple deltas out to every registered view.
pub struct ViewManager {
    /// Registered views, keyed by the `id` of their definition.
    views: HashMap<String, MaterializedView>,
}

impl ViewManager {
    /// Creates a manager with no registered views.
    pub fn new() -> Self {
        Self {
            views: HashMap::new(),
        }
    }

    /// Registers a view built from `definition` and `initial_rows`.
    ///
    /// The view is keyed by `definition.id`; registering an id that is
    /// already present replaces the previous view.
    pub fn register_view(
        &mut self,
        definition: ViewDefinition,
        initial_rows: Vec<HashMap<String, String>>,
    ) {
        let id = definition.id.clone();
        let view = MaterializedView::new(definition, initial_rows);
        self.views.insert(id, view);
    }

    /// Removes the view with the given id. Returns `true` if it existed.
    pub fn drop_view(&mut self, view_id: &str) -> bool {
        self.views.remove(view_id).is_some()
    }

    /// Offers `deltas` to every registered view and returns the ids of the
    /// views that became stale as a result of this call.
    ///
    /// Views that were already stale are not reported again. The returned
    /// ids are sorted ascending so the result does not depend on the map's
    /// arbitrary iteration order.
    pub fn propagate_deltas(&mut self, deltas: &[TripleDelta]) -> Vec<String> {
        let mut stale_ids: Vec<String> = Vec::new();
        for (id, view) in &mut self.views {
            if view.apply_deltas(deltas) {
                stale_ids.push(id.clone());
            }
        }
        // HashMap iteration order is unspecified; sort for a reproducible result.
        stale_ids.sort_unstable();
        stale_ids
    }

    /// Returns the cached rows of a view, or `None` when the view is unknown
    /// or currently stale.
    pub fn get_view_data(&self, view_id: &str) -> Option<&[HashMap<String, String>]> {
        self.views
            .get(view_id)
            .filter(|view| !view.is_stale)
            .map(|view| view.rows.as_slice())
    }

    /// Replaces the rows of the given view and clears its stale flag.
    /// Unknown ids are ignored.
    pub fn refresh_view(&mut self, view_id: &str, new_rows: Vec<HashMap<String, String>>) {
        if let Some(view) = self.views.get_mut(view_id) {
            view.refresh(new_rows);
        }
    }

    /// Returns the ids of all views currently flagged stale, sorted
    /// ascending so the result is reproducible across calls and runs.
    pub fn stale_views(&self) -> Vec<&str> {
        let mut ids: Vec<&str> = self
            .views
            .iter()
            .filter(|(_, view)| view.is_stale)
            .map(|(id, _)| id.as_str())
            .collect();
        ids.sort_unstable();
        ids
    }

    /// Number of registered views.
    pub fn view_count(&self) -> usize {
        self.views.len()
    }

    /// Looks up a view by id, regardless of whether it is stale.
    pub fn get_view(&self, view_id: &str) -> Option<&MaterializedView> {
        self.views.get(view_id)
    }

    /// Iterates over the ids of all registered views, in unspecified order.
    pub fn view_ids(&self) -> impl Iterator<Item = &str> {
        self.views.keys().map(String::as_str)
    }
}

impl Default for ViewManager {
    /// Equivalent to [`ViewManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a result row from `(key, value)` pairs.
    fn make_row(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut row = HashMap::with_capacity(pairs.len());
        for &(key, value) in pairs {
            row.insert(key.to_string(), value.to_string());
        }
        row
    }

    /// Builds a definition whose name and query are derived from `id` and
    /// which accesses exactly the given predicates.
    fn simple_view(id: &str, predicates: Vec<&str>) -> ViewDefinition {
        let name = format!("View {}", id);
        let query = format!("SELECT * WHERE {{ ?s <http://p/{}> ?o }}", id);
        let accessed = predicates.into_iter().map(String::from).collect();
        ViewDefinition::new(id, name, query, accessed)
    }

    /// Shorthand for an insert delta.
    fn insert(s: &str, p: &str, o: &str) -> TripleDelta {
        TripleDelta::Insert(s.into(), p.into(), o.into())
    }

    /// Shorthand for a delete delta.
    fn delete(s: &str, p: &str, o: &str) -> TripleDelta {
        TripleDelta::Delete(s.into(), p.into(), o.into())
    }

    // ---- TripleDelta -----------------------------------------------------

    #[test]
    fn test_triple_delta_predicate() {
        let d = insert("s", "p", "o");
        assert_eq!(d.predicate(), "p");
        assert_eq!(d.subject(), "s");
        assert_eq!(d.object(), "o");
        assert!(d.is_insert());
    }

    #[test]
    fn test_triple_delta_delete() {
        let d = delete("s", "p", "o");
        assert!(!d.is_insert());
    }

    #[test]
    fn test_triple_delta_equality() {
        let a = insert("s", "p", "o");
        let b = insert("s", "p", "o");
        let c = delete("s", "p", "o");
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    // ---- ViewDefinition --------------------------------------------------

    #[test]
    fn test_view_definition_creation() {
        let def = ViewDefinition::new(
            "v1",
            "My View",
            "SELECT ?s WHERE { ?s <http://p/name> ?o }",
            vec!["http://p/name".to_string()],
        );
        assert_eq!(def.id, "v1");
        assert_eq!(def.accessed_predicates.len(), 1);
    }

    // ---- MaterializedView ------------------------------------------------

    #[test]
    fn test_materialized_view_not_stale_initially() {
        let rows = vec![make_row(&[("s", "Alice"), ("o", "30")])];
        let view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), rows);
        assert!(!view.is_stale);
        assert_eq!(view.row_count, 1);
    }

    #[test]
    fn test_is_affected_by_matching_predicate() {
        let view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), vec![]);
        let delta = insert("s", "http://p/age", "25");
        assert!(view.is_affected_by(&delta));
    }

    #[test]
    fn test_is_affected_by_non_matching_predicate() {
        let view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), vec![]);
        let delta = insert("s", "http://p/name", "Alice");
        assert!(!view.is_affected_by(&delta));
    }

    #[test]
    fn test_is_affected_by_empty_predicates_matches_all() {
        let def = simple_view("v1", vec![]);
        let view = MaterializedView::new(def, vec![]);
        let delta = insert("s", "http://any/predicate", "o");
        assert!(view.is_affected_by(&delta));
    }

    #[test]
    fn test_apply_deltas_marks_stale() {
        let mut view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), vec![]);
        let deltas = vec![insert("Alice", "http://p/age", "30")];
        let newly_stale = view.apply_deltas(&deltas);
        assert!(newly_stale);
        assert!(view.is_stale);
    }

    #[test]
    fn test_apply_deltas_no_effect_different_predicate() {
        let mut view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), vec![]);
        let deltas = vec![insert("Alice", "http://p/name", "Alice")];
        let newly_stale = view.apply_deltas(&deltas);
        assert!(!newly_stale);
        assert!(!view.is_stale);
    }

    #[test]
    fn test_apply_deltas_already_stale_returns_false() {
        let mut view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), vec![]);
        view.is_stale = true;
        let deltas = vec![insert("Alice", "http://p/age", "30")];
        let newly_stale = view.apply_deltas(&deltas);
        assert!(
            !newly_stale,
            "Already stale, so apply_deltas should return false"
        );
    }

    #[test]
    fn test_refresh_clears_stale_flag() {
        let mut view = MaterializedView::new(simple_view("v1", vec!["http://p/age"]), vec![]);
        view.is_stale = true;
        view.refresh(vec![make_row(&[("s", "Bob"), ("o", "42")])]);
        assert!(!view.is_stale);
        assert_eq!(view.row_count, 1);
        assert_eq!(view.update_count, 1);
    }

    // ---- ViewManager -----------------------------------------------------

    #[test]
    fn test_view_manager_register_and_count() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        mgr.register_view(simple_view("v2", vec!["http://p/name"]), vec![]);
        assert_eq!(mgr.view_count(), 2);
    }

    #[test]
    fn test_view_manager_drop_view() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        assert!(mgr.drop_view("v1"));
        // A second drop finds nothing to remove.
        assert!(!mgr.drop_view("v1"));
        assert_eq!(mgr.view_count(), 0);
    }

    #[test]
    fn test_view_manager_propagate_deltas_selective() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v_age", vec!["http://p/age"]), vec![]);
        mgr.register_view(simple_view("v_name", vec!["http://p/name"]), vec![]);
        let deltas = vec![insert("Alice", "http://p/age", "30")];
        let stale = mgr.propagate_deltas(&deltas);
        assert_eq!(stale.len(), 1);
        assert_eq!(stale[0], "v_age");
        let all_stale = mgr.stale_views();
        assert!(all_stale.contains(&"v_age"));
        assert!(!all_stale.contains(&"v_name"));
    }

    #[test]
    fn test_view_manager_propagate_deltas_wildcard_view() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v_all", vec![]), vec![]);
        let deltas = vec![delete("s", "http://any/pred", "o")];
        let stale = mgr.propagate_deltas(&deltas);
        assert_eq!(stale.len(), 1);
    }

    #[test]
    fn test_view_manager_get_view_data_not_stale() {
        let mut mgr = ViewManager::new();
        let rows = vec![make_row(&[("s", "Alice")])];
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), rows);
        let data = mgr.get_view_data("v1");
        assert!(data.is_some());
        assert_eq!(data.expect("data should be available").len(), 1);
    }

    #[test]
    fn test_view_manager_get_view_data_stale_returns_none() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        let deltas = vec![insert("s", "http://p/age", "99")];
        mgr.propagate_deltas(&deltas);
        assert!(mgr.get_view_data("v1").is_none());
    }

    #[test]
    fn test_view_manager_refresh_view() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        mgr.propagate_deltas(&[insert("s", "http://p/age", "10")]);
        let new_rows = vec![
            make_row(&[("s", "Alice"), ("o", "10")]),
            make_row(&[("s", "Bob"), ("o", "20")]),
        ];
        mgr.refresh_view("v1", new_rows);
        let data = mgr.get_view_data("v1").expect("should have fresh data");
        assert_eq!(data.len(), 2);
    }

    #[test]
    fn test_view_manager_stale_views_empty_initially() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v1", vec!["http://p/age"]), vec![]);
        assert!(mgr.stale_views().is_empty());
    }

    #[test]
    fn test_view_manager_multiple_deltas_one_call() {
        let mut mgr = ViewManager::new();
        mgr.register_view(simple_view("v_age", vec!["http://p/age"]), vec![]);
        mgr.register_view(simple_view("v_name", vec!["http://p/name"]), vec![]);
        mgr.register_view(simple_view("v_color", vec!["http://p/color"]), vec![]);
        let deltas = vec![
            insert("s1", "http://p/age", "30"),
            insert("s2", "http://p/name", "Alice"),
        ];
        let stale = mgr.propagate_deltas(&deltas);
        assert_eq!(stale.len(), 2);
        assert!(mgr.stale_views().contains(&"v_age"));
        assert!(mgr.stale_views().contains(&"v_name"));
        assert!(!mgr.stale_views().contains(&"v_color"));
    }

    #[test]
    fn test_view_manager_refresh_nonexistent_view_is_noop() {
        let mut mgr = ViewManager::new();
        // Must not panic when the id is unknown.
        mgr.refresh_view("nonexistent", vec![]);
    }

    #[test]
    fn test_view_definition_empty_predicates_semantics() {
        let def = ViewDefinition::new("v", "All", "SELECT * WHERE { ?s ?p ?o }", vec![]);
        let view = MaterializedView::new(def, vec![]);
        let d = delete("s", "http://totally/random", "o");
        assert!(view.is_affected_by(&d));
    }
}