1use super::error::{MvError, MvState};
7use arrow_schema::SchemaRef;
8use fxhash::{FxHashMap, FxHashSet};
9use std::collections::VecDeque;
10
/// A materialized view definition together with its upstream sources, output
/// schema and current lifecycle state.
#[derive(Debug, Clone)]
pub struct MaterializedView {
    /// View name; `MvRegistry::register` rejects a second view with the same name.
    pub name: String,
    /// SQL text defining the view (stored as given; not parsed here).
    pub sql: String,
    /// Names of the direct upstream sources. `MvRegistry::register` requires each
    /// one to be a registered view or base table.
    pub sources: Vec<String>,
    /// Arrow schema for the view (assumed to describe its output columns).
    pub schema: SchemaRef,
    /// Operator identifier; `MaterializedView::new` derives it as `mv_<name>`.
    pub operator_id: String,
    /// Current lifecycle state; `MaterializedView::new` initializes it to `MvState::Running`.
    pub state: MvState,
}
30
31impl MaterializedView {
32 #[must_use]
34 pub fn new(
35 name: impl Into<String>,
36 sql: impl Into<String>,
37 sources: Vec<String>,
38 schema: SchemaRef,
39 ) -> Self {
40 let name = name.into();
41 let operator_id = format!("mv_{name}");
42 Self {
43 name,
44 sql: sql.into(),
45 sources,
46 schema,
47 operator_id,
48 state: MvState::Running,
49 }
50 }
51
52 #[cfg(test)]
54 pub fn simple(name: impl Into<String>, sources: Vec<String>) -> Self {
55 use arrow_schema::{DataType, Field, Schema};
56 use std::sync::Arc;
57
58 let schema = Arc::new(Schema::new(vec![Field::new(
59 "value",
60 DataType::Int64,
61 false,
62 )]));
63 Self::new(name, "", sources, schema)
64 }
65
66 #[must_use]
68 pub fn depends_on(&self, source: &str) -> bool {
69 self.sources.iter().any(|s| s == source)
70 }
71}
72
/// Registry of materialized views and the dependency graph between them.
///
/// Tracks base tables, views, forward and reverse dependency edges, and a
/// cached topological order in which every view appears after all of its
/// view sources.
#[derive(Debug, Default)]
pub struct MvRegistry {
    /// Registered views, keyed by view name.
    views: FxHashMap<String, MaterializedView>,
    /// Names of base tables that views may use as sources.
    base_tables: FxHashSet<String>,
    /// Reverse edges: source name (view or base table) -> names of views reading from it.
    dependents: FxHashMap<String, FxHashSet<String>>,
    /// Forward edges: view name -> names of its direct sources.
    dependencies: FxHashMap<String, FxHashSet<String>>,
    /// Views in dependency order; recomputed whenever views are added or removed.
    topo_order: Vec<String>,
}
116
117impl MvRegistry {
118 #[must_use]
120 pub fn new() -> Self {
121 Self::default()
122 }
123
124 pub fn register_base_table(&mut self, name: impl Into<String>) {
129 self.base_tables.insert(name.into());
130 }
131
132 #[must_use]
134 pub fn is_base_table(&self, name: &str) -> bool {
135 self.base_tables.contains(name)
136 }
137
138 pub fn register(&mut self, view: MaterializedView) -> Result<(), MvError> {
147 if self.views.contains_key(&view.name) {
149 return Err(MvError::DuplicateName(view.name.clone()));
150 }
151
152 for source in &view.sources {
154 if !self.views.contains_key(source) && !self.is_base_table(source) {
155 return Err(MvError::SourceNotFound(source.clone()));
156 }
157 }
158
159 if self.would_create_cycle(&view.name, &view.sources) {
161 return Err(MvError::CycleDetected(view.name.clone()));
162 }
163
164 for source in &view.sources {
166 self.dependents
167 .entry(source.clone())
168 .or_default()
169 .insert(view.name.clone());
170 self.dependencies
171 .entry(view.name.clone())
172 .or_default()
173 .insert(source.clone());
174 }
175
176 self.views.insert(view.name.clone(), view);
177 self.update_topo_order();
178
179 Ok(())
180 }
181
182 pub fn unregister(&mut self, name: &str) -> Result<MaterializedView, MvError> {
190 if !self.views.contains_key(name) {
192 return Err(MvError::ViewNotFound(name.to_string()));
193 }
194
195 if let Some(deps) = self.dependents.get(name) {
197 if !deps.is_empty() {
198 let dep_names: Vec<_> = deps.iter().cloned().collect();
199 return Err(MvError::HasDependents(name.to_string(), dep_names));
200 }
201 }
202
203 self.remove_view(name)
204 }
205
206 pub fn unregister_cascade(&mut self, name: &str) -> Result<Vec<MaterializedView>, MvError> {
214 if !self.views.contains_key(name) {
215 return Err(MvError::ViewNotFound(name.to_string()));
216 }
217
218 let mut to_remove = Vec::new();
220 self.collect_dependents_recursive(name, &mut to_remove);
221 to_remove.push(name.to_string());
222
223 let mut removed = Vec::with_capacity(to_remove.len());
225 for view_name in to_remove {
226 if let Ok(view) = self.remove_view(&view_name) {
227 removed.push(view);
228 }
229 }
230
231 Ok(removed)
232 }
233
234 fn collect_dependents_recursive(&self, name: &str, result: &mut Vec<String>) {
235 if let Some(deps) = self.dependents.get(name) {
236 for dep in deps {
237 if !result.contains(dep) {
238 self.collect_dependents_recursive(dep, result);
239 result.push(dep.clone());
240 }
241 }
242 }
243 }
244
245 fn remove_view(&mut self, name: &str) -> Result<MaterializedView, MvError> {
246 let view = self
247 .views
248 .remove(name)
249 .ok_or_else(|| MvError::ViewNotFound(name.to_string()))?;
250
251 if let Some(sources) = self.dependencies.remove(name) {
253 for source in sources {
254 if let Some(deps) = self.dependents.get_mut(&source) {
255 deps.remove(name);
256 }
257 }
258 }
259 self.dependents.remove(name);
260
261 self.update_topo_order();
263
264 Ok(view)
265 }
266
267 #[must_use]
269 pub fn get(&self, name: &str) -> Option<&MaterializedView> {
270 self.views.get(name)
271 }
272
273 #[must_use]
275 pub fn get_mut(&mut self, name: &str) -> Option<&mut MaterializedView> {
276 self.views.get_mut(name)
277 }
278
279 #[must_use]
281 pub fn topo_order(&self) -> &[String] {
282 &self.topo_order
283 }
284
285 pub fn get_dependents(&self, source: &str) -> impl Iterator<Item = &str> {
287 self.dependents
288 .get(source)
289 .into_iter()
290 .flatten()
291 .map(String::as_str)
292 }
293
294 pub fn get_dependencies(&self, view: &str) -> impl Iterator<Item = &str> {
296 self.dependencies
297 .get(view)
298 .into_iter()
299 .flatten()
300 .map(String::as_str)
301 }
302
303 #[must_use]
305 pub fn len(&self) -> usize {
306 self.views.len()
307 }
308
309 #[must_use]
311 pub fn is_empty(&self) -> bool {
312 self.views.is_empty()
313 }
314
315 pub fn views(&self) -> impl Iterator<Item = &MaterializedView> {
317 self.views.values()
318 }
319
320 #[must_use]
322 pub fn base_tables(&self) -> &FxHashSet<String> {
323 &self.base_tables
324 }
325
326 #[must_use]
330 pub fn dependency_chain(&self, name: &str) -> Vec<String> {
331 let mut chain = Vec::new();
332 let mut visited = FxHashSet::default();
333 self.collect_dependencies_recursive(name, &mut chain, &mut visited);
334 chain
335 }
336
337 fn collect_dependencies_recursive(
338 &self,
339 name: &str,
340 result: &mut Vec<String>,
341 visited: &mut FxHashSet<String>,
342 ) {
343 if !visited.insert(name.to_string()) {
344 return;
345 }
346
347 if let Some(deps) = self.dependencies.get(name) {
348 for dep in deps {
349 self.collect_dependencies_recursive(dep, result, visited);
350 }
351 }
352
353 if self.views.contains_key(name) {
355 result.push(name.to_string());
356 }
357 }
358
359 fn would_create_cycle(&self, new_name: &str, sources: &[String]) -> bool {
360 let mut visited = FxHashSet::default();
362 let mut stack: Vec<_> = sources.to_vec();
363
364 while let Some(current) = stack.pop() {
365 if current == new_name {
366 return true;
367 }
368 if visited.insert(current.clone()) {
369 if let Some(deps) = self.dependencies.get(¤t) {
370 stack.extend(deps.iter().cloned());
371 }
372 }
373 }
374
375 false
376 }
377
378 fn update_topo_order(&mut self) {
379 let mut in_degree: FxHashMap<String, usize> = FxHashMap::default();
381 let mut queue: VecDeque<String> = VecDeque::new();
382
383 for name in self.views.keys() {
385 let deps = self.dependencies.get(name).map_or(0, |d| {
386 d.iter().filter(|dep| self.views.contains_key(*dep)).count()
387 });
388 in_degree.insert(name.clone(), deps);
389 if deps == 0 {
390 queue.push_back(name.clone());
391 }
392 }
393
394 self.topo_order.clear();
396 while let Some(name) = queue.pop_front() {
397 self.topo_order.push(name.clone());
398
399 if let Some(dependents) = self.dependents.get(&name) {
400 for dep in dependents {
401 if let Some(count) = in_degree.get_mut(dep) {
402 *count = count.saturating_sub(1);
403 if *count == 0 {
404 queue.push_back(dep.clone());
405 }
406 }
407 }
408 }
409 }
410 }
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a test view named `name` that reads from `sources`.
    fn mv(name: &str, sources: Vec<&str>) -> MaterializedView {
        MaterializedView::simple(name, sources.into_iter().map(String::from).collect())
    }

    #[test]
    fn test_simple_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        let view = mv("ohlc_1s", vec!["trades"]);
        registry.register(view).unwrap();

        assert_eq!(registry.len(), 1);
        assert!(registry.get("ohlc_1s").is_some());
    }

    #[test]
    fn test_cascading_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_duplicate_name_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let result = registry.register(mv("ohlc_1s", vec!["trades"]));
        assert!(matches!(result, Err(MvError::DuplicateName(_))));
    }

    #[test]
    fn test_source_not_found_error() {
        let mut registry = MvRegistry::new();

        let result = registry.register(mv("view", vec!["nonexistent"]));
        assert!(matches!(result, Err(MvError::SourceNotFound(_))));
    }

    #[test]
    fn test_cycle_detection_direct() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("a");

        registry.register(mv("b", vec!["a"])).unwrap();
        registry.register(mv("c", vec!["b"])).unwrap();
        registry.register(mv("d", vec!["c"])).unwrap();

        // A view named after base table `a` that reads from `c` would close
        // the loop a -> b -> c -> a.
        let result = registry.register(mv("a", vec!["c"]));
        assert!(matches!(result, Err(MvError::CycleDetected(_))));

        // Direct self-reference.
        let result = registry.register(mv("a", vec!["a"]));
        assert!(matches!(result, Err(MvError::CycleDetected(_))));

        // Rejected registrations leave the registry untouched.
        assert_eq!(registry.len(), 3);
        assert_eq!(registry.topo_order(), &["b", "c", "d"]);
    }

    #[test]
    fn test_multi_source_view() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("orders");
        registry.register_base_table("payments");

        registry
            .register(mv("order_payments", vec!["orders", "payments"]))
            .unwrap();

        assert_eq!(registry.topo_order(), &["order_payments"]);

        let deps: Vec<_> = registry.get_dependencies("order_payments").collect();
        assert!(deps.contains(&"orders"));
        assert!(deps.contains(&"payments"));
    }

    #[test]
    fn test_diamond_dependency() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("source");

        registry.register(mv("a", vec!["source"])).unwrap();
        registry.register(mv("b", vec!["source"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        let order = registry.topo_order();
        let c_idx = order.iter().position(|x| x == "c").unwrap();
        let a_idx = order.iter().position(|x| x == "a").unwrap();
        let b_idx = order.iter().position(|x| x == "b").unwrap();

        assert!(c_idx > a_idx);
        assert!(c_idx > b_idx);
    }

    #[test]
    fn test_unregister_simple() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let removed = registry.unregister("ohlc_1s").unwrap();
        assert_eq!(removed.name, "ohlc_1s");
        assert!(registry.is_empty());

        // A second removal reports the view as missing.
        let result = registry.unregister("ohlc_1s");
        assert!(matches!(result, Err(MvError::ViewNotFound(_))));
    }

    #[test]
    fn test_unregister_with_dependents_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();

        let result = registry.unregister("ohlc_1s");
        assert!(matches!(result, Err(MvError::HasDependents(_, _))));
    }

    #[test]
    fn test_unregister_cascade() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let removed = registry.unregister_cascade("ohlc_1s").unwrap();

        assert_eq!(removed.len(), 3);
        assert!(registry.is_empty());

        // Dependents are removed before the views they read from.
        assert_eq!(removed[0].name, "ohlc_1h");
        assert_eq!(removed[1].name, "ohlc_1m");
        assert_eq!(removed[2].name, "ohlc_1s");
    }

    #[test]
    fn test_unregister_cascade_diamond() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("src");
        registry.register(mv("x", vec!["src"])).unwrap();
        registry.register(mv("a", vec!["x"])).unwrap();
        registry.register(mv("b", vec!["x"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        let removed = registry.unregister_cascade("x").unwrap();
        let names: Vec<&str> = removed.iter().map(|v| v.name.as_str()).collect();

        // `c` is reachable through both `a` and `b` but must be removed once.
        assert_eq!(names.len(), 4);
        assert!(registry.is_empty());
        assert!(registry.topo_order().is_empty());

        let pos = |n: &str| names.iter().position(|x| *x == n).unwrap();
        assert!(pos("c") < pos("a"));
        assert!(pos("c") < pos("b"));
        assert_eq!(names.last(), Some(&"x"));
        assert_eq!(registry.get_dependents("src").count(), 0);
    }

    #[test]
    fn test_unregister_cascade_not_found() {
        let mut registry = MvRegistry::new();
        let result = registry.unregister_cascade("missing");
        assert!(matches!(result, Err(MvError::ViewNotFound(_))));
    }

    #[test]
    fn test_dependency_chain() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let chain = registry.dependency_chain("ohlc_1h");
        assert_eq!(chain, vec!["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_get_dependents() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("a", vec!["trades"])).unwrap();
        registry.register(mv("b", vec!["trades"])).unwrap();
        registry.register(mv("c", vec!["a"])).unwrap();

        let dependents: Vec<_> = registry.get_dependents("trades").collect();
        assert!(dependents.contains(&"a"));
        assert!(dependents.contains(&"b"));
        assert!(!dependents.contains(&"c"));

        let a_dependents: Vec<_> = registry.get_dependents("a").collect();
        assert_eq!(a_dependents, vec!["c"]);
    }

    #[test]
    fn test_view_state_update() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let view = registry.get_mut("ohlc_1s").unwrap();
        assert_eq!(view.state, MvState::Running);

        view.state = MvState::Paused;
        assert!(!view.state.can_process());

        view.state = MvState::Error;
        assert!(view.state.is_error());
    }
}