reifydb_sub_flow/subsystem/
mod.rs1pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9 any::Any,
10 sync::{Arc, RwLock},
11 time::Duration,
12};
13
14#[cfg(reifydb_target = "native")]
15use ffi::load_ffi_operators;
16use reifydb_cdc::{
17 consume::{
18 consumer::{CdcConsume, CdcConsumer},
19 poll::{PollConsumer, PollConsumerConfig},
20 },
21 storage::CdcStore,
22};
23use reifydb_core::{
24 interface::{
25 WithEventBus,
26 cdc::{Cdc, CdcConsumerId},
27 flow::FlowLagsProvider,
28 version::{ComponentType, HasVersion, SystemVersion},
29 },
30 key::{EncodableKey, cdc_consumer::CdcConsumerKey},
31 util::ioc::IocContainer,
32};
33use reifydb_engine::engine::StandardEngine;
34use reifydb_rql::flow::loader::load_flow_dag;
35use reifydb_runtime::{SharedRuntime, actor::system::ActorHandle, context::RuntimeContext};
36use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
37use reifydb_transaction::{
38 interceptor::interceptors::Interceptors,
39 transaction::{TestTransaction, Transaction},
40};
41use reifydb_type::{Result, value::identity::IdentityId};
42use tracing::{info, warn};
43
44use crate::{
45 builder::FlowBuilderConfig,
46 catalog::FlowCatalog,
47 deferred::{
48 coordinator::{CoordinatorActor, CoordinatorMsg, FlowConsumeRef, extract_new_flow_ids},
49 lag::FlowLags,
50 pool::{PoolActor, PoolMsg},
51 tracker::SchemaVersionTracker,
52 worker::{FlowMsg, FlowWorkerActor},
53 },
54 engine::FlowEngine,
55 transactional::{
56 interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
57 registrar::TransactionalFlowRegistrar,
58 },
59};
60
/// CDC consumer placed in front of the flow coordinator.
///
/// Its `CdcConsume` implementation first looks for newly created flows in a
/// batch and offers them to the transactional registrar, then forwards the
/// batch to the coordinator.
struct FlowConsumeDispatcher {
    /// Coordinator endpoint that every CDC batch is forwarded to.
    coordinator: FlowConsumeRef,
    /// Used to attempt registration of flows loaded from the catalog.
    registrar: TransactionalFlowRegistrar,
    /// Loads flows by id; an entry is removed again when registration
    /// reports `false` or fails.
    flow_catalog: FlowCatalog,
    /// Opens the system-identity query transaction used to load flows.
    engine: StandardEngine,
}
69
70impl CdcConsume for FlowConsumeDispatcher {
71 fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
72 let new_flow_ids = extract_new_flow_ids(&cdcs);
74 if !new_flow_ids.is_empty() {
75 if let Ok(mut query) = self.engine.begin_query(IdentityId::system()) {
76 for flow_id in new_flow_ids {
77 match self
78 .flow_catalog
79 .get_or_load_flow(&mut Transaction::Query(&mut query), flow_id)
80 {
81 Ok((flow, true)) => {
82 match self.registrar.try_register(flow) {
86 Ok(true) => { }
87 Ok(false) => {
88 self.flow_catalog.remove(flow_id);
91 }
92 Err(e) => {
93 self.flow_catalog.remove(flow_id);
94 warn!(
95 flow_id = flow_id.0,
96 error = %e,
97 "failed to register transactional flow"
98 );
99 }
100 }
101 }
102 Ok((_, false)) => {
103 }
105 Err(e) => {
106 warn!(
107 flow_id = flow_id.0,
108 error = %e,
109 "failed to load flow for transactional check"
110 );
111 }
112 }
113 }
114 }
115 }
116
117 self.coordinator.consume(cdcs, reply);
121 }
122}
123
/// Subsystem that owns the flow actors (workers, pool, coordinator) and the
/// CDC poll consumer that feeds them.
pub struct FlowSubsystem {
    /// CDC poll consumer; started in `start` and stopped in `shutdown`.
    consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
    /// Handles of the spawned flow worker actors; drained and joined on shutdown.
    worker_handles: Vec<ActorHandle<FlowMsg>>,
    /// Handle of the pool actor; taken and joined on shutdown.
    pool_handle: Option<ActorHandle<PoolMsg>>,
    /// Handle of the coordinator actor; taken and joined on shutdown.
    coordinator_handle: Option<ActorHandle<CoordinatorMsg>>,
    /// Flow engine shared with the transactional interceptors; cleared on shutdown.
    transactional_flow_engine: Arc<RwLock<FlowEngine>>,
    /// Set by a successful `start`, reset at the end of `shutdown`.
    running: bool,
}
133
134impl FlowSubsystem {
135 pub fn new(config: FlowBuilderConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
137 let catalog = engine.catalog();
138 let executor = engine.executor();
139 let event_bus = engine.event_bus().clone();
140
141 #[cfg(reifydb_target = "native")]
142 if let Some(ref operators_dir) = config.operators_dir {
143 if let Err(e) = load_ffi_operators(operators_dir, &event_bus) {
144 panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
145 }
146 event_bus.wait_for_completion();
147 }
148
149 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
150 let clock = runtime.clock().clone();
151 let runtime_context = RuntimeContext::with_clock(clock.clone());
152 let runtime_context_for_factory = runtime_context.clone();
153
154 let custom_operators = Arc::new(config.custom_operators);
155 let custom_operators_for_factory = custom_operators.clone();
156
157 let factory_builder = move || {
158 let cat = catalog.clone();
159 let exec = executor.clone();
160 let bus = event_bus.clone();
161 let rc = runtime_context_for_factory.clone();
162 let co = custom_operators_for_factory.clone();
163
164 move || FlowEngine::new(cat, exec, bus, rc, co)
165 };
166
167 let primitive_tracker = Arc::new(SchemaVersionTracker::new());
168
169 let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
170
171 let num_workers = config.num_workers;
172 info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
173
174 let actor_system = engine.actor_system();
175
176 let mut worker_refs = Vec::with_capacity(num_workers);
177 let mut worker_handles = Vec::with_capacity(num_workers);
178 for i in 0..num_workers {
179 let worker_factory = factory_builder();
180 let worker = FlowWorkerActor::new(worker_factory, engine.clone(), engine.catalog());
181 let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
182 worker_refs.push(handle.actor_ref().clone());
183 worker_handles.push(handle);
184 }
185
186 let pool = PoolActor::new(worker_refs, clock.clone());
187 let pool_handle = actor_system.spawn("flow-pool", pool);
188 let pool_ref = pool_handle.actor_ref().clone();
189
190 let flow_catalog = FlowCatalog::new(engine.catalog());
193
194 let coordinator = CoordinatorActor::new(
195 engine.clone(),
196 flow_catalog.clone(),
197 pool_ref,
198 primitive_tracker.clone(),
199 cdc_store.clone(),
200 num_workers,
201 clock,
202 );
203 let coordinator_handle = actor_system.spawn("flow-coordinator", coordinator);
204 let actor_ref = coordinator_handle.actor_ref().clone();
205
206 let consumer_id = CdcConsumerId::new("flow-coordinator");
207 let consumer_key = CdcConsumerKey {
208 consumer: consumer_id.clone(),
209 }
210 .encode();
211 let consume_ref = FlowConsumeRef {
212 actor_ref,
213 consumer_key,
214 };
215
216 let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
218 engine.catalog(),
219 engine.executor(),
220 engine.event_bus().clone(),
221 RuntimeContext::with_clock(runtime.clock().clone()),
222 custom_operators.clone(),
223 )));
224
225 let registrar = TransactionalFlowRegistrar {
227 flow_engine: transactional_flow_engine.clone(),
228 engine: engine.clone(),
229 catalog: engine.catalog(),
230 };
231
232 let transactional_flow_engine_for_self = transactional_flow_engine.clone();
233
234 {
236 let flow_engine_for_interceptor = transactional_flow_engine.clone();
237 let engine_for_interceptor = engine.clone();
238 let catalog_for_interceptor = engine.catalog();
239 let registrar_for_interceptor = TransactionalFlowRegistrar {
240 flow_engine: transactional_flow_engine,
241 engine: engine.clone(),
242 catalog: engine.catalog(),
243 };
244
245 let test_flow_engine = flow_engine_for_interceptor.clone();
247 let test_engine = engine_for_interceptor.clone();
248 let test_catalog = catalog_for_interceptor.clone();
249 let test_event_bus = engine.event_bus().clone();
250 let test_runtime_context = RuntimeContext::with_clock(runtime.clock().clone());
251 let test_custom_operators = custom_operators.clone();
252
253 engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
254 interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
255 flow_engine: flow_engine_for_interceptor.clone(),
256 engine: engine_for_interceptor.clone(),
257 catalog: catalog_for_interceptor.clone(),
258 }));
259 interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
260 registrar: TransactionalFlowRegistrar {
261 flow_engine: registrar_for_interceptor.flow_engine.clone(),
262 engine: registrar_for_interceptor.engine.clone(),
263 catalog: registrar_for_interceptor.catalog.clone(),
264 },
265 }));
266
267 let hook_flow_engine = test_flow_engine.clone();
272 let hook_engine = test_engine.clone();
273 let hook_catalog = test_catalog.clone();
274 let hook_event_bus = test_event_bus.clone();
275 let hook_runtime_context = test_runtime_context.clone();
276 let hook_custom_operators = test_custom_operators.clone();
277
278 interceptors.set_test_pre_commit(Arc::new(
279 move |test_txn: &mut TestTransaction<'_>| {
280 let mut fresh_engine = FlowEngine::new(
281 hook_catalog.clone(),
282 hook_engine.executor(),
283 hook_event_bus.clone(),
284 hook_runtime_context.clone(),
285 hook_custom_operators.clone(),
286 );
287
288 let flows = hook_catalog
289 .list_flows_all(&mut Transaction::Test(test_txn.reborrow()))?;
290
291 for flow in flows {
292 let dag = load_flow_dag(
293 &hook_catalog,
294 &mut Transaction::Test(test_txn.reborrow()),
295 flow.id,
296 )?;
297 fresh_engine.register_with_transaction(
298 &mut Transaction::Test(test_txn.reborrow()),
299 dag,
300 )?;
301 }
302
303 *hook_flow_engine.write().unwrap() = fresh_engine;
304 Ok(())
305 },
306 ));
307 }));
308 }
309
310 ioc.register_service::<Arc<dyn FlowLagsProvider>>(Arc::new(FlowLags::new(
311 primitive_tracker,
312 engine.clone(),
313 flow_catalog.clone(),
314 )));
315
316 let poll_config =
317 PollConsumerConfig::new(consumer_id, "flow-cdc-poll", Duration::from_millis(10), Some(100));
318
319 let dispatcher = FlowConsumeDispatcher {
321 coordinator: consume_ref,
322 registrar,
323 flow_catalog: flow_catalog.clone(),
324 engine: engine.clone(),
325 };
326
327 let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
328
329 Self {
330 consumer,
331 worker_handles,
332 pool_handle: Some(pool_handle),
333 coordinator_handle: Some(coordinator_handle),
334 transactional_flow_engine: transactional_flow_engine_for_self,
335 running: false,
336 }
337 }
338}
339
340impl Subsystem for FlowSubsystem {
341 fn name(&self) -> &'static str {
342 "sub-flow"
343 }
344
345 fn start(&mut self) -> Result<()> {
346 if self.running {
347 return Ok(());
348 }
349
350 self.consumer.start()?;
351 self.running = true;
352 Ok(())
353 }
354
355 fn shutdown(&mut self) -> Result<()> {
356 if !self.running {
357 return Ok(());
358 }
359
360 self.consumer.stop()?;
361
362 if let Some(handle) = self.coordinator_handle.take() {
363 let _ = handle.join();
364 }
365
366 if let Some(handle) = self.pool_handle.take() {
367 let _ = handle.join();
368 }
369
370 for handle in self.worker_handles.drain(..) {
371 let _ = handle.join();
372 }
373
374 if let Ok(mut engine) = self.transactional_flow_engine.write() {
377 engine.clear();
378 }
379
380 self.running = false;
381 Ok(())
382 }
383
384 fn is_running(&self) -> bool {
385 self.running
386 }
387
388 fn health_status(&self) -> HealthStatus {
389 if self.is_running() {
390 HealthStatus::Healthy
391 } else {
392 HealthStatus::Unknown
393 }
394 }
395
396 fn as_any(&self) -> &dyn Any {
397 self
398 }
399
400 fn as_any_mut(&mut self) -> &mut dyn Any {
401 self
402 }
403}
404
405impl HasVersion for FlowSubsystem {
406 fn version(&self) -> SystemVersion {
407 SystemVersion {
408 name: env!("CARGO_PKG_NAME")
409 .strip_prefix("reifydb-")
410 .unwrap_or(env!("CARGO_PKG_NAME"))
411 .to_string(),
412 version: env!("CARGO_PKG_VERSION").to_string(),
413 description: "Data flow and stream processing subsystem".to_string(),
414 r#type: ComponentType::Subsystem,
415 }
416 }
417}
418
419impl Drop for FlowSubsystem {
420 fn drop(&mut self) {
421 if self.running {
422 let _ = self.shutdown();
423 }
424 }
425}