reifydb_sub_flow/subsystem/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod factory;
5pub mod intercept;
6
7use std::{any::Any, path::PathBuf, time::Duration};
8
9pub use factory::FlowSubsystemFactory;
10use reifydb_cdc::{CdcConsumer, PollConsumer, PollConsumerConfig};
11use reifydb_core::{
12	Result,
13	interface::{
14		CdcConsumerId,
15		version::{ComponentType, HasVersion, SystemVersion},
16	},
17	ioc::IocContainer,
18};
19use reifydb_engine::StandardEngine;
20use reifydb_sub_api::{HealthStatus, Priority, SchedulerService, Subsystem};
21
22use crate::{builder::OperatorFactory, consumer::FlowConsumer};
23
24pub struct FlowSubsystemConfig {
25	/// Unique identifier for this consumer
26	pub consumer_id: CdcConsumerId,
27	/// How often to poll for new CDC events
28	pub poll_interval: Duration,
29	/// Priority for the polling task in the worker pool
30	pub priority: Priority,
31	/// Custom operator factories
32	pub operators: Vec<(String, OperatorFactory)>,
33	/// Maximum batch size for CDC polling (None = unbounded)
34	pub max_batch_size: Option<u64>,
35	/// Directory to scan for FFI operator shared libraries
36	pub operators_dir: Option<PathBuf>,
37}
38
39pub struct FlowSubsystem {
40	consumer: PollConsumer<FlowConsumer>,
41	running: bool,
42}
43
44impl FlowSubsystem {
45	pub fn new(cfg: FlowSubsystemConfig, ioc: &IocContainer) -> Result<Self> {
46		let engine = ioc.resolve::<StandardEngine>()?;
47		let scheduler = ioc.resolve::<SchedulerService>().ok();
48
49		let consumer = FlowConsumer::new(engine.clone(), cfg.operators.clone(), cfg.operators_dir, scheduler);
50
51		Ok(Self {
52			consumer: PollConsumer::new(
53				PollConsumerConfig::new(cfg.consumer_id.clone(), cfg.poll_interval, cfg.max_batch_size),
54				engine.clone(),
55				consumer,
56			),
57			running: false,
58		})
59	}
60}
61
62impl Drop for FlowSubsystem {
63	fn drop(&mut self) {
64		let _ = self.shutdown();
65	}
66}
67
68impl Subsystem for FlowSubsystem {
69	fn name(&self) -> &'static str {
70		"sub-flow"
71	}
72
73	fn start(&mut self) -> Result<()> {
74		if self.running {
75			return Ok(());
76		}
77
78		self.consumer.start()?;
79		self.running = true;
80
81		Ok(())
82	}
83
84	fn shutdown(&mut self) -> Result<()> {
85		if !self.running {
86			return Ok(());
87		}
88
89		self.consumer.stop()?;
90		self.running = false;
91		Ok(())
92	}
93
94	fn is_running(&self) -> bool {
95		self.running
96	}
97
98	fn health_status(&self) -> HealthStatus {
99		if self.is_running() {
100			HealthStatus::Healthy
101		} else {
102			HealthStatus::Unknown
103		}
104	}
105
106	fn as_any(&self) -> &dyn Any {
107		self
108	}
109
110	fn as_any_mut(&mut self) -> &mut dyn Any {
111		self
112	}
113}
114
115impl HasVersion for FlowSubsystem {
116	fn version(&self) -> SystemVersion {
117		SystemVersion {
118			name: "sub-flow".to_string(),
119			version: env!("CARGO_PKG_VERSION").to_string(),
120			description: "Data flow and stream processing subsystem".to_string(),
121			r#type: ComponentType::Subsystem,
122		}
123	}
124}