Skip to main content

laminar_core/mv/
registry.rs

1//! Materialized view registry with dependency tracking.
2//!
3//! The registry maintains a directed acyclic graph (DAG) of materialized views,
4//! tracking dependencies between views and ensuring correct processing order.
5
6use super::error::{MvError, MvState};
7use arrow_schema::SchemaRef;
8use fxhash::{FxHashMap, FxHashSet};
9use std::collections::VecDeque;
10
11/// Materialized view definition.
12///
13/// A materialized view is a query result that is stored and incrementally
14/// maintained as its source data changes.
15#[derive(Debug, Clone)]
16pub struct MaterializedView {
17    /// Unique view name.
18    pub name: String,
19    /// SQL definition (for reference and introspection).
20    pub sql: String,
21    /// Input sources (base tables or other MVs).
22    pub sources: Vec<String>,
23    /// Output schema of the view.
24    pub schema: SchemaRef,
25    /// Associated operator ID for event routing.
26    pub operator_id: String,
27    /// Current execution state.
28    pub state: MvState,
29}
30
31impl MaterializedView {
32    /// Creates a new materialized view definition.
33    #[must_use]
34    pub fn new(
35        name: impl Into<String>,
36        sql: impl Into<String>,
37        sources: Vec<String>,
38        schema: SchemaRef,
39    ) -> Self {
40        let name = name.into();
41        let operator_id = format!("mv_{name}");
42        Self {
43            name,
44            sql: sql.into(),
45            sources,
46            schema,
47            operator_id,
48            state: MvState::Running,
49        }
50    }
51
52    /// Creates a simple view with no schema (for testing).
53    #[cfg(test)]
54    pub fn simple(name: impl Into<String>, sources: Vec<String>) -> Self {
55        use arrow_schema::{DataType, Field, Schema};
56        use std::sync::Arc;
57
58        let schema = Arc::new(Schema::new(vec![Field::new(
59            "value",
60            DataType::Int64,
61            false,
62        )]));
63        Self::new(name, "", sources, schema)
64    }
65
66    /// Returns true if this view depends on the given source.
67    #[must_use]
68    pub fn depends_on(&self, source: &str) -> bool {
69        self.sources.iter().any(|s| s == source)
70    }
71}
72
73/// Registry for managing materialized views.
74///
75/// Maintains a DAG of view dependencies and provides:
76/// - Cycle detection on registration
77/// - Topological ordering for correct processing order
78/// - Dependency tracking for cascade operations
79///
80/// # Example
81///
82/// ```rust
83/// use laminar_core::mv::{MvRegistry, MaterializedView};
84/// use arrow_schema::{Schema, Field, DataType};
85/// use std::sync::Arc;
86///
87/// let mut registry = MvRegistry::new();
88///
89/// // Register base tables
90/// registry.register_base_table("trades");
91///
92/// // Register cascading views
93/// let schema = Arc::new(Schema::new(vec![Field::new("count", DataType::Int64, false)]));
94/// let ohlc_1s = MaterializedView::new("ohlc_1s", "SELECT ...", vec!["trades".into()], schema.clone());
95/// registry.register(ohlc_1s).unwrap();
96///
97/// let ohlc_1m = MaterializedView::new("ohlc_1m", "SELECT ...", vec!["ohlc_1s".into()], schema);
98/// registry.register(ohlc_1m).unwrap();
99///
100/// // Views are processed in topological order
101/// assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m"]);
102/// ```
103#[derive(Debug, Default)]
104pub struct MvRegistry {
105    /// All registered MVs by name.
106    views: FxHashMap<String, MaterializedView>,
107    /// Base tables (sources that are not MVs).
108    base_tables: FxHashSet<String>,
109    /// Dependency graph: MV name -> MVs that depend on it.
110    dependents: FxHashMap<String, FxHashSet<String>>,
111    /// Reverse dependency graph: MV name -> MVs it depends on.
112    dependencies: FxHashMap<String, FxHashSet<String>>,
113    /// Topological order for processing (dependencies first).
114    topo_order: Vec<String>,
115}
116
117impl MvRegistry {
118    /// Creates an empty registry.
119    #[must_use]
120    pub fn new() -> Self {
121        Self::default()
122    }
123
124    /// Registers a base table (source that is not an MV).
125    ///
126    /// Base tables are assumed to exist and can be referenced as sources
127    /// by materialized views.
128    pub fn register_base_table(&mut self, name: impl Into<String>) {
129        self.base_tables.insert(name.into());
130    }
131
132    /// Returns true if the given name is a registered base table.
133    #[must_use]
134    pub fn is_base_table(&self, name: &str) -> bool {
135        self.base_tables.contains(name)
136    }
137
138    /// Registers a new materialized view.
139    ///
140    /// # Errors
141    ///
142    /// Returns error if:
143    /// - View name already exists
144    /// - Source MV or base table doesn't exist
145    /// - Would create a dependency cycle
146    pub fn register(&mut self, view: MaterializedView) -> Result<(), MvError> {
147        // Check for duplicate name
148        if self.views.contains_key(&view.name) {
149            return Err(MvError::DuplicateName(view.name.clone()));
150        }
151
152        // Validate sources exist
153        for source in &view.sources {
154            if !self.views.contains_key(source) && !self.is_base_table(source) {
155                return Err(MvError::SourceNotFound(source.clone()));
156            }
157        }
158
159        // Check for cycles
160        if self.would_create_cycle(&view.name, &view.sources) {
161            return Err(MvError::CycleDetected(view.name.clone()));
162        }
163
164        // Update dependency graphs
165        for source in &view.sources {
166            self.dependents
167                .entry(source.clone())
168                .or_default()
169                .insert(view.name.clone());
170            self.dependencies
171                .entry(view.name.clone())
172                .or_default()
173                .insert(source.clone());
174        }
175
176        self.views.insert(view.name.clone(), view);
177        self.update_topo_order();
178
179        Ok(())
180    }
181
182    /// Unregisters a materialized view.
183    ///
184    /// # Errors
185    ///
186    /// Returns error if:
187    /// - View doesn't exist
188    /// - Other views depend on it (use `unregister_cascade` instead)
189    pub fn unregister(&mut self, name: &str) -> Result<MaterializedView, MvError> {
190        // Check if view exists
191        if !self.views.contains_key(name) {
192            return Err(MvError::ViewNotFound(name.to_string()));
193        }
194
195        // Check for dependents
196        if let Some(deps) = self.dependents.get(name) {
197            if !deps.is_empty() {
198                let dep_names: Vec<_> = deps.iter().cloned().collect();
199                return Err(MvError::HasDependents(name.to_string(), dep_names));
200            }
201        }
202
203        self.remove_view(name)
204    }
205
206    /// Unregisters a materialized view and all views that depend on it.
207    ///
208    /// Returns the views that were removed, in dependency order (dependents first).
209    ///
210    /// # Errors
211    ///
212    /// Returns error if the view doesn't exist.
213    pub fn unregister_cascade(&mut self, name: &str) -> Result<Vec<MaterializedView>, MvError> {
214        if !self.views.contains_key(name) {
215            return Err(MvError::ViewNotFound(name.to_string()));
216        }
217
218        // Collect all views to remove in dependency order (dependents first)
219        let mut to_remove = Vec::new();
220        self.collect_dependents_recursive(name, &mut to_remove);
221        to_remove.push(name.to_string());
222
223        // Remove in collected order (dependents first, then the view itself)
224        let mut removed = Vec::with_capacity(to_remove.len());
225        for view_name in to_remove {
226            if let Ok(view) = self.remove_view(&view_name) {
227                removed.push(view);
228            }
229        }
230
231        Ok(removed)
232    }
233
234    fn collect_dependents_recursive(&self, name: &str, result: &mut Vec<String>) {
235        if let Some(deps) = self.dependents.get(name) {
236            for dep in deps {
237                if !result.contains(dep) {
238                    self.collect_dependents_recursive(dep, result);
239                    result.push(dep.clone());
240                }
241            }
242        }
243    }
244
245    fn remove_view(&mut self, name: &str) -> Result<MaterializedView, MvError> {
246        let view = self
247            .views
248            .remove(name)
249            .ok_or_else(|| MvError::ViewNotFound(name.to_string()))?;
250
251        // Remove from dependency tracking
252        if let Some(sources) = self.dependencies.remove(name) {
253            for source in sources {
254                if let Some(deps) = self.dependents.get_mut(&source) {
255                    deps.remove(name);
256                }
257            }
258        }
259        self.dependents.remove(name);
260
261        // Update topological order
262        self.update_topo_order();
263
264        Ok(view)
265    }
266
267    /// Gets a view by name.
268    #[must_use]
269    pub fn get(&self, name: &str) -> Option<&MaterializedView> {
270        self.views.get(name)
271    }
272
273    /// Gets a mutable reference to a view by name.
274    #[must_use]
275    pub fn get_mut(&mut self, name: &str) -> Option<&mut MaterializedView> {
276        self.views.get_mut(name)
277    }
278
279    /// Returns all views in topological order (dependencies first).
280    #[must_use]
281    pub fn topo_order(&self) -> &[String] {
282        &self.topo_order
283    }
284
285    /// Returns all views that depend on the given source.
286    pub fn get_dependents(&self, source: &str) -> impl Iterator<Item = &str> {
287        self.dependents
288            .get(source)
289            .into_iter()
290            .flatten()
291            .map(String::as_str)
292    }
293
294    /// Returns all sources that the given view depends on.
295    pub fn get_dependencies(&self, view: &str) -> impl Iterator<Item = &str> {
296        self.dependencies
297            .get(view)
298            .into_iter()
299            .flatten()
300            .map(String::as_str)
301    }
302
303    /// Returns the number of registered views.
304    #[must_use]
305    pub fn len(&self) -> usize {
306        self.views.len()
307    }
308
309    /// Returns true if no views are registered.
310    #[must_use]
311    pub fn is_empty(&self) -> bool {
312        self.views.is_empty()
313    }
314
315    /// Returns an iterator over all registered views.
316    pub fn views(&self) -> impl Iterator<Item = &MaterializedView> {
317        self.views.values()
318    }
319
320    /// Returns the set of registered base tables.
321    #[must_use]
322    pub fn base_tables(&self) -> &FxHashSet<String> {
323        &self.base_tables
324    }
325
326    /// Returns the full dependency chain for a view (including transitive).
327    ///
328    /// The chain is returned in topological order (dependencies first).
329    #[must_use]
330    pub fn dependency_chain(&self, name: &str) -> Vec<String> {
331        let mut chain = Vec::new();
332        let mut visited = FxHashSet::default();
333        self.collect_dependencies_recursive(name, &mut chain, &mut visited);
334        chain
335    }
336
337    fn collect_dependencies_recursive(
338        &self,
339        name: &str,
340        result: &mut Vec<String>,
341        visited: &mut FxHashSet<String>,
342    ) {
343        if !visited.insert(name.to_string()) {
344            return;
345        }
346
347        if let Some(deps) = self.dependencies.get(name) {
348            for dep in deps {
349                self.collect_dependencies_recursive(dep, result, visited);
350            }
351        }
352
353        // Only add MVs, not base tables
354        if self.views.contains_key(name) {
355            result.push(name.to_string());
356        }
357    }
358
359    fn would_create_cycle(&self, new_name: &str, sources: &[String]) -> bool {
360        // DFS to check if any source transitively depends on new_name
361        let mut visited = FxHashSet::default();
362        let mut stack: Vec<_> = sources.to_vec();
363
364        while let Some(current) = stack.pop() {
365            if current == new_name {
366                return true;
367            }
368            if visited.insert(current.clone()) {
369                if let Some(deps) = self.dependencies.get(&current) {
370                    stack.extend(deps.iter().cloned());
371                }
372            }
373        }
374
375        false
376    }
377
378    fn update_topo_order(&mut self) {
379        // Kahn's algorithm for topological sort
380        let mut in_degree: FxHashMap<String, usize> = FxHashMap::default();
381        let mut queue: VecDeque<String> = VecDeque::new();
382
383        // Initialize in-degrees (count only MV dependencies, not base tables)
384        for name in self.views.keys() {
385            let deps = self.dependencies.get(name).map_or(0, |d| {
386                d.iter().filter(|dep| self.views.contains_key(*dep)).count()
387            });
388            in_degree.insert(name.clone(), deps);
389            if deps == 0 {
390                queue.push_back(name.clone());
391            }
392        }
393
394        // Process
395        self.topo_order.clear();
396        while let Some(name) = queue.pop_front() {
397            self.topo_order.push(name.clone());
398
399            if let Some(dependents) = self.dependents.get(&name) {
400                for dep in dependents {
401                    if let Some(count) = in_degree.get_mut(dep) {
402                        *count = count.saturating_sub(1);
403                        if *count == 0 {
404                            queue.push_back(dep.clone());
405                        }
406                    }
407                }
408            }
409        }
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a test view named `name` reading from `sources`.
    fn mv(name: &str, sources: Vec<&str>) -> MaterializedView {
        MaterializedView::simple(name, sources.into_iter().map(String::from).collect())
    }

    #[test]
    fn test_simple_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        let view = mv("ohlc_1s", vec!["trades"]);
        registry.register(view).unwrap();

        assert_eq!(registry.len(), 1);
        assert!(registry.get("ohlc_1s").is_some());
    }

    #[test]
    fn test_cascading_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_duplicate_name_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let result = registry.register(mv("ohlc_1s", vec!["trades"]));
        assert!(matches!(result, Err(MvError::DuplicateName(_))));
    }

    #[test]
    fn test_source_not_found_error() {
        let mut registry = MvRegistry::new();

        let result = registry.register(mv("view", vec!["nonexistent"]));
        assert!(matches!(result, Err(MvError::SourceNotFound(_))));
    }

    #[test]
    fn test_cycle_detection_direct() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("a");

        // Sources are validated before the cycle check, so a self-reference
        // only reaches it when the name already resolves (here: as a base table).
        let result = registry.register(mv("a", vec!["a"]));
        assert!(matches!(result, Err(MvError::CycleDetected(_))));

        // A rejected registration leaves the registry untouched.
        assert!(registry.is_empty());
        assert!(registry.topo_order().is_empty());
    }

    #[test]
    fn test_cycle_detection_transitive() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("a");

        registry.register(mv("b", vec!["a"])).unwrap();
        registry.register(mv("c", vec!["b"])).unwrap();

        // A view named "a" reading from c would close the loop a -> c -> b -> a.
        let result = registry.register(mv("a", vec!["c"]));
        assert!(matches!(result, Err(MvError::CycleDetected(_))));

        // Existing views and their order are unaffected by the failed attempt.
        assert_eq!(registry.len(), 2);
        assert_eq!(registry.topo_order(), &["b", "c"]);
    }

    #[test]
    fn test_multi_source_view() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("orders");
        registry.register_base_table("payments");

        // View that joins two base tables
        registry
            .register(mv("order_payments", vec!["orders", "payments"]))
            .unwrap();

        assert_eq!(registry.topo_order(), &["order_payments"]);

        // Check dependencies
        let deps: Vec<_> = registry.get_dependencies("order_payments").collect();
        assert!(deps.contains(&"orders"));
        assert!(deps.contains(&"payments"));
    }

    #[test]
    fn test_diamond_dependency() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("source");

        //       source
        //       /    \
        //      a      b
        //       \    /
        //         c
        registry.register(mv("a", vec!["source"])).unwrap();
        registry.register(mv("b", vec!["source"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        // c should come last
        let order = registry.topo_order();
        let c_idx = order.iter().position(|x| x == "c").unwrap();
        let a_idx = order.iter().position(|x| x == "a").unwrap();
        let b_idx = order.iter().position(|x| x == "b").unwrap();

        assert!(c_idx > a_idx);
        assert!(c_idx > b_idx);
    }

    #[test]
    fn test_unregister_simple() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let removed = registry.unregister("ohlc_1s").unwrap();
        assert_eq!(removed.name, "ohlc_1s");
        assert!(registry.is_empty());
    }

    #[test]
    fn test_unregister_with_dependents_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();

        let result = registry.unregister("ohlc_1s");
        assert!(matches!(result, Err(MvError::HasDependents(_, _))));
    }

    #[test]
    fn test_unregister_cascade() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let removed = registry.unregister_cascade("ohlc_1s").unwrap();

        // All three should be removed
        assert_eq!(removed.len(), 3);
        assert!(registry.is_empty());
        assert!(registry.topo_order().is_empty());

        // Removed in reverse order (dependents first)
        assert_eq!(removed[0].name, "ohlc_1h");
        assert_eq!(removed[1].name, "ohlc_1m");
        assert_eq!(removed[2].name, "ohlc_1s");
    }

    #[test]
    fn test_unregister_cascade_diamond() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("source");
        registry.register(mv("a", vec!["source"])).unwrap();
        registry.register(mv("b", vec!["source"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        // Removing `a` takes its dependent `c` with it but leaves sibling `b`.
        let removed = registry.unregister_cascade("a").unwrap();
        assert_eq!(removed.len(), 2);
        assert_eq!(removed[0].name, "c");
        assert_eq!(removed[1].name, "a");

        assert_eq!(registry.topo_order(), &["b"]);
        assert_eq!(registry.get_dependents("b").count(), 0);
    }

    #[test]
    fn test_dependency_chain() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let chain = registry.dependency_chain("ohlc_1h");
        assert_eq!(chain, vec!["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_get_dependents() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("a", vec!["trades"])).unwrap();
        registry.register(mv("b", vec!["trades"])).unwrap();
        registry.register(mv("c", vec!["a"])).unwrap();

        let dependents: Vec<_> = registry.get_dependents("trades").collect();
        assert!(dependents.contains(&"a"));
        assert!(dependents.contains(&"b"));
        assert!(!dependents.contains(&"c"));

        let a_dependents: Vec<_> = registry.get_dependents("a").collect();
        assert_eq!(a_dependents, vec!["c"]);
    }

    #[test]
    fn test_view_state_update() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let view = registry.get_mut("ohlc_1s").unwrap();
        assert_eq!(view.state, MvState::Running);

        view.state = MvState::Paused;
        assert!(!view.state.can_process());

        view.state = MvState::Error;
        assert!(view.state.is_error());
    }
}