reifydb_sub_flow/subsystem/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod factory;
5pub mod intercept;
6
7use std::{any::Any, path::PathBuf, time::Duration};
8
9pub use factory::FlowSubsystemFactory;
10use reifydb_cdc::{CdcConsumer, PollConsumer, PollConsumerConfig};
11use reifydb_core::{
12	Result,
13	interface::{
14		CdcConsumerId,
15		version::{ComponentType, HasVersion, SystemVersion},
16	},
17	ioc::IocContainer,
18};
19use reifydb_engine::StandardEngine;
20use reifydb_sub_api::{HealthStatus, Priority, SchedulerService, Subsystem};
21use tracing::instrument;
22
23use crate::{builder::OperatorFactory, consumer::FlowConsumer};
24
/// Configuration consumed by [`FlowSubsystem::new`] to build the flow
/// subsystem and its CDC polling consumer.
pub struct FlowSubsystemConfig {
	/// Unique identifier for this consumer
	pub consumer_id: CdcConsumerId,
	/// How often to poll for new CDC events
	pub poll_interval: Duration,
	/// Priority for the polling task in the worker pool
	///
	/// NOTE(review): `FlowSubsystem::new` in this file does not read this
	/// field — confirm whether it is consumed elsewhere.
	pub priority: Priority,
	/// Custom operator factories
	pub operators: Vec<(String, OperatorFactory)>,
	/// Maximum batch size for CDC polling (None = unbounded)
	pub max_batch_size: Option<u64>,
	/// Directory to scan for FFI operator shared libraries
	pub operators_dir: Option<PathBuf>,
}
39
/// Subsystem that wraps a [`FlowConsumer`] in a CDC [`PollConsumer`] and
/// exposes it through the [`Subsystem`] lifecycle (`start` / `shutdown`).
pub struct FlowSubsystem {
	/// Polling CDC consumer that hosts the flow consumer; started and
	/// stopped by the `Subsystem` implementation.
	consumer: PollConsumer<FlowConsumer>,
	/// `true` after a successful `start` and until a successful `shutdown`.
	running: bool,
}
44
45impl FlowSubsystem {
46	#[instrument(level = "debug", skip(cfg, ioc))]
47	pub fn new(cfg: FlowSubsystemConfig, ioc: &IocContainer) -> Result<Self> {
48		let engine = ioc.resolve::<StandardEngine>()?;
49		let scheduler = ioc.resolve::<SchedulerService>().ok();
50
51		let consumer = FlowConsumer::new(engine.clone(), cfg.operators.clone(), cfg.operators_dir, scheduler);
52
53		Ok(Self {
54			consumer: PollConsumer::new(
55				PollConsumerConfig::new(cfg.consumer_id.clone(), cfg.poll_interval, cfg.max_batch_size),
56				engine.clone(),
57				consumer,
58			),
59			running: false,
60		})
61	}
62}
63
64impl Drop for FlowSubsystem {
65	fn drop(&mut self) {
66		let _ = self.shutdown();
67	}
68}
69
70impl Subsystem for FlowSubsystem {
71	fn name(&self) -> &'static str {
72		"sub-flow"
73	}
74
75	#[instrument(level = "info", skip(self))]
76	fn start(&mut self) -> Result<()> {
77		if self.running {
78			return Ok(());
79		}
80
81		self.consumer.start()?;
82		self.running = true;
83
84		Ok(())
85	}
86
87	#[instrument(level = "info", skip(self))]
88	fn shutdown(&mut self) -> Result<()> {
89		if !self.running {
90			return Ok(());
91		}
92
93		self.consumer.stop()?;
94		self.running = false;
95		Ok(())
96	}
97
98	#[instrument(level = "trace", skip(self))]
99	fn is_running(&self) -> bool {
100		self.running
101	}
102
103	#[instrument(level = "debug", skip(self))]
104	fn health_status(&self) -> HealthStatus {
105		if self.is_running() {
106			HealthStatus::Healthy
107		} else {
108			HealthStatus::Unknown
109		}
110	}
111
112	fn as_any(&self) -> &dyn Any {
113		self
114	}
115
116	fn as_any_mut(&mut self) -> &mut dyn Any {
117		self
118	}
119}
120
121impl HasVersion for FlowSubsystem {
122	fn version(&self) -> SystemVersion {
123		SystemVersion {
124			name: "sub-flow".to_string(),
125			version: env!("CARGO_PKG_VERSION").to_string(),
126			description: "Data flow and stream processing subsystem".to_string(),
127			r#type: ComponentType::Subsystem,
128		}
129	}
130}