1pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9 any::Any,
10 collections::HashMap,
11 sync::{Arc, RwLock},
12 time::Duration,
13};
14
15#[cfg(reifydb_target = "native")]
16use ffi::load_ffi_operators;
17use reifydb_cdc::{
18 consume::{
19 consumer::{CdcConsume, CdcConsumer},
20 poll::{PollConsumer, PollConsumerConfig},
21 },
22 storage::CdcStore,
23};
24use reifydb_core::{
25 actors::flow::{FlowCoordinatorHandle, FlowHandle, FlowMessage, FlowPoolHandle},
26 interface::{
27 WithEventBus,
28 cdc::{Cdc, CdcConsumerId},
29 flow::FlowWatermarkSampler,
30 version::{ComponentType, HasVersion, SystemVersion},
31 },
32 util::ioc::IocContainer,
33};
34use reifydb_engine::engine::StandardEngine;
35use reifydb_rql::flow::loader::load_flow_dag;
36use reifydb_runtime::{
37 SharedRuntime,
38 actor::mailbox::ActorRef,
39 context::{RuntimeContext, clock::Clock},
40};
41use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
42use reifydb_transaction::{
43 interceptor::interceptors::Interceptors,
44 transaction::{TestTransaction, Transaction},
45};
46use reifydb_type::{Result, value::identity::IdentityId};
47use tracing::{info, warn};
48
49use crate::{
50 builder::{FlowConfig, OperatorFactory},
51 catalog::FlowCatalog,
52 deferred::{
53 coordinator::{CoordinatorActor, FlowConsumeRef, extract_new_flow_ids},
54 pool::PoolActor,
55 tracker::ShapeVersionTracker,
56 watermark::compute_flow_watermarks,
57 worker::FlowWorkerActor,
58 },
59 engine::FlowEngine,
60 transactional::{
61 interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
62 registry::TransactionalFlowRegistry,
63 },
64};
65
66struct FlowConsumeDispatcher {
69 coordinator: FlowConsumeRef,
70 registrar: TransactionalFlowRegistry,
71 flow_catalog: FlowCatalog,
72 engine: StandardEngine,
73}
74
75impl CdcConsume for FlowConsumeDispatcher {
76 fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
77 let new_flow_ids = extract_new_flow_ids(&cdcs);
79 if !new_flow_ids.is_empty()
80 && let Ok(mut query) = self.engine.begin_query(IdentityId::system())
81 {
82 for flow_id in new_flow_ids {
83 match self.flow_catalog.get_or_load_flow(&mut Transaction::Query(&mut query), flow_id) {
84 Ok((flow, true)) => {
85 match self.registrar.try_register(flow) {
89 Ok(true) => { }
90 Ok(false) => {
91 self.flow_catalog.remove(flow_id);
94 }
95 Err(e) => {
96 self.flow_catalog.remove(flow_id);
97 warn!(
98 flow_id = flow_id.0,
99 error = %e,
100 "failed to register transactional flow"
101 );
102 }
103 }
104 }
105 Ok((_, false)) => {
106 }
108 Err(e) => {
109 warn!(
110 flow_id = flow_id.0,
111 error = %e,
112 "failed to load flow for transactional check"
113 );
114 }
115 }
116 }
117 }
118
119 self.coordinator.consume(cdcs, reply);
123 }
124}
125
126pub struct FlowSubsystem {
128 consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
129 worker_handles: Vec<FlowHandle>,
130 pool_handle: Option<FlowPoolHandle>,
131 coordinator_handle: Option<FlowCoordinatorHandle>,
132 transactional_flow_engine: Arc<RwLock<FlowEngine>>,
133 running: bool,
134}
135
136impl FlowSubsystem {
137 pub fn new(config: FlowConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
138 Self::maybe_load_ffi_operators(&config, &engine);
139
140 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
141 let clock = runtime.clock().clone();
142 let custom_operators = Arc::new(config.custom_operators);
143 let primitive_tracker = Arc::new(ShapeVersionTracker::new());
144 let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
145
146 let actor_system = engine.actor_system();
147 let num_workers = actor_system.pools().system_thread_count();
148 info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
149
150 let flow_catalog = FlowCatalog::new(engine.catalog());
153
154 let (worker_refs, worker_handles) =
155 Self::spawn_flow_workers(num_workers, &engine, &flow_catalog, &clock, &custom_operators);
156
157 let pool_handle = actor_system.spawn("flow-pool", PoolActor::new(worker_refs, clock.clone()));
158 let pool_ref = pool_handle.actor_ref().clone();
159
160 let coordinator_handle = actor_system.spawn(
161 "flow-coordinator",
162 CoordinatorActor::new(
163 engine.clone(),
164 flow_catalog.clone(),
165 pool_ref,
166 primitive_tracker.clone(),
167 cdc_store.clone(),
168 num_workers,
169 clock.clone(),
170 ),
171 );
172 let consume_ref = FlowConsumeRef {
173 actor_ref: coordinator_handle.actor_ref().clone(),
174 };
175
176 let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
177 engine.catalog(),
178 engine.executor(),
179 engine.event_bus().clone(),
180 RuntimeContext::with_clock(clock.clone()),
181 custom_operators.clone(),
182 )));
183
184 let registrar = TransactionalFlowRegistry {
185 flow_engine: transactional_flow_engine.clone(),
186 engine: engine.clone(),
187 catalog: engine.catalog(),
188 };
189
190 Self::register_flow_interceptors(&engine, &transactional_flow_engine, &clock, &custom_operators);
191
192 ioc.register_service::<FlowWatermarkSampler>(FlowWatermarkSampler::new({
193 let tracker = primitive_tracker.clone();
194 let engine = engine.clone();
195 let flow_catalog = flow_catalog.clone();
196 move || compute_flow_watermarks(&tracker, &engine, &flow_catalog)
197 }));
198
199 let poll_config = PollConsumerConfig::new(
200 CdcConsumerId::new("flow-coordinator"),
201 "flow-cdc-poll",
202 Duration::from_millis(10),
203 Some(100),
204 );
205 let dispatcher = FlowConsumeDispatcher {
206 coordinator: consume_ref,
207 registrar,
208 flow_catalog,
209 engine: engine.clone(),
210 };
211 let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
212
213 Self {
214 consumer,
215 worker_handles,
216 pool_handle: Some(pool_handle),
217 coordinator_handle: Some(coordinator_handle),
218 transactional_flow_engine,
219 running: false,
220 }
221 }
222
223 #[inline]
224 fn maybe_load_ffi_operators(config: &FlowConfig, engine: &StandardEngine) {
225 #[cfg(reifydb_target = "native")]
226 if let Some(ref operators_dir) = config.operators_dir {
227 let event_bus = engine.event_bus();
228 if let Err(e) = load_ffi_operators(operators_dir, event_bus) {
229 panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
230 }
231 event_bus.wait_for_completion();
232 }
233 #[cfg(not(reifydb_target = "native"))]
234 {
235 let _ = (config, engine);
236 }
237 }
238
239 #[inline]
240 fn spawn_flow_workers(
241 num_workers: usize,
242 engine: &StandardEngine,
243 flow_catalog: &FlowCatalog,
244 clock: &Clock,
245 custom_operators: &Arc<HashMap<String, OperatorFactory>>,
246 ) -> (Vec<ActorRef<FlowMessage>>, Vec<FlowHandle>) {
247 let actor_system = engine.actor_system();
248 let mut worker_refs = Vec::with_capacity(num_workers);
249 let mut worker_handles = Vec::with_capacity(num_workers);
250
251 for i in 0..num_workers {
252 let cat = engine.catalog();
253 let exec = engine.executor();
254 let bus = engine.event_bus().clone();
255 let rc = RuntimeContext::with_clock(clock.clone());
256 let co = custom_operators.clone();
257 let worker_factory = move || FlowEngine::new(cat, exec, bus, rc, co);
258
259 let worker = FlowWorkerActor::new(
260 worker_factory,
261 engine.clone(),
262 engine.catalog(),
263 flow_catalog.clone(),
264 );
265 let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
266 worker_refs.push(handle.actor_ref().clone());
267 worker_handles.push(handle);
268 }
269
270 (worker_refs, worker_handles)
271 }
272
273 #[inline]
274 fn register_flow_interceptors(
275 engine: &StandardEngine,
276 transactional_flow_engine: &Arc<RwLock<FlowEngine>>,
277 clock: &Clock,
278 custom_operators: &Arc<HashMap<String, OperatorFactory>>,
279 ) {
280 let flow_engine_for_pre = transactional_flow_engine.clone();
281 let engine_for_pre = engine.clone();
282 let catalog_for_pre = engine.catalog();
283
284 let flow_engine_for_post = transactional_flow_engine.clone();
285 let engine_for_post = engine.clone();
286 let catalog_for_post = engine.catalog();
287
288 let test_flow_engine = transactional_flow_engine.clone();
289 let test_engine = engine.clone();
290 let test_catalog = engine.catalog();
291 let test_event_bus = engine.event_bus().clone();
292 let test_runtime_context = RuntimeContext::with_clock(clock.clone());
293 let test_custom_operators = custom_operators.clone();
294
295 engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
296 interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
297 flow_engine: flow_engine_for_pre.clone(),
298 engine: engine_for_pre.clone(),
299 catalog: catalog_for_pre.clone(),
300 }));
301 interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
302 registrar: TransactionalFlowRegistry {
303 flow_engine: flow_engine_for_post.clone(),
304 engine: engine_for_post.clone(),
305 catalog: catalog_for_post.clone(),
306 },
307 }));
308
309 let hook_flow_engine = test_flow_engine.clone();
314 let hook_engine = test_engine.clone();
315 let hook_catalog = test_catalog.clone();
316 let hook_event_bus = test_event_bus.clone();
317 let hook_runtime_context = test_runtime_context.clone();
318 let hook_custom_operators = test_custom_operators.clone();
319
320 interceptors.set_test_pre_commit(Arc::new(move |test_txn: &mut TestTransaction<'_>| {
321 let mut fresh_engine = FlowEngine::new(
322 hook_catalog.clone(),
323 hook_engine.executor(),
324 hook_event_bus.clone(),
325 hook_runtime_context.clone(),
326 hook_custom_operators.clone(),
327 );
328
329 let flows = hook_catalog
330 .list_flows_all(&mut Transaction::Test(Box::new(test_txn.reborrow())))?;
331
332 for flow in flows {
333 let dag = load_flow_dag(
334 &hook_catalog,
335 &mut Transaction::Test(Box::new(test_txn.reborrow())),
336 flow.id,
337 )?;
338 fresh_engine.register_with_transaction(
339 &mut Transaction::Test(Box::new(test_txn.reborrow())),
340 dag,
341 )?;
342 }
343
344 *hook_flow_engine.write().unwrap() = fresh_engine;
345 Ok(())
346 }));
347 }));
348 }
349}
350
351impl Subsystem for FlowSubsystem {
352 fn name(&self) -> &'static str {
353 "sub-flow"
354 }
355
356 fn start(&mut self) -> Result<()> {
357 if self.running {
358 return Ok(());
359 }
360
361 self.consumer.start()?;
362 self.running = true;
363 Ok(())
364 }
365
366 fn shutdown(&mut self) -> Result<()> {
367 if !self.running {
368 return Ok(());
369 }
370
371 self.consumer.stop()?;
372
373 if let Some(handle) = self.coordinator_handle.take() {
374 let _ = handle.join();
375 }
376
377 if let Some(handle) = self.pool_handle.take() {
378 let _ = handle.join();
379 }
380
381 for handle in self.worker_handles.drain(..) {
382 let _ = handle.join();
383 }
384
385 if let Ok(mut engine) = self.transactional_flow_engine.write() {
388 engine.clear();
389 }
390
391 self.running = false;
392 Ok(())
393 }
394
395 fn is_running(&self) -> bool {
396 self.running
397 }
398
399 fn health_status(&self) -> HealthStatus {
400 if self.is_running() {
401 HealthStatus::Healthy
402 } else {
403 HealthStatus::Unknown
404 }
405 }
406
407 fn as_any(&self) -> &dyn Any {
408 self
409 }
410
411 fn as_any_mut(&mut self) -> &mut dyn Any {
412 self
413 }
414}
415
416impl HasVersion for FlowSubsystem {
417 fn version(&self) -> SystemVersion {
418 SystemVersion {
419 name: env!("CARGO_PKG_NAME")
420 .strip_prefix("reifydb-")
421 .unwrap_or(env!("CARGO_PKG_NAME"))
422 .to_string(),
423 version: env!("CARGO_PKG_VERSION").to_string(),
424 description: "Data flow and stream processing subsystem".to_string(),
425 r#type: ComponentType::Subsystem,
426 }
427 }
428}
429
430impl Drop for FlowSubsystem {
431 fn drop(&mut self) {
432 if self.running {
433 let _ = self.shutdown();
434 }
435 }
436}