1use super::error::MvError;
7use super::registry::MvRegistry;
8use super::watermark::CascadingWatermarkTracker;
9use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output, OutputVec};
10use fxhash::FxHashMap;
11use std::collections::VecDeque;
12use std::sync::Arc;
13
14pub struct MvPipelineExecutor {
37 registry: Arc<MvRegistry>,
39 operators: FxHashMap<String, Box<dyn Operator>>,
41 watermarks: CascadingWatermarkTracker,
43 output_queues: FxHashMap<String, VecDeque<Event>>,
45 metrics: PipelineMetrics,
47}
48
49#[derive(Debug, Clone, Default)]
51pub struct PipelineMetrics {
52 pub events_processed: u64,
54 pub events_per_mv: FxHashMap<String, u64>,
56 pub watermarks_advanced: u64,
58 pub errors: u64,
60}
61
62impl MvPipelineExecutor {
63 #[must_use]
65 pub fn new(registry: Arc<MvRegistry>) -> Self {
66 let watermarks = CascadingWatermarkTracker::new(Arc::clone(®istry));
67 Self {
68 registry,
69 operators: FxHashMap::default(),
70 watermarks,
71 output_queues: FxHashMap::default(),
72 metrics: PipelineMetrics::default(),
73 }
74 }
75
76 pub fn register_operator(
82 &mut self,
83 mv_name: &str,
84 operator: Box<dyn Operator>,
85 ) -> Result<(), MvError> {
86 if !self.registry.views().any(|v| v.name == mv_name) {
87 return Err(MvError::ViewNotFound(mv_name.to_string()));
88 }
89 self.operators.insert(mv_name.to_string(), operator);
90 Ok(())
91 }
92
93 #[must_use]
95 pub fn is_ready(&self) -> bool {
96 self.registry
97 .views()
98 .all(|v| self.operators.contains_key(&v.name))
99 }
100
101 pub fn missing_operators(&self) -> impl Iterator<Item = &str> {
103 self.registry
104 .views()
105 .filter(|v| !self.operators.contains_key(&v.name))
106 .map(|v| v.name.as_str())
107 }
108
109 pub fn process_source_event(
121 &mut self,
122 source: &str,
123 event: Event,
124 ctx: &mut OperatorContext,
125 ) -> Result<Vec<Output>, MvError> {
126 self.output_queues
128 .entry(source.to_string())
129 .or_default()
130 .push_back(event);
131
132 self.metrics.events_processed += 1;
133
134 let mut all_outputs = Vec::new();
136 for mv_name in self.registry.topo_order().to_vec() {
137 let outputs = self.process_mv_inputs(&mv_name, ctx)?;
138 all_outputs.extend(outputs);
139 }
140
141 Ok(all_outputs)
142 }
143
144 pub fn process_source_events(
152 &mut self,
153 source: &str,
154 events: impl IntoIterator<Item = Event>,
155 ctx: &mut OperatorContext,
156 ) -> Result<Vec<Output>, MvError> {
157 let queue = self.output_queues.entry(source.to_string()).or_default();
159 let mut count = 0u64;
160 for event in events {
161 queue.push_back(event);
162 count += 1;
163 }
164 self.metrics.events_processed += count;
165
166 let mut all_outputs = Vec::new();
168 for mv_name in self.registry.topo_order().to_vec() {
169 let outputs = self.process_mv_inputs(&mv_name, ctx)?;
170 all_outputs.extend(outputs);
171 }
172
173 Ok(all_outputs)
174 }
175
176 pub fn advance_watermark(&mut self, source: &str, watermark: i64) -> Vec<(String, i64)> {
180 self.metrics.watermarks_advanced += 1;
181 self.watermarks.update_watermark(source, watermark)
182 }
183
184 pub fn on_timer(
192 &mut self,
193 timer: &crate::operator::Timer,
194 ctx: &mut OperatorContext,
195 ) -> Result<Vec<Output>, MvError> {
196 let mut all_outputs = Vec::new();
197
198 for mv_name in self.registry.topo_order().to_vec() {
200 if let Some(operator) = self.operators.get_mut(&mv_name) {
201 let outputs = operator.on_timer(timer.clone(), ctx);
202 for output in outputs {
203 match output {
204 Output::Event(event) => {
205 self.output_queues
207 .entry(mv_name.clone())
208 .or_default()
209 .push_back(event);
210 }
211 other => all_outputs.push(other),
212 }
213 }
214 }
215 }
216
217 for mv_name in self.registry.topo_order().to_vec() {
219 let outputs = self.process_mv_inputs(&mv_name, ctx)?;
220 all_outputs.extend(outputs);
221 }
222
223 Ok(all_outputs)
224 }
225
226 #[must_use]
228 pub fn get_watermark(&self, name: &str) -> Option<i64> {
229 self.watermarks.get_watermark(name)
230 }
231
232 #[must_use]
234 pub fn pending_events(&self, name: &str) -> Option<&VecDeque<Event>> {
235 self.output_queues.get(name)
236 }
237
238 #[must_use]
240 pub fn metrics(&self) -> &PipelineMetrics {
241 &self.metrics
242 }
243
244 pub fn reset_metrics(&mut self) {
246 self.metrics = PipelineMetrics::default();
247 }
248
249 fn process_mv_inputs(
250 &mut self,
251 mv_name: &str,
252 ctx: &mut OperatorContext,
253 ) -> Result<Vec<Output>, MvError> {
254 let view = self
255 .registry
256 .get(mv_name)
257 .ok_or_else(|| MvError::ViewNotFound(mv_name.to_string()))?;
258
259 let mut inputs = Vec::new();
261 for source in &view.sources {
262 if let Some(queue) = self.output_queues.get_mut(source) {
263 inputs.extend(queue.drain(..));
264 }
265 }
266
267 if inputs.is_empty() {
268 return Ok(Vec::new());
269 }
270
271 let operator = self
273 .operators
274 .get_mut(mv_name)
275 .ok_or_else(|| MvError::OperatorNotFound(mv_name.to_string()))?;
276
277 let mut outputs = Vec::new();
279 for input in inputs {
280 let op_outputs = operator.process(&input, ctx);
281 *self
282 .metrics
283 .events_per_mv
284 .entry(mv_name.to_string())
285 .or_default() += 1;
286
287 for output in op_outputs {
288 match output {
289 Output::Event(event) => {
290 self.output_queues
292 .entry(mv_name.to_string())
293 .or_default()
294 .push_back(event);
295 }
296 other => outputs.push(other),
297 }
298 }
299 }
300
301 Ok(outputs)
302 }
303}
304
305#[derive(Debug, Clone)]
307pub struct MvPipelineCheckpoint {
308 pub operator_states: Vec<(String, OperatorState)>,
310 pub watermarks: Vec<(String, i64)>,
312 pub pending_events: Vec<(String, Vec<Event>)>,
314}
315
316impl MvPipelineExecutor {
317 #[must_use]
319 pub fn checkpoint(&self) -> MvPipelineCheckpoint {
320 let operator_states = self
321 .operators
322 .iter()
323 .map(|(name, op)| (name.clone(), op.checkpoint()))
324 .collect();
325
326 let watermarks = self.watermarks.checkpoint().watermarks;
327
328 let pending_events = self
329 .output_queues
330 .iter()
331 .map(|(name, queue)| (name.clone(), queue.iter().cloned().collect()))
332 .collect();
333
334 MvPipelineCheckpoint {
335 operator_states,
336 watermarks,
337 pending_events,
338 }
339 }
340
341 pub fn restore(&mut self, checkpoint: MvPipelineCheckpoint) -> Result<(), MvError> {
347 for (name, state) in checkpoint.operator_states {
349 if let Some(operator) = self.operators.get_mut(&name) {
350 operator.restore(state)?;
351 }
352 }
353
354 self.watermarks
356 .restore(super::watermark::WatermarkTrackerCheckpoint {
357 watermarks: checkpoint.watermarks,
358 });
359
360 self.output_queues.clear();
362 for (name, events) in checkpoint.pending_events {
363 self.output_queues
364 .insert(name, events.into_iter().collect());
365 }
366
367 Ok(())
368 }
369}
370
/// Operator identified by a string id.
#[derive(Debug, Clone)]
pub struct PassThroughOperator {
    /// Identifier of this operator instance.
    pub id: String,
}

impl PassThroughOperator {
    /// Builds an operator whose id is `id` converted into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let id = id.into();
        Self { id }
    }
}
385
386impl Operator for PassThroughOperator {
387 fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
388 let mut outputs = OutputVec::new();
389 outputs.push(Output::Event(event.clone()));
390 outputs
391 }
392
393 fn on_timer(
394 &mut self,
395 _timer: crate::operator::Timer,
396 _ctx: &mut OperatorContext,
397 ) -> OutputVec {
398 OutputVec::new()
399 }
400
401 fn checkpoint(&self) -> OperatorState {
402 OperatorState {
403 operator_id: self.id.clone(),
404 data: Vec::new(),
405 }
406 }
407
408 fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
409 Ok(())
410 }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mv::registry::MaterializedView;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    /// Builds a registry with the chain `trades -> ohlc_1s -> ohlc_1m`.
    fn setup_registry() -> Arc<MvRegistry> {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )]));
        let mv = |n: &str, s: Vec<&str>| {
            MaterializedView::new(
                n,
                "",
                s.into_iter().map(String::from).collect(),
                schema.clone(),
            )
        };

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();

        Arc::new(registry)
    }

    /// Registers a pass-through operator for both views of the test registry.
    fn register_passthroughs(executor: &mut MvPipelineExecutor) {
        for name in ["ohlc_1s", "ohlc_1m"] {
            executor
                .register_operator(name, Box::new(PassThroughOperator::new(name)))
                .unwrap();
        }
    }

    /// Builds a single-row event holding `value` at `timestamp`.
    fn create_test_event(value: i64, timestamp: i64) -> Event {
        let array = Arc::new(Int64Array::from(vec![value]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        Event::new(timestamp, batch)
    }

    /// Creates the owned pieces an `OperatorContext` borrows from.
    fn create_context() -> (InMemoryStore, TimerService, BoundedOutOfOrdernessGenerator) {
        (
            InMemoryStore::new(),
            TimerService::new(),
            BoundedOutOfOrdernessGenerator::new(1000),
        )
    }

    #[test]
    fn test_executor_creation() {
        let registry = setup_registry();
        let executor = MvPipelineExecutor::new(registry);

        assert!(!executor.is_ready());
        let missing: Vec<_> = executor.missing_operators().collect();
        assert!(missing.contains(&"ohlc_1s"));
        assert!(missing.contains(&"ohlc_1m"));
    }

    #[test]
    fn test_register_operators() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(registry);

        register_passthroughs(&mut executor);

        assert!(executor.is_ready());
    }

    #[test]
    fn test_register_nonexistent_mv() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(registry);

        let result =
            executor.register_operator("nonexistent", Box::new(PassThroughOperator::new("x")));
        assert!(matches!(result, Err(MvError::ViewNotFound(_))));
    }

    #[test]
    fn test_process_event_propagation() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(registry);
        register_passthroughs(&mut executor);

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        let event = create_test_event(100, 1000);
        let _outputs = executor
            .process_source_event("trades", event, &mut ctx)
            .unwrap();

        assert_eq!(executor.metrics().events_processed, 1);
        // The event flows through both views in a single pass.
        assert_eq!(executor.metrics().events_per_mv.get("ohlc_1s"), Some(&1));
        assert_eq!(executor.metrics().events_per_mv.get("ohlc_1m"), Some(&1));

        // Nothing consumes the last view, so its output stays queued.
        let pending = executor.pending_events("ohlc_1m");
        assert!(pending.is_some());
        assert_eq!(pending.unwrap().len(), 1);
    }

    #[test]
    fn test_batch_processing() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(registry);
        register_passthroughs(&mut executor);

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        let events = vec![
            create_test_event(100, 1000),
            create_test_event(200, 2000),
            create_test_event(300, 3000),
        ];

        executor
            .process_source_events("trades", events, &mut ctx)
            .unwrap();

        assert_eq!(executor.metrics().events_processed, 3);
        assert_eq!(executor.metrics().events_per_mv.get("ohlc_1s"), Some(&3));
        assert_eq!(executor.metrics().events_per_mv.get("ohlc_1m"), Some(&3));
    }

    #[test]
    fn test_watermark_propagation() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(registry);
        register_passthroughs(&mut executor);

        let updated = executor.advance_watermark("trades", 60_000);

        assert!(!updated.is_empty());
        assert_eq!(executor.get_watermark("trades"), Some(60_000));
        assert_eq!(executor.get_watermark("ohlc_1s"), Some(60_000));
        assert_eq!(executor.get_watermark("ohlc_1m"), Some(60_000));
    }

    #[test]
    fn test_checkpoint_restore() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(Arc::clone(&registry));
        register_passthroughs(&mut executor);

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        executor
            .process_source_event("trades", create_test_event(100, 1000), &mut ctx)
            .unwrap();
        executor.advance_watermark("trades", 5000);

        let checkpoint = executor.checkpoint();
        assert!(!checkpoint.operator_states.is_empty());
        assert!(!checkpoint.watermarks.is_empty());

        // Restore into a fresh executor with the same operators.
        let mut executor2 = MvPipelineExecutor::new(registry);
        register_passthroughs(&mut executor2);

        executor2.restore(checkpoint).unwrap();

        assert_eq!(executor2.get_watermark("trades"), Some(5000));
        // Queued events travel with the checkpoint as well.
        assert_eq!(
            executor2.pending_events("ohlc_1m").map(VecDeque::len),
            Some(1)
        );
    }

    #[test]
    fn test_metrics_reset() {
        let registry = setup_registry();
        let mut executor = MvPipelineExecutor::new(registry);
        register_passthroughs(&mut executor);

        let (mut state, mut timers, mut wm_gen) = create_context();
        let mut ctx = OperatorContext {
            event_time: 1000,
            processing_time: 1000,
            timers: &mut timers,
            state: &mut state,
            watermark_generator: &mut wm_gen,
            operator_index: 0,
        };

        executor
            .process_source_event("trades", create_test_event(100, 1000), &mut ctx)
            .unwrap();
        assert_eq!(executor.metrics().events_processed, 1);

        executor.reset_metrics();
        assert_eq!(executor.metrics().events_processed, 0);
    }
}