reifydb_sub_flow/subsystem/
mod.rs1mod factory;
5pub mod intercept;
6
7use std::{any::Any, path::PathBuf, time::Duration};
8
9pub use factory::FlowSubsystemFactory;
10use reifydb_cdc::{CdcConsumer, PollConsumer, PollConsumerConfig};
11use reifydb_core::{
12 Result,
13 interface::{
14 CdcConsumerId,
15 version::{ComponentType, HasVersion, SystemVersion},
16 },
17 ioc::IocContainer,
18};
19use reifydb_engine::StandardEngine;
20use reifydb_sub_api::{HealthStatus, Priority, SchedulerService, Subsystem};
21
22use crate::{builder::OperatorFactory, consumer::FlowConsumer};
23
24pub struct FlowSubsystemConfig {
25 pub consumer_id: CdcConsumerId,
27 pub poll_interval: Duration,
29 pub priority: Priority,
31 pub operators: Vec<(String, OperatorFactory)>,
33 pub max_batch_size: Option<u64>,
35 pub operators_dir: Option<PathBuf>,
37}
38
39pub struct FlowSubsystem {
40 consumer: PollConsumer<FlowConsumer>,
41 running: bool,
42}
43
44impl FlowSubsystem {
45 pub fn new(cfg: FlowSubsystemConfig, ioc: &IocContainer) -> Result<Self> {
46 let engine = ioc.resolve::<StandardEngine>()?;
47 let scheduler = ioc.resolve::<SchedulerService>().ok();
48
49 let consumer = FlowConsumer::new(engine.clone(), cfg.operators.clone(), cfg.operators_dir, scheduler);
50
51 Ok(Self {
52 consumer: PollConsumer::new(
53 PollConsumerConfig::new(cfg.consumer_id.clone(), cfg.poll_interval, cfg.max_batch_size),
54 engine.clone(),
55 consumer,
56 ),
57 running: false,
58 })
59 }
60}
61
62impl Drop for FlowSubsystem {
63 fn drop(&mut self) {
64 let _ = self.shutdown();
65 }
66}
67
68impl Subsystem for FlowSubsystem {
69 fn name(&self) -> &'static str {
70 "sub-flow"
71 }
72
73 fn start(&mut self) -> Result<()> {
74 if self.running {
75 return Ok(());
76 }
77
78 self.consumer.start()?;
79 self.running = true;
80
81 Ok(())
82 }
83
84 fn shutdown(&mut self) -> Result<()> {
85 if !self.running {
86 return Ok(());
87 }
88
89 self.consumer.stop()?;
90 self.running = false;
91 Ok(())
92 }
93
94 fn is_running(&self) -> bool {
95 self.running
96 }
97
98 fn health_status(&self) -> HealthStatus {
99 if self.is_running() {
100 HealthStatus::Healthy
101 } else {
102 HealthStatus::Unknown
103 }
104 }
105
106 fn as_any(&self) -> &dyn Any {
107 self
108 }
109
110 fn as_any_mut(&mut self) -> &mut dyn Any {
111 self
112 }
113}
114
115impl HasVersion for FlowSubsystem {
116 fn version(&self) -> SystemVersion {
117 SystemVersion {
118 name: "sub-flow".to_string(),
119 version: env!("CARGO_PKG_VERSION").to_string(),
120 description: "Data flow and stream processing subsystem".to_string(),
121 r#type: ComponentType::Subsystem,
122 }
123 }
124}