reifydb_sub_flow/subsystem/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod factory;
5pub mod intercept;
6
7use std::{any::Any, path::PathBuf, time::Duration};
8
9use async_trait::async_trait;
10pub use factory::FlowSubsystemFactory;
11use reifydb_cdc::{CdcConsumer, PollConsumer, PollConsumerConfig};
12use reifydb_core::{
13	Result,
14	interface::{
15		CdcConsumerId,
16		version::{ComponentType, HasVersion, SystemVersion},
17	},
18	ioc::IocContainer,
19};
20use reifydb_engine::StandardEngine;
21use reifydb_sub_api::{HealthStatus, Subsystem};
22use tracing::instrument;
23
24use crate::{builder::OperatorFactory, consumer::FlowConsumer};
25
26pub struct FlowSubsystemConfig {
27	/// Unique identifier for this consumer
28	pub consumer_id: CdcConsumerId,
29	/// How often to poll for new CDC events
30	pub poll_interval: Duration,
31	/// Custom operator factories
32	pub operators: Vec<(String, OperatorFactory)>,
33	/// Maximum batch size for CDC polling (None = unbounded)
34	pub max_batch_size: Option<u64>,
35	/// Directory to scan for FFI operator shared libraries
36	pub operators_dir: Option<PathBuf>,
37}
38
39pub struct FlowSubsystem {
40	consumer: PollConsumer<FlowConsumer>,
41	running: bool,
42}
43
44impl FlowSubsystem {
45	#[instrument(name = "flow::subsystem::new", level = "debug", skip(cfg, ioc))]
46	pub async fn new(cfg: FlowSubsystemConfig, ioc: &IocContainer) -> Result<Self> {
47		let engine = ioc.resolve::<StandardEngine>()?;
48
49		let consumer = FlowConsumer::new(engine.clone(), cfg.operators.clone(), cfg.operators_dir).await;
50
51		Ok(Self {
52			consumer: PollConsumer::new(
53				PollConsumerConfig::new(cfg.consumer_id.clone(), cfg.poll_interval, cfg.max_batch_size),
54				engine.clone(),
55				consumer,
56			),
57			running: false,
58		})
59	}
60}
61
62#[async_trait]
63impl Subsystem for FlowSubsystem {
64	fn name(&self) -> &'static str {
65		"sub-flow"
66	}
67
68	#[instrument(name = "flow::subsystem::start", level = "info", skip(self))]
69	async fn start(&mut self) -> Result<()> {
70		if self.running {
71			return Ok(());
72		}
73
74		self.consumer.start()?;
75		self.running = true;
76
77		Ok(())
78	}
79
80	#[instrument(name = "flow::subsystem::shutdown", level = "info", skip(self))]
81	async fn shutdown(&mut self) -> Result<()> {
82		if !self.running {
83			return Ok(());
84		}
85
86		self.consumer.stop()?;
87		self.running = false;
88		Ok(())
89	}
90
91	#[instrument(name = "flow::subsystem::is_running", level = "trace", skip(self))]
92	fn is_running(&self) -> bool {
93		self.running
94	}
95
96	#[instrument(name = "flow::subsystem::health_status", level = "debug", skip(self))]
97	fn health_status(&self) -> HealthStatus {
98		if self.is_running() {
99			HealthStatus::Healthy
100		} else {
101			HealthStatus::Unknown
102		}
103	}
104
105	fn as_any(&self) -> &dyn Any {
106		self
107	}
108
109	fn as_any_mut(&mut self) -> &mut dyn Any {
110		self
111	}
112}
113
114impl HasVersion for FlowSubsystem {
115	fn version(&self) -> SystemVersion {
116		SystemVersion {
117			name: "sub-flow".to_string(),
118			version: env!("CARGO_PKG_VERSION").to_string(),
119			description: "Data flow and stream processing subsystem".to_string(),
120			r#type: ComponentType::Subsystem,
121		}
122	}
123}