reifydb_sub_flow/subsystem/
mod.rs1pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9 any::Any,
10 sync::{Arc, RwLock},
11 time::Duration,
12};
13
14#[cfg(reifydb_target = "native")]
15use ffi::load_ffi_operators;
16use reifydb_cdc::{
17 consume::{
18 consumer::{CdcConsume, CdcConsumer},
19 poll::{PollConsumer, PollConsumerConfig},
20 },
21 storage::CdcStore,
22};
23use reifydb_core::{
24 interface::{
25 WithEventBus,
26 cdc::{Cdc, CdcConsumerId},
27 flow::FlowLagsProvider,
28 version::{ComponentType, HasVersion, SystemVersion},
29 },
30 key::{EncodableKey, cdc_consumer::CdcConsumerKey},
31 util::ioc::IocContainer,
32};
33use reifydb_engine::engine::StandardEngine;
34use reifydb_runtime::{SharedRuntime, actor::system::ActorHandle};
35use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
36use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::Transaction};
37use reifydb_type::Result;
38use tracing::{info, warn};
39
40use crate::{
41 builder::FlowBuilderConfig,
42 catalog::FlowCatalog,
43 deferred::{
44 coordinator::{CoordinatorActor, CoordinatorMsg, FlowConsumeRef, extract_new_flow_ids},
45 lag::FlowLags,
46 pool::{PoolActor, PoolMsg},
47 tracker::PrimitiveVersionTracker,
48 worker::{FlowMsg, FlowWorkerActor},
49 },
50 engine::FlowEngine,
51 transactional::{
52 interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
53 registrar::TransactionalFlowRegistrar,
54 },
55};
56
57struct FlowConsumeDispatcher {
60 coordinator: FlowConsumeRef,
61 registrar: TransactionalFlowRegistrar,
62 flow_catalog: FlowCatalog,
63 engine: StandardEngine,
64}
65
66impl CdcConsume for FlowConsumeDispatcher {
67 fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
68 let new_flow_ids = extract_new_flow_ids(&cdcs);
70 if !new_flow_ids.is_empty() {
71 if let Ok(mut query) = self.engine.begin_query() {
72 for flow_id in new_flow_ids {
73 match self
74 .flow_catalog
75 .get_or_load_flow(&mut Transaction::Query(&mut query), flow_id)
76 {
77 Ok((flow, true)) => {
78 match self.registrar.try_register(flow) {
82 Ok(true) => { }
83 Ok(false) => {
84 self.flow_catalog.remove(flow_id);
87 }
88 Err(e) => {
89 self.flow_catalog.remove(flow_id);
90 warn!(
91 flow_id = flow_id.0,
92 error = %e,
93 "failed to register transactional flow"
94 );
95 }
96 }
97 }
98 Ok((_, false)) => {
99 }
101 Err(e) => {
102 warn!(
103 flow_id = flow_id.0,
104 error = %e,
105 "failed to load flow for transactional check"
106 );
107 }
108 }
109 }
110 }
111 }
112
113 self.coordinator.consume(cdcs, reply);
117 }
118}
119
120pub struct FlowSubsystem {
122 consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
123 worker_handles: Vec<ActorHandle<FlowMsg>>,
124 pool_handle: Option<ActorHandle<PoolMsg>>,
125 coordinator_handle: Option<ActorHandle<CoordinatorMsg>>,
126 transactional_flow_engine: Arc<RwLock<FlowEngine>>,
127 running: bool,
128}
129
130impl FlowSubsystem {
131 pub fn new(config: FlowBuilderConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
133 let catalog = engine.catalog();
134 let executor = engine.executor();
135 let event_bus = engine.event_bus().clone();
136
137 #[cfg(reifydb_target = "native")]
138 if let Some(ref operators_dir) = config.operators_dir {
139 if let Err(e) = load_ffi_operators(operators_dir, &event_bus) {
140 panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
141 }
142 event_bus.wait_for_completion();
143 }
144
145 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
146 let clock = runtime.clock().clone();
147 let clock_for_factory = clock.clone();
148
149 let custom_operators = Arc::new(config.custom_operators);
150 let custom_operators_for_factory = custom_operators.clone();
151
152 let factory_builder = move || {
153 let cat = catalog.clone();
154 let exec = executor.clone();
155 let bus = event_bus.clone();
156 let clk = clock_for_factory.clone();
157 let co = custom_operators_for_factory.clone();
158
159 move || FlowEngine::new(cat, exec, bus, clk, co)
160 };
161
162 let primitive_tracker = Arc::new(PrimitiveVersionTracker::new());
163
164 let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
165
166 let num_workers = config.num_workers;
167 info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
168
169 let actor_system = engine.actor_system();
170
171 let mut worker_refs = Vec::with_capacity(num_workers);
172 let mut worker_handles = Vec::with_capacity(num_workers);
173 for i in 0..num_workers {
174 let worker_factory = factory_builder();
175 let worker = FlowWorkerActor::new(worker_factory, engine.clone(), engine.catalog());
176 let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
177 worker_refs.push(handle.actor_ref().clone());
178 worker_handles.push(handle);
179 }
180
181 let pool = PoolActor::new(worker_refs, clock.clone());
182 let pool_handle = actor_system.spawn("flow-pool", pool);
183 let pool_ref = pool_handle.actor_ref().clone();
184
185 let flow_catalog = FlowCatalog::new(engine.catalog());
188
189 let coordinator = CoordinatorActor::new(
190 engine.clone(),
191 flow_catalog.clone(),
192 pool_ref,
193 primitive_tracker.clone(),
194 cdc_store.clone(),
195 num_workers,
196 clock,
197 );
198 let coordinator_handle = actor_system.spawn("flow-coordinator", coordinator);
199 let actor_ref = coordinator_handle.actor_ref().clone();
200
201 let consumer_id = CdcConsumerId::new("flow-coordinator");
202 let consumer_key = CdcConsumerKey {
203 consumer: consumer_id.clone(),
204 }
205 .encode();
206 let consume_ref = FlowConsumeRef {
207 actor_ref,
208 consumer_key,
209 };
210
211 let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
213 engine.catalog(),
214 engine.executor(),
215 engine.event_bus().clone(),
216 runtime.clock().clone(),
217 custom_operators.clone(),
218 )));
219
220 let registrar = TransactionalFlowRegistrar {
222 flow_engine: transactional_flow_engine.clone(),
223 engine: engine.clone(),
224 catalog: engine.catalog(),
225 };
226
227 let transactional_flow_engine_for_self = transactional_flow_engine.clone();
228
229 {
231 let flow_engine_for_interceptor = transactional_flow_engine.clone();
232 let engine_for_interceptor = engine.clone();
233 let catalog_for_interceptor = engine.catalog();
234 let registrar_for_interceptor = TransactionalFlowRegistrar {
235 flow_engine: transactional_flow_engine,
236 engine: engine.clone(),
237 catalog: engine.catalog(),
238 };
239
240 engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
241 interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
242 flow_engine: flow_engine_for_interceptor.clone(),
243 engine: engine_for_interceptor.clone(),
244 catalog: catalog_for_interceptor.clone(),
245 }));
246 interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
247 registrar: TransactionalFlowRegistrar {
248 flow_engine: registrar_for_interceptor.flow_engine.clone(),
249 engine: registrar_for_interceptor.engine.clone(),
250 catalog: registrar_for_interceptor.catalog.clone(),
251 },
252 }));
253 }));
254 }
255
256 ioc.register_service::<Arc<dyn FlowLagsProvider>>(Arc::new(FlowLags::new(
257 primitive_tracker,
258 engine.clone(),
259 flow_catalog.clone(),
260 )));
261
262 let poll_config =
263 PollConsumerConfig::new(consumer_id, "flow-cdc-poll", Duration::from_millis(10), Some(100));
264
265 let dispatcher = FlowConsumeDispatcher {
267 coordinator: consume_ref,
268 registrar,
269 flow_catalog: flow_catalog.clone(),
270 engine: engine.clone(),
271 };
272
273 let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
274
275 Self {
276 consumer,
277 worker_handles,
278 pool_handle: Some(pool_handle),
279 coordinator_handle: Some(coordinator_handle),
280 transactional_flow_engine: transactional_flow_engine_for_self,
281 running: false,
282 }
283 }
284}
285
286impl Subsystem for FlowSubsystem {
287 fn name(&self) -> &'static str {
288 "sub-flow"
289 }
290
291 fn start(&mut self) -> Result<()> {
292 if self.running {
293 return Ok(());
294 }
295
296 self.consumer.start()?;
297 self.running = true;
298 Ok(())
299 }
300
301 fn shutdown(&mut self) -> Result<()> {
302 if !self.running {
303 return Ok(());
304 }
305
306 self.consumer.stop()?;
307
308 if let Some(handle) = self.coordinator_handle.take() {
309 let _ = handle.join();
310 }
311
312 if let Some(handle) = self.pool_handle.take() {
313 let _ = handle.join();
314 }
315
316 for handle in self.worker_handles.drain(..) {
317 let _ = handle.join();
318 }
319
320 if let Ok(mut engine) = self.transactional_flow_engine.write() {
323 engine.clear();
324 }
325
326 self.running = false;
327 Ok(())
328 }
329
330 fn is_running(&self) -> bool {
331 self.running
332 }
333
334 fn health_status(&self) -> HealthStatus {
335 if self.is_running() {
336 HealthStatus::Healthy
337 } else {
338 HealthStatus::Unknown
339 }
340 }
341
342 fn as_any(&self) -> &dyn Any {
343 self
344 }
345
346 fn as_any_mut(&mut self) -> &mut dyn Any {
347 self
348 }
349}
350
351impl HasVersion for FlowSubsystem {
352 fn version(&self) -> SystemVersion {
353 SystemVersion {
354 name: env!("CARGO_PKG_NAME")
355 .strip_prefix("reifydb-")
356 .unwrap_or(env!("CARGO_PKG_NAME"))
357 .to_string(),
358 version: env!("CARGO_PKG_VERSION").to_string(),
359 description: "Data flow and stream processing subsystem".to_string(),
360 r#type: ComponentType::Subsystem,
361 }
362 }
363}
364
365impl Drop for FlowSubsystem {
366 fn drop(&mut self) {
367 if self.running {
368 let _ = self.shutdown();
369 }
370 }
371}