reifydb_sub_flow/subsystem/
mod.rs1mod factory;
5pub mod intercept;
6
7use std::{any::Any, path::PathBuf, time::Duration};
8
9use async_trait::async_trait;
10pub use factory::FlowSubsystemFactory;
11use reifydb_cdc::{CdcConsumer, PollConsumer, PollConsumerConfig};
12use reifydb_core::{
13 Result,
14 interface::{
15 CdcConsumerId,
16 version::{ComponentType, HasVersion, SystemVersion},
17 },
18 ioc::IocContainer,
19};
20use reifydb_engine::StandardEngine;
21use reifydb_sub_api::{HealthStatus, Subsystem};
22use tracing::instrument;
23
24use crate::{builder::OperatorFactory, consumer::FlowConsumer};
25
26pub struct FlowSubsystemConfig {
27 pub consumer_id: CdcConsumerId,
29 pub poll_interval: Duration,
31 pub operators: Vec<(String, OperatorFactory)>,
33 pub max_batch_size: Option<u64>,
35 pub operators_dir: Option<PathBuf>,
37}
38
39pub struct FlowSubsystem {
40 consumer: PollConsumer<FlowConsumer>,
41 running: bool,
42}
43
44impl FlowSubsystem {
45 #[instrument(name = "flow::subsystem::new", level = "debug", skip(cfg, ioc))]
46 pub async fn new(cfg: FlowSubsystemConfig, ioc: &IocContainer) -> Result<Self> {
47 let engine = ioc.resolve::<StandardEngine>()?;
48
49 let consumer = FlowConsumer::new(engine.clone(), cfg.operators.clone(), cfg.operators_dir).await;
50
51 Ok(Self {
52 consumer: PollConsumer::new(
53 PollConsumerConfig::new(cfg.consumer_id.clone(), cfg.poll_interval, cfg.max_batch_size),
54 engine.clone(),
55 consumer,
56 ),
57 running: false,
58 })
59 }
60}
61
62#[async_trait]
63impl Subsystem for FlowSubsystem {
64 fn name(&self) -> &'static str {
65 "sub-flow"
66 }
67
68 #[instrument(name = "flow::subsystem::start", level = "info", skip(self))]
69 async fn start(&mut self) -> Result<()> {
70 if self.running {
71 return Ok(());
72 }
73
74 self.consumer.start()?;
75 self.running = true;
76
77 Ok(())
78 }
79
80 #[instrument(name = "flow::subsystem::shutdown", level = "info", skip(self))]
81 async fn shutdown(&mut self) -> Result<()> {
82 if !self.running {
83 return Ok(());
84 }
85
86 self.consumer.stop()?;
87 self.running = false;
88 Ok(())
89 }
90
91 #[instrument(name = "flow::subsystem::is_running", level = "trace", skip(self))]
92 fn is_running(&self) -> bool {
93 self.running
94 }
95
96 #[instrument(name = "flow::subsystem::health_status", level = "debug", skip(self))]
97 fn health_status(&self) -> HealthStatus {
98 if self.is_running() {
99 HealthStatus::Healthy
100 } else {
101 HealthStatus::Unknown
102 }
103 }
104
105 fn as_any(&self) -> &dyn Any {
106 self
107 }
108
109 fn as_any_mut(&mut self) -> &mut dyn Any {
110 self
111 }
112}
113
114impl HasVersion for FlowSubsystem {
115 fn version(&self) -> SystemVersion {
116 SystemVersion {
117 name: "sub-flow".to_string(),
118 version: env!("CARGO_PKG_VERSION").to_string(),
119 description: "Data flow and stream processing subsystem".to_string(),
120 r#type: ComponentType::Subsystem,
121 }
122 }
123}