Skip to main content

reifydb_sub_flow/subsystem/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9	any::Any,
10	sync::{Arc, RwLock},
11	time::Duration,
12};
13
14#[cfg(reifydb_target = "native")]
15use ffi::load_ffi_operators;
16use reifydb_cdc::{
17	consume::{
18		consumer::{CdcConsume, CdcConsumer},
19		poll::{PollConsumer, PollConsumerConfig},
20	},
21	storage::CdcStore,
22};
23use reifydb_core::{
24	interface::{
25		WithEventBus,
26		cdc::{Cdc, CdcConsumerId},
27		flow::FlowLagsProvider,
28		version::{ComponentType, HasVersion, SystemVersion},
29	},
30	key::{EncodableKey, cdc_consumer::CdcConsumerKey},
31	util::ioc::IocContainer,
32};
33use reifydb_engine::engine::StandardEngine;
34use reifydb_rql::flow::loader::load_flow_dag;
35use reifydb_runtime::{SharedRuntime, actor::system::ActorHandle, context::RuntimeContext};
36use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
37use reifydb_transaction::{
38	interceptor::interceptors::Interceptors,
39	transaction::{TestTransaction, Transaction},
40};
41use reifydb_type::{Result, value::identity::IdentityId};
42use tracing::{info, warn};
43
44use crate::{
45	builder::FlowBuilderConfig,
46	catalog::FlowCatalog,
47	deferred::{
48		coordinator::{CoordinatorActor, CoordinatorMsg, FlowConsumeRef, extract_new_flow_ids},
49		lag::FlowLags,
50		pool::{PoolActor, PoolMsg},
51		tracker::SchemaVersionTracker,
52		worker::{FlowMsg, FlowWorkerActor},
53	},
54	engine::FlowEngine,
55	transactional::{
56		interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
57		registrar::TransactionalFlowRegistrar,
58	},
59};
60
61/// Thin wrapper around the deferred coordinator that intercepts new flows
62/// and registers transactional ones before forwarding to the coordinator.
63struct FlowConsumeDispatcher {
64	coordinator: FlowConsumeRef,
65	registrar: TransactionalFlowRegistrar,
66	flow_catalog: FlowCatalog,
67	engine: StandardEngine,
68}
69
70impl CdcConsume for FlowConsumeDispatcher {
71	fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
72		// Check for newly-created flows that might be transactional views.
73		let new_flow_ids = extract_new_flow_ids(&cdcs);
74		if !new_flow_ids.is_empty() {
75			if let Ok(mut query) = self.engine.begin_query(IdentityId::system()) {
76				for flow_id in new_flow_ids {
77					match self
78						.flow_catalog
79						.get_or_load_flow(&mut Transaction::Query(&mut query), flow_id)
80					{
81						Ok((flow, true)) => {
82							// Newly-loaded flow: try to register as transactional.
83							// If transactional, FlowCatalog now caches it so the
84							// coordinator's get_or_load_flow sees is_new=false.
85							match self.registrar.try_register(flow) {
86								Ok(true) => { /* transactional, leave cached */ }
87								Ok(false) => {
88									// NOT transactional — remove from cache so
89									// the coordinator discovers it as new.
90									self.flow_catalog.remove(flow_id);
91								}
92								Err(e) => {
93									self.flow_catalog.remove(flow_id);
94									warn!(
95										flow_id = flow_id.0,
96										error = %e,
97										"failed to register transactional flow"
98									);
99								}
100							}
101						}
102						Ok((_, false)) => {
103							// Already cached — nothing to do.
104						}
105						Err(e) => {
106							warn!(
107								flow_id = flow_id.0,
108								error = %e,
109								"failed to load flow for transactional check"
110							);
111						}
112					}
113				}
114			}
115		}
116
117		// Forward CDC batch to the deferred coordinator.
118		// Transactional flows will have is_new=false in the coordinator's
119		// get_or_load_flow call (shared cache), so they are skipped automatically.
120		self.coordinator.consume(cdcs, reply);
121	}
122}
123
124/// Flow subsystem - single-threaded flow processing.
125pub struct FlowSubsystem {
126	consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
127	worker_handles: Vec<ActorHandle<FlowMsg>>,
128	pool_handle: Option<ActorHandle<PoolMsg>>,
129	coordinator_handle: Option<ActorHandle<CoordinatorMsg>>,
130	transactional_flow_engine: Arc<RwLock<FlowEngine>>,
131	running: bool,
132}
133
134impl FlowSubsystem {
135	/// Create a new flow subsystem.
136	pub fn new(config: FlowBuilderConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
137		let catalog = engine.catalog();
138		let executor = engine.executor();
139		let event_bus = engine.event_bus().clone();
140
141		#[cfg(reifydb_target = "native")]
142		if let Some(ref operators_dir) = config.operators_dir {
143			if let Err(e) = load_ffi_operators(operators_dir, &event_bus) {
144				panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
145			}
146			event_bus.wait_for_completion();
147		}
148
149		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
150		let clock = runtime.clock().clone();
151		let runtime_context = RuntimeContext::with_clock(clock.clone());
152		let runtime_context_for_factory = runtime_context.clone();
153
154		let custom_operators = Arc::new(config.custom_operators);
155		let custom_operators_for_factory = custom_operators.clone();
156
157		let factory_builder = move || {
158			let cat = catalog.clone();
159			let exec = executor.clone();
160			let bus = event_bus.clone();
161			let rc = runtime_context_for_factory.clone();
162			let co = custom_operators_for_factory.clone();
163
164			move || FlowEngine::new(cat, exec, bus, rc, co)
165		};
166
167		let primitive_tracker = Arc::new(SchemaVersionTracker::new());
168
169		let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
170
171		let num_workers = config.num_workers;
172		info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
173
174		let actor_system = engine.actor_system();
175
176		let mut worker_refs = Vec::with_capacity(num_workers);
177		let mut worker_handles = Vec::with_capacity(num_workers);
178		for i in 0..num_workers {
179			let worker_factory = factory_builder();
180			let worker = FlowWorkerActor::new(worker_factory, engine.clone(), engine.catalog());
181			let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
182			worker_refs.push(handle.actor_ref().clone());
183			worker_handles.push(handle);
184		}
185
186		let pool = PoolActor::new(worker_refs, clock.clone());
187		let pool_handle = actor_system.spawn("flow-pool", pool);
188		let pool_ref = pool_handle.actor_ref().clone();
189
190		// Shared flow catalog: clones share the same cache so the dispatcher
191		// and coordinator see the same flow-cache state.
192		let flow_catalog = FlowCatalog::new(engine.catalog());
193
194		let coordinator = CoordinatorActor::new(
195			engine.clone(),
196			flow_catalog.clone(),
197			pool_ref,
198			primitive_tracker.clone(),
199			cdc_store.clone(),
200			num_workers,
201			clock,
202		);
203		let coordinator_handle = actor_system.spawn("flow-coordinator", coordinator);
204		let actor_ref = coordinator_handle.actor_ref().clone();
205
206		let consumer_id = CdcConsumerId::new("flow-coordinator");
207		let consumer_key = CdcConsumerKey {
208			consumer: consumer_id.clone(),
209		}
210		.encode();
211		let consume_ref = FlowConsumeRef {
212			actor_ref,
213			consumer_key,
214		};
215
216		// Transactional flow engine — a separate FlowEngine for transactional views only.
217		let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
218			engine.catalog(),
219			engine.executor(),
220			engine.event_bus().clone(),
221			RuntimeContext::with_clock(runtime.clock().clone()),
222			custom_operators.clone(),
223		)));
224
225		// Registrar: detects transactional flows from CDC and registers them.
226		let registrar = TransactionalFlowRegistrar {
227			flow_engine: transactional_flow_engine.clone(),
228			engine: engine.clone(),
229			catalog: engine.catalog(),
230		};
231
232		let transactional_flow_engine_for_self = transactional_flow_engine.clone();
233
234		// Register pre-commit, post-commit, and test pre-commit interceptors via a single factory function.
235		{
236			let flow_engine_for_interceptor = transactional_flow_engine.clone();
237			let engine_for_interceptor = engine.clone();
238			let catalog_for_interceptor = engine.catalog();
239			let registrar_for_interceptor = TransactionalFlowRegistrar {
240				flow_engine: transactional_flow_engine,
241				engine: engine.clone(),
242				catalog: engine.catalog(),
243			};
244
245			// Captures for the test_pre_commit hook.
246			let test_flow_engine = flow_engine_for_interceptor.clone();
247			let test_engine = engine_for_interceptor.clone();
248			let test_catalog = catalog_for_interceptor.clone();
249			let test_event_bus = engine.event_bus().clone();
250			let test_runtime_context = RuntimeContext::with_clock(runtime.clock().clone());
251			let test_custom_operators = custom_operators.clone();
252
253			engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
254				interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
255					flow_engine: flow_engine_for_interceptor.clone(),
256					engine: engine_for_interceptor.clone(),
257					catalog: catalog_for_interceptor.clone(),
258				}));
259				interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
260					registrar: TransactionalFlowRegistrar {
261						flow_engine: registrar_for_interceptor.flow_engine.clone(),
262						engine: registrar_for_interceptor.engine.clone(),
263						catalog: registrar_for_interceptor.catalog.clone(),
264					},
265				}));
266
267				// Hook for test flow processing: rebuilds the shared transactional flow
268				// engine from all catalog flows (including uncommitted ones visible through
269				// the admin transaction), so that capture_testing_pre_commit can process
270				// flows for views that haven't been committed yet.
271				let hook_flow_engine = test_flow_engine.clone();
272				let hook_engine = test_engine.clone();
273				let hook_catalog = test_catalog.clone();
274				let hook_event_bus = test_event_bus.clone();
275				let hook_runtime_context = test_runtime_context.clone();
276				let hook_custom_operators = test_custom_operators.clone();
277
278				interceptors.set_test_pre_commit(Arc::new(
279					move |test_txn: &mut TestTransaction<'_>| {
280						let mut fresh_engine = FlowEngine::new(
281							hook_catalog.clone(),
282							hook_engine.executor(),
283							hook_event_bus.clone(),
284							hook_runtime_context.clone(),
285							hook_custom_operators.clone(),
286						);
287
288						let flows = hook_catalog
289							.list_flows_all(&mut Transaction::Test(test_txn.reborrow()))?;
290
291						for flow in flows {
292							let dag = load_flow_dag(
293								&hook_catalog,
294								&mut Transaction::Test(test_txn.reborrow()),
295								flow.id,
296							)?;
297							fresh_engine.register_with_transaction(
298								&mut Transaction::Test(test_txn.reborrow()),
299								dag,
300							)?;
301						}
302
303						*hook_flow_engine.write().unwrap() = fresh_engine;
304						Ok(())
305					},
306				));
307			}));
308		}
309
310		ioc.register_service::<Arc<dyn FlowLagsProvider>>(Arc::new(FlowLags::new(
311			primitive_tracker,
312			engine.clone(),
313			flow_catalog.clone(),
314		)));
315
316		let poll_config =
317			PollConsumerConfig::new(consumer_id, "flow-cdc-poll", Duration::from_millis(10), Some(100));
318
319		// Wrap the coordinator reference in a dispatcher that handles transactional flows.
320		let dispatcher = FlowConsumeDispatcher {
321			coordinator: consume_ref,
322			registrar,
323			flow_catalog: flow_catalog.clone(),
324			engine: engine.clone(),
325		};
326
327		let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
328
329		Self {
330			consumer,
331			worker_handles,
332			pool_handle: Some(pool_handle),
333			coordinator_handle: Some(coordinator_handle),
334			transactional_flow_engine: transactional_flow_engine_for_self,
335			running: false,
336		}
337	}
338}
339
340impl Subsystem for FlowSubsystem {
341	fn name(&self) -> &'static str {
342		"sub-flow"
343	}
344
345	fn start(&mut self) -> Result<()> {
346		if self.running {
347			return Ok(());
348		}
349
350		self.consumer.start()?;
351		self.running = true;
352		Ok(())
353	}
354
355	fn shutdown(&mut self) -> Result<()> {
356		if !self.running {
357			return Ok(());
358		}
359
360		self.consumer.stop()?;
361
362		if let Some(handle) = self.coordinator_handle.take() {
363			let _ = handle.join();
364		}
365
366		if let Some(handle) = self.pool_handle.take() {
367			let _ = handle.join();
368		}
369
370		for handle in self.worker_handles.drain(..) {
371			let _ = handle.join();
372		}
373
374		// Clear the transactional flow engine to drop all Arc<Operators>,
375		// which triggers FFI operator cleanup and frees LRU caches.
376		if let Ok(mut engine) = self.transactional_flow_engine.write() {
377			engine.clear();
378		}
379
380		self.running = false;
381		Ok(())
382	}
383
384	fn is_running(&self) -> bool {
385		self.running
386	}
387
388	fn health_status(&self) -> HealthStatus {
389		if self.is_running() {
390			HealthStatus::Healthy
391		} else {
392			HealthStatus::Unknown
393		}
394	}
395
396	fn as_any(&self) -> &dyn Any {
397		self
398	}
399
400	fn as_any_mut(&mut self) -> &mut dyn Any {
401		self
402	}
403}
404
405impl HasVersion for FlowSubsystem {
406	fn version(&self) -> SystemVersion {
407		SystemVersion {
408			name: env!("CARGO_PKG_NAME")
409				.strip_prefix("reifydb-")
410				.unwrap_or(env!("CARGO_PKG_NAME"))
411				.to_string(),
412			version: env!("CARGO_PKG_VERSION").to_string(),
413			description: "Data flow and stream processing subsystem".to_string(),
414			r#type: ComponentType::Subsystem,
415		}
416	}
417}
418
419impl Drop for FlowSubsystem {
420	fn drop(&mut self) {
421		if self.running {
422			let _ = self.shutdown();
423		}
424	}
425}