Skip to main content

laminar_core/mv/
executor.rs

//! Executor for cascading materialized view pipelines.
//!
//! Processes events through the MV DAG in topological order, ensuring
//! that dependencies are updated before their dependents.
5
6use super::error::MvError;
7use super::registry::MvRegistry;
8use super::watermark::CascadingWatermarkTracker;
9use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output, OutputVec};
10use fxhash::FxHashMap;
11use std::collections::VecDeque;
12use std::sync::Arc;
13
14/// Executor for cascading materialized view pipelines.
15///
16/// Processes events through the MV DAG in topological order, ensuring
17/// correct watermark propagation and event routing between views.
18///
19/// # Example
20///
21/// ```rust,ignore
22/// use laminar_core::mv::{MvRegistry, MvPipelineExecutor, MaterializedView};
23/// use std::sync::Arc;
24///
25/// // Setup registry with cascading views
26/// let registry = Arc::new(setup_registry());
27///
28/// // Create executor with operators for each view
29/// let mut executor = MvPipelineExecutor::new(registry);
30/// executor.register_operator("ohlc_1s", Box::new(ohlc_1s_operator))?;
31/// executor.register_operator("ohlc_1m", Box::new(ohlc_1m_operator))?;
32///
33/// // Process events from base source
34/// let outputs = executor.process_source_event("trades", event, &mut ctx)?;
35/// ```
36pub struct MvPipelineExecutor {
37    /// MV registry.
38    registry: Arc<MvRegistry>,
39    /// Operators per MV.
40    operators: FxHashMap<String, Box<dyn Operator>>,
41    /// Watermark tracker.
42    watermarks: CascadingWatermarkTracker,
43    /// Output queues per source/MV (for downstream consumption).
44    output_queues: FxHashMap<String, VecDeque<Event>>,
45    /// Metrics for the pipeline.
46    metrics: PipelineMetrics,
47}
48
49/// Metrics for the MV pipeline.
50#[derive(Debug, Clone, Default)]
51pub struct PipelineMetrics {
52    /// Total events processed.
53    pub events_processed: u64,
54    /// Events processed per MV.
55    pub events_per_mv: FxHashMap<String, u64>,
56    /// Watermarks advanced.
57    pub watermarks_advanced: u64,
58    /// Processing errors.
59    pub errors: u64,
60}
61
62impl MvPipelineExecutor {
63    /// Creates a new executor with the given registry.
64    #[must_use]
65    pub fn new(registry: Arc<MvRegistry>) -> Self {
66        let watermarks = CascadingWatermarkTracker::new(Arc::clone(&registry));
67        Self {
68            registry,
69            operators: FxHashMap::default(),
70            watermarks,
71            output_queues: FxHashMap::default(),
72            metrics: PipelineMetrics::default(),
73        }
74    }
75
76    /// Registers an operator for a materialized view.
77    ///
78    /// # Errors
79    ///
80    /// Returns error if the view doesn't exist in the registry.
81    pub fn register_operator(
82        &mut self,
83        mv_name: &str,
84        operator: Box<dyn Operator>,
85    ) -> Result<(), MvError> {
86        if !self.registry.views().any(|v| v.name == mv_name) {
87            return Err(MvError::ViewNotFound(mv_name.to_string()));
88        }
89        self.operators.insert(mv_name.to_string(), operator);
90        Ok(())
91    }
92
93    /// Checks if all MVs have registered operators.
94    #[must_use]
95    pub fn is_ready(&self) -> bool {
96        self.registry
97            .views()
98            .all(|v| self.operators.contains_key(&v.name))
99    }
100
101    /// Returns MVs that are missing operators.
102    pub fn missing_operators(&self) -> impl Iterator<Item = &str> {
103        self.registry
104            .views()
105            .filter(|v| !self.operators.contains_key(&v.name))
106            .map(|v| v.name.as_str())
107    }
108
109    /// Processes an event from a base source.
110    ///
111    /// The event is queued for processing by dependent MVs.
112    /// MVs are processed in topological order to ensure dependencies
113    /// are updated before their dependents.
114    ///
115    /// Returns all outputs produced by the pipeline.
116    ///
117    /// # Errors
118    ///
119    /// Returns error if processing fails for any MV.
120    pub fn process_source_event(
121        &mut self,
122        source: &str,
123        event: Event,
124        ctx: &mut OperatorContext,
125    ) -> Result<Vec<Output>, MvError> {
126        // Queue the event for processing by dependents
127        self.output_queues
128            .entry(source.to_string())
129            .or_default()
130            .push_back(event);
131
132        self.metrics.events_processed += 1;
133
134        // Process MVs in topological order
135        let mut all_outputs = Vec::new();
136        for mv_name in self.registry.topo_order().to_vec() {
137            let outputs = self.process_mv_inputs(&mv_name, ctx)?;
138            all_outputs.extend(outputs);
139        }
140
141        Ok(all_outputs)
142    }
143
144    /// Processes a batch of events from a base source.
145    ///
146    /// More efficient than calling `process_source_event` repeatedly.
147    ///
148    /// # Errors
149    ///
150    /// Returns `MvError` if processing fails for any MV.
151    pub fn process_source_events(
152        &mut self,
153        source: &str,
154        events: impl IntoIterator<Item = Event>,
155        ctx: &mut OperatorContext,
156    ) -> Result<Vec<Output>, MvError> {
157        // Queue all events
158        let queue = self.output_queues.entry(source.to_string()).or_default();
159        let mut count = 0u64;
160        for event in events {
161            queue.push_back(event);
162            count += 1;
163        }
164        self.metrics.events_processed += count;
165
166        // Process MVs in topological order
167        let mut all_outputs = Vec::new();
168        for mv_name in self.registry.topo_order().to_vec() {
169            let outputs = self.process_mv_inputs(&mv_name, ctx)?;
170            all_outputs.extend(outputs);
171        }
172
173        Ok(all_outputs)
174    }
175
176    /// Advances the watermark for a source and propagates through the pipeline.
177    ///
178    /// Returns all watermark updates that occurred.
179    pub fn advance_watermark(&mut self, source: &str, watermark: i64) -> Vec<(String, i64)> {
180        self.metrics.watermarks_advanced += 1;
181        self.watermarks.update_watermark(source, watermark)
182    }
183
184    /// Triggers timer callbacks for all MVs.
185    ///
186    /// Returns all outputs produced by timer handlers.
187    ///
188    /// # Errors
189    ///
190    /// Returns `MvError` if any operator fails during processing.
191    pub fn on_timer(
192        &mut self,
193        timer: &crate::operator::Timer,
194        ctx: &mut OperatorContext,
195    ) -> Result<Vec<Output>, MvError> {
196        let mut all_outputs = Vec::new();
197
198        // Process timers for each MV in topological order
199        for mv_name in self.registry.topo_order().to_vec() {
200            if let Some(operator) = self.operators.get_mut(&mv_name) {
201                let outputs = operator.on_timer(timer.clone(), ctx);
202                for output in outputs {
203                    match output {
204                        Output::Event(event) => {
205                            // Queue for downstream processing
206                            self.output_queues
207                                .entry(mv_name.clone())
208                                .or_default()
209                                .push_back(event);
210                        }
211                        other => all_outputs.push(other),
212                    }
213                }
214            }
215        }
216
217        // Process any events generated by timers
218        for mv_name in self.registry.topo_order().to_vec() {
219            let outputs = self.process_mv_inputs(&mv_name, ctx)?;
220            all_outputs.extend(outputs);
221        }
222
223        Ok(all_outputs)
224    }
225
226    /// Gets the current watermark for a source or MV.
227    #[must_use]
228    pub fn get_watermark(&self, name: &str) -> Option<i64> {
229        self.watermarks.get_watermark(name)
230    }
231
232    /// Gets all pending events for a source/MV.
233    #[must_use]
234    pub fn pending_events(&self, name: &str) -> Option<&VecDeque<Event>> {
235        self.output_queues.get(name)
236    }
237
238    /// Gets pipeline metrics.
239    #[must_use]
240    pub fn metrics(&self) -> &PipelineMetrics {
241        &self.metrics
242    }
243
244    /// Resets pipeline metrics.
245    pub fn reset_metrics(&mut self) {
246        self.metrics = PipelineMetrics::default();
247    }
248
249    fn process_mv_inputs(
250        &mut self,
251        mv_name: &str,
252        ctx: &mut OperatorContext,
253    ) -> Result<Vec<Output>, MvError> {
254        let view = self
255            .registry
256            .get(mv_name)
257            .ok_or_else(|| MvError::ViewNotFound(mv_name.to_string()))?;
258
259        // Collect inputs from all sources
260        let mut inputs = Vec::new();
261        for source in &view.sources {
262            if let Some(queue) = self.output_queues.get_mut(source) {
263                inputs.extend(queue.drain(..));
264            }
265        }
266
267        if inputs.is_empty() {
268            return Ok(Vec::new());
269        }
270
271        // Get the operator
272        let operator = self
273            .operators
274            .get_mut(mv_name)
275            .ok_or_else(|| MvError::OperatorNotFound(mv_name.to_string()))?;
276
277        // Process all inputs
278        let mut outputs = Vec::new();
279        for input in inputs {
280            let op_outputs = operator.process(&input, ctx);
281            *self
282                .metrics
283                .events_per_mv
284                .entry(mv_name.to_string())
285                .or_default() += 1;
286
287            for output in op_outputs {
288                match output {
289                    Output::Event(event) => {
290                        // Queue for downstream MVs
291                        self.output_queues
292                            .entry(mv_name.to_string())
293                            .or_default()
294                            .push_back(event);
295                    }
296                    other => outputs.push(other),
297                }
298            }
299        }
300
301        Ok(outputs)
302    }
303}
304
305/// Checkpoint data for the MV pipeline.
306#[derive(Debug, Clone)]
307pub struct MvPipelineCheckpoint {
308    /// Operator states per MV.
309    pub operator_states: Vec<(String, OperatorState)>,
310    /// Watermarks at checkpoint time.
311    pub watermarks: Vec<(String, i64)>,
312    /// Pending events per source/MV.
313    pub pending_events: Vec<(String, Vec<Event>)>,
314}
315
316impl MvPipelineExecutor {
317    /// Creates a checkpoint of the pipeline state.
318    #[must_use]
319    pub fn checkpoint(&self) -> MvPipelineCheckpoint {
320        let operator_states = self
321            .operators
322            .iter()
323            .map(|(name, op)| (name.clone(), op.checkpoint()))
324            .collect();
325
326        let watermarks = self.watermarks.checkpoint().watermarks;
327
328        let pending_events = self
329            .output_queues
330            .iter()
331            .map(|(name, queue)| (name.clone(), queue.iter().cloned().collect()))
332            .collect();
333
334        MvPipelineCheckpoint {
335            operator_states,
336            watermarks,
337            pending_events,
338        }
339    }
340
341    /// Restores pipeline state from a checkpoint.
342    ///
343    /// # Errors
344    ///
345    /// Returns error if any operator fails to restore.
346    pub fn restore(&mut self, checkpoint: MvPipelineCheckpoint) -> Result<(), MvError> {
347        // Restore operator states
348        for (name, state) in checkpoint.operator_states {
349            if let Some(operator) = self.operators.get_mut(&name) {
350                operator.restore(state)?;
351            }
352        }
353
354        // Restore watermarks
355        self.watermarks
356            .restore(super::watermark::WatermarkTrackerCheckpoint {
357                watermarks: checkpoint.watermarks,
358            });
359
360        // Restore pending events
361        self.output_queues.clear();
362        for (name, events) in checkpoint.pending_events {
363            self.output_queues
364                .insert(name, events.into_iter().collect());
365        }
366
367        Ok(())
368    }
369}
370
/// Test helper operator that forwards every input event unchanged.
#[derive(Debug, Clone)]
pub struct PassThroughOperator {
    /// Identifier reported in this operator's checkpoints.
    pub id: String,
}

impl PassThroughOperator {
    /// Builds a pass-through operator identified by `id`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let id: String = id.into();
        Self { id }
    }
}
385
386impl Operator for PassThroughOperator {
387    fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
388        let mut outputs = OutputVec::new();
389        outputs.push(Output::Event(event.clone()));
390        outputs
391    }
392
393    fn on_timer(
394        &mut self,
395        _timer: crate::operator::Timer,
396        _ctx: &mut OperatorContext,
397    ) -> OutputVec {
398        OutputVec::new()
399    }
400
401    fn checkpoint(&self) -> OperatorState {
402        OperatorState {
403            operator_id: self.id.clone(),
404            data: Vec::new(),
405        }
406    }
407
408    fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
409        Ok(())
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mv::registry::MaterializedView;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    /// Views registered by `setup_registry`, upstream first.
    const VIEW_NAMES: [&str; 2] = ["ohlc_1s", "ohlc_1m"];

    /// Builds the cascade `trades -> ohlc_1s -> ohlc_1m`.
    fn setup_registry() -> Arc<MvRegistry> {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )]));
        let view = |name: &str, sources: &[&str]| {
            MaterializedView::new(
                name,
                "",
                sources.iter().copied().map(String::from).collect(),
                Arc::clone(&schema),
            )
        };

        registry.register(view("ohlc_1s", &["trades"])).unwrap();
        registry.register(view("ohlc_1m", &["ohlc_1s"])).unwrap();

        Arc::new(registry)
    }

    /// Single-row event with `value` in an Int64 `value` column.
    fn create_test_event(value: i64, timestamp: i64) -> Event {
        let values = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", values as _)]).unwrap();
        Event::new(timestamp, batch)
    }

    fn create_context() -> (InMemoryStore, TimerService, BoundedOutOfOrdernessGenerator) {
        (
            InMemoryStore::new(),
            TimerService::new(),
            BoundedOutOfOrdernessGenerator::new(1000),
        )
    }

    /// Executor over `registry` with a `PassThroughOperator` for every view.
    fn passthrough_executor(registry: Arc<MvRegistry>) -> MvPipelineExecutor {
        let mut executor = MvPipelineExecutor::new(registry);
        for name in VIEW_NAMES {
            executor
                .register_operator(name, Box::new(PassThroughOperator::new(name)))
                .unwrap();
        }
        executor
    }

    #[test]
    fn test_executor_creation() {
        let executor = MvPipelineExecutor::new(setup_registry());

        assert!(!executor.is_ready());
        let missing: Vec<_> = executor.missing_operators().collect();
        for name in VIEW_NAMES {
            assert!(missing.contains(&name));
        }
    }

    #[test]
    fn test_register_operators() {
        let executor = passthrough_executor(setup_registry());
        assert!(executor.is_ready());
    }

    #[test]
    fn test_register_nonexistent_mv() {
        let mut executor = MvPipelineExecutor::new(setup_registry());

        let result =
            executor.register_operator("nonexistent", Box::new(PassThroughOperator::new("x")));
        assert!(matches!(result, Err(MvError::ViewNotFound(_))));
    }

    #[test]
    fn test_process_event_propagation() {
        let mut executor = passthrough_executor(setup_registry());

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        executor
            .process_source_event("trades", create_test_event(100, 1000), &mut ctx)
            .unwrap();

        let metrics = executor.metrics();
        assert_eq!(metrics.events_processed, 1);
        assert_eq!(metrics.events_per_mv.get("ohlc_1s"), Some(&1));
        assert_eq!(metrics.events_per_mv.get("ohlc_1m"), Some(&1));

        // The leaf view's output stays queued for downstream consumers.
        let pending = executor
            .pending_events("ohlc_1m")
            .expect("ohlc_1m should have pending events");
        assert_eq!(pending.len(), 1);
    }

    #[test]
    fn test_batch_processing() {
        let mut executor = passthrough_executor(setup_registry());

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        let batch: Vec<Event> = [(100, 1000), (200, 2000), (300, 3000)]
            .into_iter()
            .map(|(value, ts)| create_test_event(value, ts))
            .collect();
        executor
            .process_source_events("trades", batch, &mut ctx)
            .unwrap();

        let metrics = executor.metrics();
        assert_eq!(metrics.events_processed, 3);
        assert_eq!(metrics.events_per_mv.get("ohlc_1s"), Some(&3));
        assert_eq!(metrics.events_per_mv.get("ohlc_1m"), Some(&3));
    }

    #[test]
    fn test_watermark_propagation() {
        let mut executor = passthrough_executor(setup_registry());

        let updated = executor.advance_watermark("trades", 60_000);

        assert!(!updated.is_empty());
        for name in ["trades", "ohlc_1s", "ohlc_1m"] {
            assert_eq!(executor.get_watermark(name), Some(60_000));
        }
    }

    #[test]
    fn test_checkpoint_restore() {
        let registry = setup_registry();
        let mut executor = passthrough_executor(Arc::clone(&registry));

        // Process some events and advance watermark
        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        executor
            .process_source_event("trades", create_test_event(100, 1000), &mut ctx)
            .unwrap();
        executor.advance_watermark("trades", 5000);

        let checkpoint = executor.checkpoint();
        assert!(!checkpoint.operator_states.is_empty());
        assert!(!checkpoint.watermarks.is_empty());

        // A fresh executor picks up the checkpointed watermarks
        let mut restored = passthrough_executor(registry);
        restored.restore(checkpoint).unwrap();
        assert_eq!(restored.get_watermark("trades"), Some(5000));
    }

    #[test]
    fn test_metrics_reset() {
        let mut executor = passthrough_executor(setup_registry());

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        executor
            .process_source_event("trades", create_test_event(100, 1000), &mut ctx)
            .unwrap();
        assert_eq!(executor.metrics().events_processed, 1);

        executor.reset_metrics();
        assert_eq!(executor.metrics().events_processed, 0);
    }
}