Skip to main content

reifydb_sub_flow/subsystem/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9	any::Any,
10	sync::{Arc, RwLock},
11	time::Duration,
12};
13
14#[cfg(reifydb_target = "native")]
15use ffi::load_ffi_operators;
16use reifydb_cdc::{
17	consume::{
18		consumer::{CdcConsume, CdcConsumer},
19		poll::{PollConsumer, PollConsumerConfig},
20	},
21	storage::CdcStore,
22};
23use reifydb_core::{
24	interface::{
25		WithEventBus,
26		cdc::{Cdc, CdcConsumerId},
27		flow::FlowLagsProvider,
28		version::{ComponentType, HasVersion, SystemVersion},
29	},
30	key::{EncodableKey, cdc_consumer::CdcConsumerKey},
31	util::ioc::IocContainer,
32};
33use reifydb_engine::engine::StandardEngine;
34use reifydb_runtime::{SharedRuntime, actor::system::ActorHandle};
35use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
36use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::Transaction};
37use reifydb_type::Result;
38use tracing::{info, warn};
39
40use crate::{
41	builder::FlowBuilderConfig,
42	catalog::FlowCatalog,
43	deferred::{
44		coordinator::{CoordinatorActor, CoordinatorMsg, FlowConsumeRef, extract_new_flow_ids},
45		lag::FlowLags,
46		pool::{PoolActor, PoolMsg},
47		tracker::PrimitiveVersionTracker,
48		worker::{FlowMsg, FlowWorkerActor},
49	},
50	engine::FlowEngine,
51	transactional::{
52		interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
53		registrar::TransactionalFlowRegistrar,
54	},
55};
56
57/// Thin wrapper around the deferred coordinator that intercepts new flows
58/// and registers transactional ones before forwarding to the coordinator.
59struct FlowConsumeDispatcher {
60	coordinator: FlowConsumeRef,
61	registrar: TransactionalFlowRegistrar,
62	flow_catalog: FlowCatalog,
63	engine: StandardEngine,
64}
65
66impl CdcConsume for FlowConsumeDispatcher {
67	fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
68		// Check for newly-created flows that might be transactional views.
69		let new_flow_ids = extract_new_flow_ids(&cdcs);
70		if !new_flow_ids.is_empty() {
71			if let Ok(mut query) = self.engine.begin_query() {
72				for flow_id in new_flow_ids {
73					match self
74						.flow_catalog
75						.get_or_load_flow(&mut Transaction::Query(&mut query), flow_id)
76					{
77						Ok((flow, true)) => {
78							// Newly-loaded flow: try to register as transactional.
79							// If transactional, FlowCatalog now caches it so the
80							// coordinator's get_or_load_flow sees is_new=false.
81							match self.registrar.try_register(flow) {
82								Ok(true) => { /* transactional, leave cached */ }
83								Ok(false) => {
84									// NOT transactional — remove from cache so
85									// the coordinator discovers it as new.
86									self.flow_catalog.remove(flow_id);
87								}
88								Err(e) => {
89									self.flow_catalog.remove(flow_id);
90									warn!(
91										flow_id = flow_id.0,
92										error = %e,
93										"failed to register transactional flow"
94									);
95								}
96							}
97						}
98						Ok((_, false)) => {
99							// Already cached — nothing to do.
100						}
101						Err(e) => {
102							warn!(
103								flow_id = flow_id.0,
104								error = %e,
105								"failed to load flow for transactional check"
106							);
107						}
108					}
109				}
110			}
111		}
112
113		// Forward CDC batch to the deferred coordinator.
114		// Transactional flows will have is_new=false in the coordinator's
115		// get_or_load_flow call (shared cache), so they are skipped automatically.
116		self.coordinator.consume(cdcs, reply);
117	}
118}
119
120/// Flow subsystem - single-threaded flow processing.
121pub struct FlowSubsystem {
122	consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
123	worker_handles: Vec<ActorHandle<FlowMsg>>,
124	pool_handle: Option<ActorHandle<PoolMsg>>,
125	coordinator_handle: Option<ActorHandle<CoordinatorMsg>>,
126	transactional_flow_engine: Arc<RwLock<FlowEngine>>,
127	running: bool,
128}
129
130impl FlowSubsystem {
131	/// Create a new flow subsystem.
132	pub fn new(config: FlowBuilderConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
133		let catalog = engine.catalog();
134		let executor = engine.executor();
135		let event_bus = engine.event_bus().clone();
136
137		#[cfg(reifydb_target = "native")]
138		if let Some(ref operators_dir) = config.operators_dir {
139			if let Err(e) = load_ffi_operators(operators_dir, &event_bus) {
140				panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
141			}
142			event_bus.wait_for_completion();
143		}
144
145		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
146		let clock = runtime.clock().clone();
147		let clock_for_factory = clock.clone();
148
149		let custom_operators = Arc::new(config.custom_operators);
150		let custom_operators_for_factory = custom_operators.clone();
151
152		let factory_builder = move || {
153			let cat = catalog.clone();
154			let exec = executor.clone();
155			let bus = event_bus.clone();
156			let clk = clock_for_factory.clone();
157			let co = custom_operators_for_factory.clone();
158
159			move || FlowEngine::new(cat, exec, bus, clk, co)
160		};
161
162		let primitive_tracker = Arc::new(PrimitiveVersionTracker::new());
163
164		let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
165
166		let num_workers = config.num_workers;
167		info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
168
169		let actor_system = engine.actor_system();
170
171		let mut worker_refs = Vec::with_capacity(num_workers);
172		let mut worker_handles = Vec::with_capacity(num_workers);
173		for i in 0..num_workers {
174			let worker_factory = factory_builder();
175			let worker = FlowWorkerActor::new(worker_factory, engine.clone(), engine.catalog());
176			let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
177			worker_refs.push(handle.actor_ref().clone());
178			worker_handles.push(handle);
179		}
180
181		let pool = PoolActor::new(worker_refs, clock.clone());
182		let pool_handle = actor_system.spawn("flow-pool", pool);
183		let pool_ref = pool_handle.actor_ref().clone();
184
185		// Shared flow catalog: clones share the same cache so the dispatcher
186		// and coordinator see the same flow-cache state.
187		let flow_catalog = FlowCatalog::new(engine.catalog());
188
189		let coordinator = CoordinatorActor::new(
190			engine.clone(),
191			flow_catalog.clone(),
192			pool_ref,
193			primitive_tracker.clone(),
194			cdc_store.clone(),
195			num_workers,
196			clock,
197		);
198		let coordinator_handle = actor_system.spawn("flow-coordinator", coordinator);
199		let actor_ref = coordinator_handle.actor_ref().clone();
200
201		let consumer_id = CdcConsumerId::new("flow-coordinator");
202		let consumer_key = CdcConsumerKey {
203			consumer: consumer_id.clone(),
204		}
205		.encode();
206		let consume_ref = FlowConsumeRef {
207			actor_ref,
208			consumer_key,
209		};
210
211		// Transactional flow engine — a separate FlowEngine for transactional views only.
212		let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
213			engine.catalog(),
214			engine.executor(),
215			engine.event_bus().clone(),
216			runtime.clock().clone(),
217			custom_operators.clone(),
218		)));
219
220		// Registrar: detects transactional flows from CDC and registers them.
221		let registrar = TransactionalFlowRegistrar {
222			flow_engine: transactional_flow_engine.clone(),
223			engine: engine.clone(),
224			catalog: engine.catalog(),
225		};
226
227		let transactional_flow_engine_for_self = transactional_flow_engine.clone();
228
229		// Register both pre-commit and post-commit interceptors via a single factory function.
230		{
231			let flow_engine_for_interceptor = transactional_flow_engine.clone();
232			let engine_for_interceptor = engine.clone();
233			let catalog_for_interceptor = engine.catalog();
234			let registrar_for_interceptor = TransactionalFlowRegistrar {
235				flow_engine: transactional_flow_engine,
236				engine: engine.clone(),
237				catalog: engine.catalog(),
238			};
239
240			engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
241				interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
242					flow_engine: flow_engine_for_interceptor.clone(),
243					engine: engine_for_interceptor.clone(),
244					catalog: catalog_for_interceptor.clone(),
245				}));
246				interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
247					registrar: TransactionalFlowRegistrar {
248						flow_engine: registrar_for_interceptor.flow_engine.clone(),
249						engine: registrar_for_interceptor.engine.clone(),
250						catalog: registrar_for_interceptor.catalog.clone(),
251					},
252				}));
253			}));
254		}
255
256		ioc.register_service::<Arc<dyn FlowLagsProvider>>(Arc::new(FlowLags::new(
257			primitive_tracker,
258			engine.clone(),
259			flow_catalog.clone(),
260		)));
261
262		let poll_config =
263			PollConsumerConfig::new(consumer_id, "flow-cdc-poll", Duration::from_millis(10), Some(100));
264
265		// Wrap the coordinator reference in a dispatcher that handles transactional flows.
266		let dispatcher = FlowConsumeDispatcher {
267			coordinator: consume_ref,
268			registrar,
269			flow_catalog: flow_catalog.clone(),
270			engine: engine.clone(),
271		};
272
273		let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
274
275		Self {
276			consumer,
277			worker_handles,
278			pool_handle: Some(pool_handle),
279			coordinator_handle: Some(coordinator_handle),
280			transactional_flow_engine: transactional_flow_engine_for_self,
281			running: false,
282		}
283	}
284}
285
286impl Subsystem for FlowSubsystem {
287	fn name(&self) -> &'static str {
288		"sub-flow"
289	}
290
291	fn start(&mut self) -> Result<()> {
292		if self.running {
293			return Ok(());
294		}
295
296		self.consumer.start()?;
297		self.running = true;
298		Ok(())
299	}
300
301	fn shutdown(&mut self) -> Result<()> {
302		if !self.running {
303			return Ok(());
304		}
305
306		self.consumer.stop()?;
307
308		if let Some(handle) = self.coordinator_handle.take() {
309			let _ = handle.join();
310		}
311
312		if let Some(handle) = self.pool_handle.take() {
313			let _ = handle.join();
314		}
315
316		for handle in self.worker_handles.drain(..) {
317			let _ = handle.join();
318		}
319
320		// Clear the transactional flow engine to drop all Arc<Operators>,
321		// which triggers FFI operator cleanup and frees LRU caches.
322		if let Ok(mut engine) = self.transactional_flow_engine.write() {
323			engine.clear();
324		}
325
326		self.running = false;
327		Ok(())
328	}
329
330	fn is_running(&self) -> bool {
331		self.running
332	}
333
334	fn health_status(&self) -> HealthStatus {
335		if self.is_running() {
336			HealthStatus::Healthy
337		} else {
338			HealthStatus::Unknown
339		}
340	}
341
342	fn as_any(&self) -> &dyn Any {
343		self
344	}
345
346	fn as_any_mut(&mut self) -> &mut dyn Any {
347		self
348	}
349}
350
351impl HasVersion for FlowSubsystem {
352	fn version(&self) -> SystemVersion {
353		SystemVersion {
354			name: env!("CARGO_PKG_NAME")
355				.strip_prefix("reifydb-")
356				.unwrap_or(env!("CARGO_PKG_NAME"))
357				.to_string(),
358			version: env!("CARGO_PKG_VERSION").to_string(),
359			description: "Data flow and stream processing subsystem".to_string(),
360			r#type: ComponentType::Subsystem,
361		}
362	}
363}
364
365impl Drop for FlowSubsystem {
366	fn drop(&mut self) {
367		if self.running {
368			let _ = self.shutdown();
369		}
370	}
371}