reifydb_sub_flow/subsystem/
mod.rs1mod factory;
5pub mod intercept;
6
7use std::{any::Any, path::PathBuf, time::Duration};
8
9pub use factory::FlowSubsystemFactory;
10use reifydb_cdc::{CdcConsumer, PollConsumer, PollConsumerConfig};
11use reifydb_core::{
12 Result,
13 interface::{
14 CdcConsumerId,
15 version::{ComponentType, HasVersion, SystemVersion},
16 },
17 ioc::IocContainer,
18};
19use reifydb_engine::StandardEngine;
20use reifydb_sub_api::{HealthStatus, Priority, SchedulerService, Subsystem};
21use tracing::instrument;
22
23use crate::{builder::OperatorFactory, consumer::FlowConsumer};
24
/// Configuration consumed by [`FlowSubsystem::new`].
pub struct FlowSubsystemConfig {
    /// Consumer identifier handed to `PollConsumerConfig::new`.
    pub consumer_id: CdcConsumerId,
    /// Interval handed to `PollConsumerConfig::new`.
    /// NOTE(review): presumably the delay between two polls — confirm.
    pub poll_interval: Duration,
    /// Priority of the subsystem.
    /// NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub priority: Priority,
    /// `(String, OperatorFactory)` pairs forwarded to `FlowConsumer::new`.
    /// NOTE(review): the string is presumably the operator name — confirm.
    pub operators: Vec<(String, OperatorFactory)>,
    /// Optional batch size handed to `PollConsumerConfig::new`.
    /// NOTE(review): the meaning of `None` is decided by the poll consumer — confirm.
    pub max_batch_size: Option<u64>,
    /// Optional directory forwarded to `FlowConsumer::new`.
    /// NOTE(review): presumably where operators are loaded from — confirm.
    pub operators_dir: Option<PathBuf>,
}
39
/// Subsystem that owns a `PollConsumer` wrapping a `FlowConsumer`.
pub struct FlowSubsystem {
    /// Poll consumer built in `new`; started by `start` and stopped by `shutdown`.
    consumer: PollConsumer<FlowConsumer>,
    /// `true` after a successful `start`, `false` initially and after a successful `shutdown`.
    running: bool,
}
44
45impl FlowSubsystem {
46 #[instrument(level = "debug", skip(cfg, ioc))]
47 pub fn new(cfg: FlowSubsystemConfig, ioc: &IocContainer) -> Result<Self> {
48 let engine = ioc.resolve::<StandardEngine>()?;
49 let scheduler = ioc.resolve::<SchedulerService>().ok();
50
51 let consumer = FlowConsumer::new(engine.clone(), cfg.operators.clone(), cfg.operators_dir, scheduler);
52
53 Ok(Self {
54 consumer: PollConsumer::new(
55 PollConsumerConfig::new(cfg.consumer_id.clone(), cfg.poll_interval, cfg.max_batch_size),
56 engine.clone(),
57 consumer,
58 ),
59 running: false,
60 })
61 }
62}
63
64impl Drop for FlowSubsystem {
65 fn drop(&mut self) {
66 let _ = self.shutdown();
67 }
68}
69
70impl Subsystem for FlowSubsystem {
71 fn name(&self) -> &'static str {
72 "sub-flow"
73 }
74
75 #[instrument(level = "info", skip(self))]
76 fn start(&mut self) -> Result<()> {
77 if self.running {
78 return Ok(());
79 }
80
81 self.consumer.start()?;
82 self.running = true;
83
84 Ok(())
85 }
86
87 #[instrument(level = "info", skip(self))]
88 fn shutdown(&mut self) -> Result<()> {
89 if !self.running {
90 return Ok(());
91 }
92
93 self.consumer.stop()?;
94 self.running = false;
95 Ok(())
96 }
97
98 #[instrument(level = "trace", skip(self))]
99 fn is_running(&self) -> bool {
100 self.running
101 }
102
103 #[instrument(level = "debug", skip(self))]
104 fn health_status(&self) -> HealthStatus {
105 if self.is_running() {
106 HealthStatus::Healthy
107 } else {
108 HealthStatus::Unknown
109 }
110 }
111
112 fn as_any(&self) -> &dyn Any {
113 self
114 }
115
116 fn as_any_mut(&mut self) -> &mut dyn Any {
117 self
118 }
119}
120
121impl HasVersion for FlowSubsystem {
122 fn version(&self) -> SystemVersion {
123 SystemVersion {
124 name: "sub-flow".to_string(),
125 version: env!("CARGO_PKG_VERSION").to_string(),
126 description: "Data flow and stream processing subsystem".to_string(),
127 r#type: ComponentType::Subsystem,
128 }
129 }
130}