hyperstack_server/
materialized_view.rs

1//! Materialized view evaluation for view pipelines.
2//!
3//! This module handles the runtime evaluation of ViewDef pipelines,
4//! maintaining materialized results that update as source data changes.
5
6use crate::cache::EntityCache;
7use serde_json::Value;
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12/// Result of evaluating whether an update affects a materialized view
13#[derive(Debug, Clone, PartialEq)]
14pub enum ViewEffect {
15    /// Update does not affect the view result
16    NoEffect,
17    /// Entity should be added to the view result
18    Add { key: String },
19    /// Entity should be removed from the view result
20    Remove { key: String },
21    /// Entity in view was updated
22    Update { key: String },
23    /// Entity replaces another in the view (for single-result views)
24    Replace { old_key: String, new_key: String },
25}
26
27/// Sort order for view evaluation
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum SortOrder {
30    Asc,
31    Desc,
32}
33
34/// Comparison operators
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum CompareOp {
37    Eq,
38    Ne,
39    Gt,
40    Gte,
41    Lt,
42    Lte,
43}
44
45/// A materialized view that tracks a subset of entities based on a pipeline
46#[derive(Debug)]
47pub struct MaterializedView {
48    /// View identifier
49    pub id: String,
50    /// Source view/entity this derives from
51    pub source_id: String,
52    /// Current set of entity keys in this view's result
53    current_keys: Arc<RwLock<HashSet<String>>>,
54    /// Pipeline configuration (simplified for now)
55    pipeline: ViewPipeline,
56}
57
58#[derive(Debug, Clone, Default)]
59pub struct ViewPipeline {
60    /// Filter predicate (field path, op, value)
61    pub filter: Option<FilterConfig>,
62    /// Sort configuration
63    pub sort: Option<SortConfig>,
64    /// Limit (take N) - if Some(1), treated as single-result view for Replace effects
65    pub limit: Option<usize>,
66}
67
68#[derive(Debug, Clone)]
69pub struct FilterConfig {
70    pub field_path: Vec<String>,
71    pub op: CompareOp,
72    pub value: Value,
73}
74
75#[derive(Debug, Clone)]
76pub struct SortConfig {
77    pub field_path: Vec<String>,
78    pub order: SortOrder,
79}
80
81impl MaterializedView {
82    /// Create a new materialized view
83    pub fn new(id: String, source_id: String, pipeline: ViewPipeline) -> Self {
84        Self {
85            id,
86            source_id,
87            current_keys: Arc::new(RwLock::new(HashSet::new())),
88            pipeline,
89        }
90    }
91
92    /// Get current keys in the view
93    pub async fn get_keys(&self) -> HashSet<String> {
94        self.current_keys.read().await.clone()
95    }
96
97    /// Evaluate initial state from cache
98    pub async fn evaluate_initial(&self, cache: &EntityCache) -> Vec<(String, Value)> {
99        let entities = cache.get_all(&self.source_id).await;
100        self.evaluate_pipeline(entities).await
101    }
102
103    /// Evaluate pipeline on a set of entities
104    async fn evaluate_pipeline(&self, mut entities: Vec<(String, Value)>) -> Vec<(String, Value)> {
105        // Apply filter
106        if let Some(ref filter) = self.pipeline.filter {
107            entities.retain(|(_, v)| self.matches_filter(v, filter));
108        }
109
110        // Apply sort
111        if let Some(ref sort) = self.pipeline.sort {
112            entities.sort_by(|(_, a), (_, b)| {
113                let a_val = extract_field(a, &sort.field_path);
114                let b_val = extract_field(b, &sort.field_path);
115                let cmp = compare_values(&a_val, &b_val);
116                match sort.order {
117                    SortOrder::Asc => cmp,
118                    SortOrder::Desc => cmp.reverse(),
119                }
120            });
121        }
122
123        // Apply limit
124        if let Some(limit) = self.pipeline.limit {
125            entities.truncate(limit);
126        }
127
128        // Update current keys
129        let keys: HashSet<String> = entities.iter().map(|(k, _)| k.clone()).collect();
130        *self.current_keys.write().await = keys;
131
132        entities
133    }
134
135    /// Check if an entity matches the filter
136    fn matches_filter(&self, entity: &Value, filter: &FilterConfig) -> bool {
137        let field_val = extract_field(entity, &filter.field_path);
138        match filter.op {
139            CompareOp::Eq => field_val == filter.value,
140            CompareOp::Ne => field_val != filter.value,
141            CompareOp::Gt => {
142                compare_values(&field_val, &filter.value) == std::cmp::Ordering::Greater
143            }
144            CompareOp::Gte => compare_values(&field_val, &filter.value) != std::cmp::Ordering::Less,
145            CompareOp::Lt => compare_values(&field_val, &filter.value) == std::cmp::Ordering::Less,
146            CompareOp::Lte => {
147                compare_values(&field_val, &filter.value) != std::cmp::Ordering::Greater
148            }
149        }
150    }
151
152    /// Determine the effect of an entity update on this view
153    pub async fn compute_effect(
154        &self,
155        key: &str,
156        new_value: Option<&Value>,
157        _cache: &EntityCache,
158    ) -> ViewEffect {
159        let current_keys = self.current_keys.read().await;
160        let was_in_view = current_keys.contains(key);
161        drop(current_keys);
162
163        // Check if entity now matches filter
164        let matches_now = match new_value {
165            Some(v) => {
166                if let Some(ref filter) = self.pipeline.filter {
167                    self.matches_filter(v, filter)
168                } else {
169                    true
170                }
171            }
172            None => false, // Deleted
173        };
174
175        match (was_in_view, matches_now) {
176            (false, true) => {
177                if self.pipeline.limit == Some(1) {
178                    let current_keys = self.current_keys.read().await;
179                    if let Some(current_key) = current_keys.iter().next() {
180                        if current_key != key {
181                            return ViewEffect::Replace {
182                                old_key: current_key.clone(),
183                                new_key: key.to_string(),
184                            };
185                        }
186                    }
187                }
188                ViewEffect::Add {
189                    key: key.to_string(),
190                }
191            }
192            (true, false) => ViewEffect::Remove {
193                key: key.to_string(),
194            },
195            (true, true) => ViewEffect::Update {
196                key: key.to_string(),
197            },
198            (false, false) => ViewEffect::NoEffect,
199        }
200    }
201
202    /// Apply an effect to update the current keys
203    pub async fn apply_effect(&self, effect: &ViewEffect) {
204        let mut keys = self.current_keys.write().await;
205        match effect {
206            ViewEffect::Add { key } => {
207                keys.insert(key.clone());
208            }
209            ViewEffect::Remove { key } => {
210                keys.remove(key);
211            }
212            ViewEffect::Replace { old_key, new_key } => {
213                keys.remove(old_key);
214                keys.insert(new_key.clone());
215            }
216            ViewEffect::Update { .. } | ViewEffect::NoEffect => {}
217        }
218    }
219}
220
221/// Extract a field value from a JSON object using a path
222fn extract_field(value: &Value, path: &[String]) -> Value {
223    let mut current = value;
224    for segment in path {
225        match current.get(segment) {
226            Some(v) => current = v,
227            None => return Value::Null,
228        }
229    }
230    current.clone()
231}
232
233/// Compare two JSON values
234fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
235    match (a, b) {
236        (Value::Number(a), Value::Number(b)) => {
237            let a_f = a.as_f64().unwrap_or(0.0);
238            let b_f = b.as_f64().unwrap_or(0.0);
239            a_f.partial_cmp(&b_f).unwrap_or(std::cmp::Ordering::Equal)
240        }
241        (Value::String(a), Value::String(b)) => a.cmp(b),
242        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
243        _ => std::cmp::Ordering::Equal,
244    }
245}
246
247/// Registry of materialized views
248#[derive(Default)]
249pub struct MaterializedViewRegistry {
250    views: HashMap<String, Arc<MaterializedView>>,
251    /// Map from source view ID to dependent materialized views
252    dependencies: HashMap<String, Vec<String>>,
253}
254
255impl MaterializedViewRegistry {
256    pub fn new() -> Self {
257        Self::default()
258    }
259
260    /// Register a materialized view
261    pub fn register(&mut self, view: MaterializedView) {
262        let view_id = view.id.clone();
263        let source_id = view.source_id.clone();
264
265        self.dependencies
266            .entry(source_id)
267            .or_default()
268            .push(view_id.clone());
269
270        self.views.insert(view_id, Arc::new(view));
271    }
272
273    /// Get a materialized view by ID
274    pub fn get(&self, id: &str) -> Option<Arc<MaterializedView>> {
275        self.views.get(id).cloned()
276    }
277
278    /// Get all views that depend on a source
279    pub fn get_dependents(&self, source_id: &str) -> Vec<Arc<MaterializedView>> {
280        self.dependencies
281            .get(source_id)
282            .map(|ids| {
283                ids.iter()
284                    .filter_map(|id| self.views.get(id).cloned())
285                    .collect()
286            })
287            .unwrap_or_default()
288    }
289}
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294    use serde_json::json;
295
296    #[tokio::test]
297    async fn test_filter_evaluation() {
298        let pipeline = ViewPipeline {
299            filter: Some(FilterConfig {
300                field_path: vec!["status".to_string()],
301                op: CompareOp::Eq,
302                value: json!("active"),
303            }),
304            sort: None,
305            limit: None,
306        };
307
308        let view =
309            MaterializedView::new("test/active".to_string(), "test/list".to_string(), pipeline);
310
311        let entities = vec![
312            ("1".to_string(), json!({"status": "active", "value": 10})),
313            ("2".to_string(), json!({"status": "inactive", "value": 20})),
314            ("3".to_string(), json!({"status": "active", "value": 30})),
315        ];
316
317        let result = view.evaluate_pipeline(entities).await;
318        assert_eq!(result.len(), 2);
319        assert_eq!(result[0].0, "1");
320        assert_eq!(result[1].0, "3");
321    }
322
323    #[tokio::test]
324    async fn test_sort_and_limit() {
325        let pipeline = ViewPipeline {
326            filter: None,
327            sort: Some(SortConfig {
328                field_path: vec!["value".to_string()],
329                order: SortOrder::Desc,
330            }),
331            limit: Some(2),
332        };
333
334        let view =
335            MaterializedView::new("test/top2".to_string(), "test/list".to_string(), pipeline);
336
337        let entities = vec![
338            ("1".to_string(), json!({"value": 10})),
339            ("2".to_string(), json!({"value": 30})),
340            ("3".to_string(), json!({"value": 20})),
341        ];
342
343        let result = view.evaluate_pipeline(entities).await;
344        assert_eq!(result.len(), 2);
345        assert_eq!(result[0].0, "2"); // value: 30
346        assert_eq!(result[1].0, "3"); // value: 20
347    }
348}