Skip to main content

reifydb_sub_flow/subsystem/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9	any::Any,
10	collections::HashMap,
11	sync::{Arc, RwLock},
12	time::Duration,
13};
14
15#[cfg(reifydb_target = "native")]
16use ffi::load_ffi_operators;
17use reifydb_cdc::{
18	consume::{
19		consumer::{CdcConsume, CdcConsumer},
20		poll::{PollConsumer, PollConsumerConfig},
21	},
22	storage::CdcStore,
23};
24use reifydb_core::{
25	actors::flow::{FlowCoordinatorHandle, FlowHandle, FlowMessage, FlowPoolHandle},
26	interface::{
27		WithEventBus,
28		cdc::{Cdc, CdcConsumerId},
29		flow::FlowWatermarkSampler,
30		version::{ComponentType, HasVersion, SystemVersion},
31	},
32	util::ioc::IocContainer,
33};
34use reifydb_engine::engine::StandardEngine;
35use reifydb_rql::flow::loader::load_flow_dag;
36use reifydb_runtime::{
37	SharedRuntime,
38	actor::mailbox::ActorRef,
39	context::{RuntimeContext, clock::Clock},
40};
41use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
42use reifydb_transaction::{
43	interceptor::interceptors::Interceptors,
44	transaction::{TestTransaction, Transaction},
45};
46use reifydb_type::{Result, value::identity::IdentityId};
47use tracing::{info, warn};
48
49use crate::{
50	builder::{FlowConfig, OperatorFactory},
51	catalog::FlowCatalog,
52	deferred::{
53		coordinator::{CoordinatorActor, FlowConsumeRef, extract_new_flow_ids},
54		pool::PoolActor,
55		tracker::ShapeVersionTracker,
56		watermark::compute_flow_watermarks,
57		worker::FlowWorkerActor,
58	},
59	engine::FlowEngine,
60	transactional::{
61		interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
62		registry::TransactionalFlowRegistry,
63	},
64};
65
/// Thin wrapper around the deferred coordinator that intercepts new flows
/// and registers transactional ones before forwarding to the coordinator.
struct FlowConsumeDispatcher {
	/// Deferred coordinator that every CDC batch is ultimately forwarded to.
	coordinator: FlowConsumeRef,
	/// Registry asked (via `try_register`) whether a newly loaded flow is transactional.
	registrar: TransactionalFlowRegistry,
	/// Flow cache used to load new flows and to evict the ones that are not transactional.
	flow_catalog: FlowCatalog,
	/// Engine used to open the system-identity query transaction for loading flows.
	engine: StandardEngine,
}
74
75impl CdcConsume for FlowConsumeDispatcher {
76	fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
77		// Check for newly-created flows that might be transactional views.
78		let new_flow_ids = extract_new_flow_ids(&cdcs);
79		if !new_flow_ids.is_empty()
80			&& let Ok(mut query) = self.engine.begin_query(IdentityId::system())
81		{
82			for flow_id in new_flow_ids {
83				match self.flow_catalog.get_or_load_flow(&mut Transaction::Query(&mut query), flow_id) {
84					Ok((flow, true)) => {
85						// Newly-loaded flow: try to register as transactional.
86						// If transactional, FlowCatalog now caches it so the
87						// coordinator's get_or_load_flow sees is_new=false.
88						match self.registrar.try_register(flow) {
89							Ok(true) => { /* transactional, leave cached */ }
90							Ok(false) => {
91								// NOT transactional - remove from cache so
92								// the coordinator discovers it as new.
93								self.flow_catalog.remove(flow_id);
94							}
95							Err(e) => {
96								self.flow_catalog.remove(flow_id);
97								warn!(
98									flow_id = flow_id.0,
99									error = %e,
100									"failed to register transactional flow"
101								);
102							}
103						}
104					}
105					Ok((_, false)) => {
106						// Already cached - nothing to do.
107					}
108					Err(e) => {
109						warn!(
110							flow_id = flow_id.0,
111							error = %e,
112							"failed to load flow for transactional check"
113						);
114					}
115				}
116			}
117		}
118
119		// Forward CDC batch to the deferred coordinator.
120		// Transactional flows will have is_new=false in the coordinator's
121		// get_or_load_flow call (shared cache), so they are skipped automatically.
122		self.coordinator.consume(cdcs, reply);
123	}
124}
125
/// Flow subsystem - polls CDC and feeds it to the flow coordinator, which is
/// wired to a pool of worker actors (one per `system_thread_count()` of the
/// engine's actor system, see `new`).
pub struct FlowSubsystem {
	/// CDC poll consumer; started by `start` and stopped by `shutdown`.
	consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
	/// Handles of the spawned flow worker actors; drained and joined on shutdown.
	worker_handles: Vec<FlowHandle>,
	/// Handle of the `flow-pool` actor; `None` once shutdown has taken and joined it.
	pool_handle: Option<FlowPoolHandle>,
	/// Handle of the `flow-coordinator` actor; `None` once shutdown has taken and joined it.
	coordinator_handle: Option<FlowCoordinatorHandle>,
	/// Flow engine shared with the transactional interceptors; cleared on shutdown.
	transactional_flow_engine: Arc<RwLock<FlowEngine>>,
	/// Set by a successful `start`, reset at the end of `shutdown`.
	running: bool,
}
135
136impl FlowSubsystem {
	/// Builds the flow subsystem and wires its components together.
	///
	/// Construction already has side effects: it spawns the flow worker,
	/// `flow-pool` and `flow-coordinator` actors on the engine's actor system,
	/// adds the transactional flow interceptor factory to `engine`, and
	/// registers a `FlowWatermarkSampler` service in `ioc`. The CDC poll
	/// consumer is only created here; it is started by `start`.
	///
	/// # Panics
	///
	/// Panics if `SharedRuntime` or `CdcStore` is not registered in `ioc`, or
	/// (native targets only) if FFI operators fail to load.
	pub fn new(config: FlowConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
		Self::maybe_load_ffi_operators(&config, &engine);

		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
		let clock = runtime.clock().clone();
		let custom_operators = Arc::new(config.custom_operators);
		let primitive_tracker = Arc::new(ShapeVersionTracker::new());
		let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");

		// The worker count follows the actor system's `system_thread_count()`.
		let actor_system = engine.actor_system();
		let num_workers = actor_system.pools().system_thread_count();
		info!(num_workers, "initializing flow coordinator with {} workers", num_workers);

		// Shared flow catalog: clones share the same cache so the dispatcher,
		// coordinator, and workers see the same flow-cache state.
		let flow_catalog = FlowCatalog::new(engine.catalog());

		let (worker_refs, worker_handles) =
			Self::spawn_flow_workers(num_workers, &engine, &flow_catalog, &clock, &custom_operators);

		// The pool actor receives the worker refs; the coordinator receives the pool ref.
		let pool_handle = actor_system.spawn("flow-pool", PoolActor::new(worker_refs, clock.clone()));
		let pool_ref = pool_handle.actor_ref().clone();

		let coordinator_handle = actor_system.spawn(
			"flow-coordinator",
			CoordinatorActor::new(
				engine.clone(),
				flow_catalog.clone(),
				pool_ref,
				primitive_tracker.clone(),
				cdc_store.clone(),
				num_workers,
				clock.clone(),
			),
		);
		// Reference the dispatcher uses to forward CDC batches to the coordinator actor.
		let consume_ref = FlowConsumeRef {
			actor_ref: coordinator_handle.actor_ref().clone(),
		};

		// Separate engine instance for transactional flows, shared behind a lock
		// with the registry and the interceptors registered below.
		let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
			engine.catalog(),
			engine.executor(),
			engine.event_bus().clone(),
			RuntimeContext::with_clock(clock.clone()),
			custom_operators.clone(),
		)));

		let registrar = TransactionalFlowRegistry {
			flow_engine: transactional_flow_engine.clone(),
			engine: engine.clone(),
			catalog: engine.catalog(),
		};

		Self::register_flow_interceptors(&engine, &transactional_flow_engine, &clock, &custom_operators);

		// The sampler closure calls `compute_flow_watermarks` with the shared
		// tracker, engine and flow catalog each time it is invoked.
		ioc.register_service::<FlowWatermarkSampler>(FlowWatermarkSampler::new({
			let tracker = primitive_tracker.clone();
			let engine = engine.clone();
			let flow_catalog = flow_catalog.clone();
			move || compute_flow_watermarks(&tracker, &engine, &flow_catalog)
		}));

		// NOTE(review): presumably a 10 ms poll interval and a batch limit of 100 -
		// confirm against `PollConsumerConfig::new`.
		let poll_config = PollConsumerConfig::new(
			CdcConsumerId::new("flow-coordinator"),
			"flow-cdc-poll",
			Duration::from_millis(10),
			Some(100),
		);
		let dispatcher = FlowConsumeDispatcher {
			coordinator: consume_ref,
			registrar,
			flow_catalog,
			engine: engine.clone(),
		};
		let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);

		// `running` stays false until `start` has started the consumer.
		Self {
			consumer,
			worker_handles,
			pool_handle: Some(pool_handle),
			coordinator_handle: Some(coordinator_handle),
			transactional_flow_engine,
			running: false,
		}
	}
222
223	#[inline]
224	fn maybe_load_ffi_operators(config: &FlowConfig, engine: &StandardEngine) {
225		#[cfg(reifydb_target = "native")]
226		if let Some(ref operators_dir) = config.operators_dir {
227			let event_bus = engine.event_bus();
228			if let Err(e) = load_ffi_operators(operators_dir, event_bus) {
229				panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
230			}
231			event_bus.wait_for_completion();
232		}
233		#[cfg(not(reifydb_target = "native"))]
234		{
235			let _ = (config, engine);
236		}
237	}
238
239	#[inline]
240	fn spawn_flow_workers(
241		num_workers: usize,
242		engine: &StandardEngine,
243		flow_catalog: &FlowCatalog,
244		clock: &Clock,
245		custom_operators: &Arc<HashMap<String, OperatorFactory>>,
246	) -> (Vec<ActorRef<FlowMessage>>, Vec<FlowHandle>) {
247		let actor_system = engine.actor_system();
248		let mut worker_refs = Vec::with_capacity(num_workers);
249		let mut worker_handles = Vec::with_capacity(num_workers);
250
251		for i in 0..num_workers {
252			let cat = engine.catalog();
253			let exec = engine.executor();
254			let bus = engine.event_bus().clone();
255			let rc = RuntimeContext::with_clock(clock.clone());
256			let co = custom_operators.clone();
257			let worker_factory = move || FlowEngine::new(cat, exec, bus, rc, co);
258
259			let worker = FlowWorkerActor::new(
260				worker_factory,
261				engine.clone(),
262				engine.catalog(),
263				flow_catalog.clone(),
264			);
265			let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
266			worker_refs.push(handle.actor_ref().clone());
267			worker_handles.push(handle);
268		}
269
270		(worker_refs, worker_handles)
271	}
272
273	#[inline]
274	fn register_flow_interceptors(
275		engine: &StandardEngine,
276		transactional_flow_engine: &Arc<RwLock<FlowEngine>>,
277		clock: &Clock,
278		custom_operators: &Arc<HashMap<String, OperatorFactory>>,
279	) {
280		let flow_engine_for_pre = transactional_flow_engine.clone();
281		let engine_for_pre = engine.clone();
282		let catalog_for_pre = engine.catalog();
283
284		let flow_engine_for_post = transactional_flow_engine.clone();
285		let engine_for_post = engine.clone();
286		let catalog_for_post = engine.catalog();
287
288		let test_flow_engine = transactional_flow_engine.clone();
289		let test_engine = engine.clone();
290		let test_catalog = engine.catalog();
291		let test_event_bus = engine.event_bus().clone();
292		let test_runtime_context = RuntimeContext::with_clock(clock.clone());
293		let test_custom_operators = custom_operators.clone();
294
295		engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
296			interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
297				flow_engine: flow_engine_for_pre.clone(),
298				engine: engine_for_pre.clone(),
299				catalog: catalog_for_pre.clone(),
300			}));
301			interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
302				registrar: TransactionalFlowRegistry {
303					flow_engine: flow_engine_for_post.clone(),
304					engine: engine_for_post.clone(),
305					catalog: catalog_for_post.clone(),
306				},
307			}));
308
309			// test_pre_commit rebuilds the shared transactional flow engine from
310			// all catalog flows (including uncommitted ones visible through the
311			// admin transaction) so capture_testing_pre_commit can process flows
312			// for views that haven't been committed yet.
313			let hook_flow_engine = test_flow_engine.clone();
314			let hook_engine = test_engine.clone();
315			let hook_catalog = test_catalog.clone();
316			let hook_event_bus = test_event_bus.clone();
317			let hook_runtime_context = test_runtime_context.clone();
318			let hook_custom_operators = test_custom_operators.clone();
319
320			interceptors.set_test_pre_commit(Arc::new(move |test_txn: &mut TestTransaction<'_>| {
321				let mut fresh_engine = FlowEngine::new(
322					hook_catalog.clone(),
323					hook_engine.executor(),
324					hook_event_bus.clone(),
325					hook_runtime_context.clone(),
326					hook_custom_operators.clone(),
327				);
328
329				let flows = hook_catalog
330					.list_flows_all(&mut Transaction::Test(Box::new(test_txn.reborrow())))?;
331
332				for flow in flows {
333					let dag = load_flow_dag(
334						&hook_catalog,
335						&mut Transaction::Test(Box::new(test_txn.reborrow())),
336						flow.id,
337					)?;
338					fresh_engine.register_with_transaction(
339						&mut Transaction::Test(Box::new(test_txn.reborrow())),
340						dag,
341					)?;
342				}
343
344				*hook_flow_engine.write().unwrap() = fresh_engine;
345				Ok(())
346			}));
347		}));
348	}
349}
350
351impl Subsystem for FlowSubsystem {
352	fn name(&self) -> &'static str {
353		"sub-flow"
354	}
355
356	fn start(&mut self) -> Result<()> {
357		if self.running {
358			return Ok(());
359		}
360
361		self.consumer.start()?;
362		self.running = true;
363		Ok(())
364	}
365
366	fn shutdown(&mut self) -> Result<()> {
367		if !self.running {
368			return Ok(());
369		}
370
371		self.consumer.stop()?;
372
373		if let Some(handle) = self.coordinator_handle.take() {
374			let _ = handle.join();
375		}
376
377		if let Some(handle) = self.pool_handle.take() {
378			let _ = handle.join();
379		}
380
381		for handle in self.worker_handles.drain(..) {
382			let _ = handle.join();
383		}
384
385		// Clear the transactional flow engine to drop all Arc<Operators>,
386		// which triggers FFI operator cleanup and frees LRU caches.
387		if let Ok(mut engine) = self.transactional_flow_engine.write() {
388			engine.clear();
389		}
390
391		self.running = false;
392		Ok(())
393	}
394
395	fn is_running(&self) -> bool {
396		self.running
397	}
398
399	fn health_status(&self) -> HealthStatus {
400		if self.is_running() {
401			HealthStatus::Healthy
402		} else {
403			HealthStatus::Unknown
404		}
405	}
406
407	fn as_any(&self) -> &dyn Any {
408		self
409	}
410
411	fn as_any_mut(&mut self) -> &mut dyn Any {
412		self
413	}
414}
415
416impl HasVersion for FlowSubsystem {
417	fn version(&self) -> SystemVersion {
418		SystemVersion {
419			name: env!("CARGO_PKG_NAME")
420				.strip_prefix("reifydb-")
421				.unwrap_or(env!("CARGO_PKG_NAME"))
422				.to_string(),
423			version: env!("CARGO_PKG_VERSION").to_string(),
424			description: "Data flow and stream processing subsystem".to_string(),
425			r#type: ComponentType::Subsystem,
426		}
427	}
428}
429
430impl Drop for FlowSubsystem {
431	fn drop(&mut self) {
432		if self.running {
433			let _ = self.shutdown();
434		}
435	}
436}