Skip to main content

reifydb_sub_flow/subsystem/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9	any::Any,
10	sync::{Arc, RwLock},
11	time::Duration,
12};
13
14#[cfg(reifydb_target = "native")]
15use ffi::load_ffi_operators;
16use reifydb_cdc::{
17	consume::{
18		consumer::{CdcConsume, CdcConsumer},
19		poll::{PollConsumer, PollConsumerConfig},
20	},
21	storage::CdcStore,
22};
23use reifydb_core::{
24	interface::{
25		WithEventBus,
26		cdc::{Cdc, CdcConsumerId},
27		flow::FlowLagsProvider,
28		version::{ComponentType, HasVersion, SystemVersion},
29	},
30	key::{EncodableKey, cdc_consumer::CdcConsumerKey},
31	util::ioc::IocContainer,
32};
33use reifydb_engine::engine::StandardEngine;
34use reifydb_runtime::{SharedRuntime, actor::system::ActorHandle};
35use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
36use reifydb_transaction::{
37	interceptor::interceptors::Interceptors, testing::TestingViewMutationCaptor, transaction::Transaction,
38};
39use reifydb_type::Result;
40use tracing::{info, warn};
41
42use crate::{
43	builder::FlowBuilderConfig,
44	catalog::FlowCatalog,
45	deferred::{
46		coordinator::{CoordinatorActor, CoordinatorMsg, FlowConsumeRef, extract_new_flow_ids},
47		lag::FlowLags,
48		pool::{PoolActor, PoolMsg},
49		tracker::PrimitiveVersionTracker,
50		worker::{FlowMsg, FlowWorkerActor},
51	},
52	engine::FlowEngine,
53	testing::ViewInlineTestingMutationCapture,
54	transactional::{
55		interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
56		registrar::TransactionalFlowRegistrar,
57	},
58};
59
/// Thin wrapper around the deferred coordinator that intercepts new flows
/// and registers transactional ones before forwarding to the coordinator.
struct FlowConsumeDispatcher {
	/// Deferred coordinator that every CDC batch is forwarded to after the
	/// transactional check.
	coordinator: FlowConsumeRef,
	/// Tries to register newly-loaded flows as transactional (`try_register`).
	registrar: TransactionalFlowRegistrar,
	/// Flow cache shared (via clone) with the coordinator: a flow left cached
	/// here is seen by the coordinator's `get_or_load_flow` as not new.
	flow_catalog: FlowCatalog,
	/// Engine used to open the query transaction in which new flows are loaded.
	engine: StandardEngine,
}
68
69impl CdcConsume for FlowConsumeDispatcher {
70	fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
71		// Check for newly-created flows that might be transactional views.
72		let new_flow_ids = extract_new_flow_ids(&cdcs);
73		if !new_flow_ids.is_empty() {
74			if let Ok(mut query) = self.engine.begin_query() {
75				for flow_id in new_flow_ids {
76					match self
77						.flow_catalog
78						.get_or_load_flow(&mut Transaction::Query(&mut query), flow_id)
79					{
80						Ok((flow, true)) => {
81							// Newly-loaded flow: try to register as transactional.
82							// If transactional, FlowCatalog now caches it so the
83							// coordinator's get_or_load_flow sees is_new=false.
84							match self.registrar.try_register(flow) {
85								Ok(true) => { /* transactional, leave cached */ }
86								Ok(false) => {
87									// NOT transactional — remove from cache so
88									// the coordinator discovers it as new.
89									self.flow_catalog.remove(flow_id);
90								}
91								Err(e) => {
92									self.flow_catalog.remove(flow_id);
93									warn!(
94										flow_id = flow_id.0,
95										error = %e,
96										"failed to register transactional flow"
97									);
98								}
99							}
100						}
101						Ok((_, false)) => {
102							// Already cached — nothing to do.
103						}
104						Err(e) => {
105							warn!(
106								flow_id = flow_id.0,
107								error = %e,
108								"failed to load flow for transactional check"
109							);
110						}
111					}
112				}
113			}
114		}
115
116		// Forward CDC batch to the deferred coordinator.
117		// Transactional flows will have is_new=false in the coordinator's
118		// get_or_load_flow call (shared cache), so they are skipped automatically.
119		self.coordinator.consume(cdcs, reply);
120	}
121}
122
/// Flow subsystem.
///
/// Owns the CDC poll consumer that feeds the deferred flow pipeline
/// (coordinator → pool → `num_workers` worker actors, all spawned in `new`)
/// plus a separate `FlowEngine` for transactional views, which is driven by
/// pre-/post-commit interceptors installed on the engine.
pub struct FlowSubsystem {
	/// Polls CDC and hands each batch to the `FlowConsumeDispatcher`.
	consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
	/// Handles of the spawned flow worker actors; drained and joined on shutdown.
	worker_handles: Vec<ActorHandle<FlowMsg>>,
	/// Pool actor handle; `None` once it has been taken and joined on shutdown.
	pool_handle: Option<ActorHandle<PoolMsg>>,
	/// Coordinator actor handle; `None` once it has been taken and joined on shutdown.
	coordinator_handle: Option<ActorHandle<CoordinatorMsg>>,
	/// Engine used only for transactional views; cleared on shutdown.
	transactional_flow_engine: Arc<RwLock<FlowEngine>>,
	/// Set after a successful `start`, reset at the end of `shutdown`.
	running: bool,
}
132
133impl FlowSubsystem {
134	/// Create a new flow subsystem.
135	pub fn new(config: FlowBuilderConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
136		let catalog = engine.catalog();
137		let executor = engine.executor();
138		let event_bus = engine.event_bus().clone();
139
140		#[cfg(reifydb_target = "native")]
141		if let Some(ref operators_dir) = config.operators_dir {
142			if let Err(e) = load_ffi_operators(operators_dir, &event_bus) {
143				panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
144			}
145			event_bus.wait_for_completion();
146		}
147
148		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
149		let clock = runtime.clock().clone();
150		let clock_for_factory = clock.clone();
151
152		let custom_operators = Arc::new(config.custom_operators);
153		let custom_operators_for_factory = custom_operators.clone();
154
155		let factory_builder = move || {
156			let cat = catalog.clone();
157			let exec = executor.clone();
158			let bus = event_bus.clone();
159			let clk = clock_for_factory.clone();
160			let co = custom_operators_for_factory.clone();
161
162			move || FlowEngine::new(cat, exec, bus, clk, co)
163		};
164
165		let primitive_tracker = Arc::new(PrimitiveVersionTracker::new());
166
167		let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
168
169		let num_workers = config.num_workers;
170		info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
171
172		let actor_system = engine.actor_system();
173
174		let mut worker_refs = Vec::with_capacity(num_workers);
175		let mut worker_handles = Vec::with_capacity(num_workers);
176		for i in 0..num_workers {
177			let worker_factory = factory_builder();
178			let worker = FlowWorkerActor::new(worker_factory, engine.clone(), engine.catalog());
179			let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
180			worker_refs.push(handle.actor_ref().clone());
181			worker_handles.push(handle);
182		}
183
184		let pool = PoolActor::new(worker_refs, clock.clone());
185		let pool_handle = actor_system.spawn("flow-pool", pool);
186		let pool_ref = pool_handle.actor_ref().clone();
187
188		// Shared flow catalog: clones share the same cache so the dispatcher
189		// and coordinator see the same flow-cache state.
190		let flow_catalog = FlowCatalog::new(engine.catalog());
191
192		let coordinator = CoordinatorActor::new(
193			engine.clone(),
194			flow_catalog.clone(),
195			pool_ref,
196			primitive_tracker.clone(),
197			cdc_store.clone(),
198			num_workers,
199			clock,
200		);
201		let coordinator_handle = actor_system.spawn("flow-coordinator", coordinator);
202		let actor_ref = coordinator_handle.actor_ref().clone();
203
204		let consumer_id = CdcConsumerId::new("flow-coordinator");
205		let consumer_key = CdcConsumerKey {
206			consumer: consumer_id.clone(),
207		}
208		.encode();
209		let consume_ref = FlowConsumeRef {
210			actor_ref,
211			consumer_key,
212		};
213
214		// Transactional flow engine — a separate FlowEngine for transactional views only.
215		let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
216			engine.catalog(),
217			engine.executor(),
218			engine.event_bus().clone(),
219			runtime.clock().clone(),
220			custom_operators.clone(),
221		)));
222
223		// Registrar: detects transactional flows from CDC and registers them.
224		let registrar = TransactionalFlowRegistrar {
225			flow_engine: transactional_flow_engine.clone(),
226			engine: engine.clone(),
227			catalog: engine.catalog(),
228		};
229
230		let transactional_flow_engine_for_self = transactional_flow_engine.clone();
231
232		// Register both pre-commit and post-commit interceptors via a single factory function.
233		{
234			let flow_engine_for_interceptor = transactional_flow_engine.clone();
235			let engine_for_interceptor = engine.clone();
236			let catalog_for_interceptor = engine.catalog();
237			let registrar_for_interceptor = TransactionalFlowRegistrar {
238				flow_engine: transactional_flow_engine,
239				engine: engine.clone(),
240				catalog: engine.catalog(),
241			};
242
243			engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
244				interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
245					flow_engine: flow_engine_for_interceptor.clone(),
246					engine: engine_for_interceptor.clone(),
247					catalog: catalog_for_interceptor.clone(),
248				}));
249				interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
250					registrar: TransactionalFlowRegistrar {
251						flow_engine: registrar_for_interceptor.flow_engine.clone(),
252						engine: registrar_for_interceptor.engine.clone(),
253						catalog: registrar_for_interceptor.catalog.clone(),
254					},
255				}));
256			}));
257		}
258
259		ioc.register_service::<Arc<dyn FlowLagsProvider>>(Arc::new(FlowLags::new(
260			primitive_tracker,
261			engine.clone(),
262			flow_catalog.clone(),
263		)));
264
265		ioc.register_service::<Arc<dyn TestingViewMutationCaptor>>(Arc::new(
266			ViewInlineTestingMutationCapture {
267				engine: engine.clone(),
268				catalog: engine.catalog(),
269				event_bus: engine.event_bus().clone(),
270				clock: runtime.clock().clone(),
271				custom_operators: custom_operators.clone(),
272			},
273		));
274
275		let poll_config =
276			PollConsumerConfig::new(consumer_id, "flow-cdc-poll", Duration::from_millis(10), Some(100));
277
278		// Wrap the coordinator reference in a dispatcher that handles transactional flows.
279		let dispatcher = FlowConsumeDispatcher {
280			coordinator: consume_ref,
281			registrar,
282			flow_catalog: flow_catalog.clone(),
283			engine: engine.clone(),
284		};
285
286		let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
287
288		Self {
289			consumer,
290			worker_handles,
291			pool_handle: Some(pool_handle),
292			coordinator_handle: Some(coordinator_handle),
293			transactional_flow_engine: transactional_flow_engine_for_self,
294			running: false,
295		}
296	}
297}
298
299impl Subsystem for FlowSubsystem {
300	fn name(&self) -> &'static str {
301		"sub-flow"
302	}
303
304	fn start(&mut self) -> Result<()> {
305		if self.running {
306			return Ok(());
307		}
308
309		self.consumer.start()?;
310		self.running = true;
311		Ok(())
312	}
313
314	fn shutdown(&mut self) -> Result<()> {
315		if !self.running {
316			return Ok(());
317		}
318
319		self.consumer.stop()?;
320
321		if let Some(handle) = self.coordinator_handle.take() {
322			let _ = handle.join();
323		}
324
325		if let Some(handle) = self.pool_handle.take() {
326			let _ = handle.join();
327		}
328
329		for handle in self.worker_handles.drain(..) {
330			let _ = handle.join();
331		}
332
333		// Clear the transactional flow engine to drop all Arc<Operators>,
334		// which triggers FFI operator cleanup and frees LRU caches.
335		if let Ok(mut engine) = self.transactional_flow_engine.write() {
336			engine.clear();
337		}
338
339		self.running = false;
340		Ok(())
341	}
342
343	fn is_running(&self) -> bool {
344		self.running
345	}
346
347	fn health_status(&self) -> HealthStatus {
348		if self.is_running() {
349			HealthStatus::Healthy
350		} else {
351			HealthStatus::Unknown
352		}
353	}
354
355	fn as_any(&self) -> &dyn Any {
356		self
357	}
358
359	fn as_any_mut(&mut self) -> &mut dyn Any {
360		self
361	}
362}
363
364impl HasVersion for FlowSubsystem {
365	fn version(&self) -> SystemVersion {
366		SystemVersion {
367			name: env!("CARGO_PKG_NAME")
368				.strip_prefix("reifydb-")
369				.unwrap_or(env!("CARGO_PKG_NAME"))
370				.to_string(),
371			version: env!("CARGO_PKG_VERSION").to_string(),
372			description: "Data flow and stream processing subsystem".to_string(),
373			r#type: ComponentType::Subsystem,
374		}
375	}
376}
377
378impl Drop for FlowSubsystem {
379	fn drop(&mut self) {
380		if self.running {
381			let _ = self.shutdown();
382		}
383	}
384}