reifydb_sub_flow/subsystem/
mod.rs1pub mod factory;
5#[cfg(reifydb_target = "native")]
6pub mod ffi;
7
8use std::{
9 any::Any,
10 sync::{Arc, RwLock},
11 time::Duration,
12};
13
14#[cfg(reifydb_target = "native")]
15use ffi::load_ffi_operators;
16use reifydb_cdc::{
17 consume::{
18 consumer::{CdcConsume, CdcConsumer},
19 poll::{PollConsumer, PollConsumerConfig},
20 },
21 storage::CdcStore,
22};
23use reifydb_core::{
24 interface::{
25 WithEventBus,
26 cdc::{Cdc, CdcConsumerId},
27 flow::FlowLagsProvider,
28 version::{ComponentType, HasVersion, SystemVersion},
29 },
30 key::{EncodableKey, cdc_consumer::CdcConsumerKey},
31 util::ioc::IocContainer,
32};
33use reifydb_engine::engine::StandardEngine;
34use reifydb_runtime::{SharedRuntime, actor::system::ActorHandle};
35use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
36use reifydb_transaction::{
37 interceptor::interceptors::Interceptors, testing::TestingViewMutationCaptor, transaction::Transaction,
38};
39use reifydb_type::Result;
40use tracing::{info, warn};
41
42use crate::{
43 builder::FlowBuilderConfig,
44 catalog::FlowCatalog,
45 deferred::{
46 coordinator::{CoordinatorActor, CoordinatorMsg, FlowConsumeRef, extract_new_flow_ids},
47 lag::FlowLags,
48 pool::{PoolActor, PoolMsg},
49 tracker::PrimitiveVersionTracker,
50 worker::{FlowMsg, FlowWorkerActor},
51 },
52 engine::FlowEngine,
53 testing::ViewInlineTestingMutationCapture,
54 transactional::{
55 interceptor::{TransactionalFlowPostCommitInterceptor, TransactionalFlowPreCommitInterceptor},
56 registrar::TransactionalFlowRegistrar,
57 },
58};
59
60struct FlowConsumeDispatcher {
63 coordinator: FlowConsumeRef,
64 registrar: TransactionalFlowRegistrar,
65 flow_catalog: FlowCatalog,
66 engine: StandardEngine,
67}
68
69impl CdcConsume for FlowConsumeDispatcher {
70 fn consume(&self, cdcs: Vec<Cdc>, reply: Box<dyn FnOnce(Result<()>) + Send>) {
71 let new_flow_ids = extract_new_flow_ids(&cdcs);
73 if !new_flow_ids.is_empty() {
74 if let Ok(mut query) = self.engine.begin_query() {
75 for flow_id in new_flow_ids {
76 match self
77 .flow_catalog
78 .get_or_load_flow(&mut Transaction::Query(&mut query), flow_id)
79 {
80 Ok((flow, true)) => {
81 match self.registrar.try_register(flow) {
85 Ok(true) => { }
86 Ok(false) => {
87 self.flow_catalog.remove(flow_id);
90 }
91 Err(e) => {
92 self.flow_catalog.remove(flow_id);
93 warn!(
94 flow_id = flow_id.0,
95 error = %e,
96 "failed to register transactional flow"
97 );
98 }
99 }
100 }
101 Ok((_, false)) => {
102 }
104 Err(e) => {
105 warn!(
106 flow_id = flow_id.0,
107 error = %e,
108 "failed to load flow for transactional check"
109 );
110 }
111 }
112 }
113 }
114 }
115
116 self.coordinator.consume(cdcs, reply);
120 }
121}
122
123pub struct FlowSubsystem {
125 consumer: PollConsumer<StandardEngine, FlowConsumeDispatcher>,
126 worker_handles: Vec<ActorHandle<FlowMsg>>,
127 pool_handle: Option<ActorHandle<PoolMsg>>,
128 coordinator_handle: Option<ActorHandle<CoordinatorMsg>>,
129 transactional_flow_engine: Arc<RwLock<FlowEngine>>,
130 running: bool,
131}
132
133impl FlowSubsystem {
134 pub fn new(config: FlowBuilderConfig, engine: StandardEngine, ioc: &IocContainer) -> Self {
136 let catalog = engine.catalog();
137 let executor = engine.executor();
138 let event_bus = engine.event_bus().clone();
139
140 #[cfg(reifydb_target = "native")]
141 if let Some(ref operators_dir) = config.operators_dir {
142 if let Err(e) = load_ffi_operators(operators_dir, &event_bus) {
143 panic!("Failed to load FFI operators from {:?}: {}", operators_dir, e);
144 }
145 event_bus.wait_for_completion();
146 }
147
148 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime must be registered");
149 let clock = runtime.clock().clone();
150 let clock_for_factory = clock.clone();
151
152 let custom_operators = Arc::new(config.custom_operators);
153 let custom_operators_for_factory = custom_operators.clone();
154
155 let factory_builder = move || {
156 let cat = catalog.clone();
157 let exec = executor.clone();
158 let bus = event_bus.clone();
159 let clk = clock_for_factory.clone();
160 let co = custom_operators_for_factory.clone();
161
162 move || FlowEngine::new(cat, exec, bus, clk, co)
163 };
164
165 let primitive_tracker = Arc::new(PrimitiveVersionTracker::new());
166
167 let cdc_store = ioc.resolve::<CdcStore>().expect("CdcStore must be registered");
168
169 let num_workers = config.num_workers;
170 info!(num_workers, "initializing flow coordinator with {} workers", num_workers);
171
172 let actor_system = engine.actor_system();
173
174 let mut worker_refs = Vec::with_capacity(num_workers);
175 let mut worker_handles = Vec::with_capacity(num_workers);
176 for i in 0..num_workers {
177 let worker_factory = factory_builder();
178 let worker = FlowWorkerActor::new(worker_factory, engine.clone(), engine.catalog());
179 let handle = actor_system.spawn(&format!("flow-worker-{}", i), worker);
180 worker_refs.push(handle.actor_ref().clone());
181 worker_handles.push(handle);
182 }
183
184 let pool = PoolActor::new(worker_refs, clock.clone());
185 let pool_handle = actor_system.spawn("flow-pool", pool);
186 let pool_ref = pool_handle.actor_ref().clone();
187
188 let flow_catalog = FlowCatalog::new(engine.catalog());
191
192 let coordinator = CoordinatorActor::new(
193 engine.clone(),
194 flow_catalog.clone(),
195 pool_ref,
196 primitive_tracker.clone(),
197 cdc_store.clone(),
198 num_workers,
199 clock,
200 );
201 let coordinator_handle = actor_system.spawn("flow-coordinator", coordinator);
202 let actor_ref = coordinator_handle.actor_ref().clone();
203
204 let consumer_id = CdcConsumerId::new("flow-coordinator");
205 let consumer_key = CdcConsumerKey {
206 consumer: consumer_id.clone(),
207 }
208 .encode();
209 let consume_ref = FlowConsumeRef {
210 actor_ref,
211 consumer_key,
212 };
213
214 let transactional_flow_engine = Arc::new(RwLock::new(FlowEngine::new(
216 engine.catalog(),
217 engine.executor(),
218 engine.event_bus().clone(),
219 runtime.clock().clone(),
220 custom_operators.clone(),
221 )));
222
223 let registrar = TransactionalFlowRegistrar {
225 flow_engine: transactional_flow_engine.clone(),
226 engine: engine.clone(),
227 catalog: engine.catalog(),
228 };
229
230 let transactional_flow_engine_for_self = transactional_flow_engine.clone();
231
232 {
234 let flow_engine_for_interceptor = transactional_flow_engine.clone();
235 let engine_for_interceptor = engine.clone();
236 let catalog_for_interceptor = engine.catalog();
237 let registrar_for_interceptor = TransactionalFlowRegistrar {
238 flow_engine: transactional_flow_engine,
239 engine: engine.clone(),
240 catalog: engine.catalog(),
241 };
242
243 engine.add_interceptor_factory(Arc::new(move |interceptors: &mut Interceptors| {
244 interceptors.pre_commit.add(Arc::new(TransactionalFlowPreCommitInterceptor {
245 flow_engine: flow_engine_for_interceptor.clone(),
246 engine: engine_for_interceptor.clone(),
247 catalog: catalog_for_interceptor.clone(),
248 }));
249 interceptors.post_commit.add(Arc::new(TransactionalFlowPostCommitInterceptor {
250 registrar: TransactionalFlowRegistrar {
251 flow_engine: registrar_for_interceptor.flow_engine.clone(),
252 engine: registrar_for_interceptor.engine.clone(),
253 catalog: registrar_for_interceptor.catalog.clone(),
254 },
255 }));
256 }));
257 }
258
259 ioc.register_service::<Arc<dyn FlowLagsProvider>>(Arc::new(FlowLags::new(
260 primitive_tracker,
261 engine.clone(),
262 flow_catalog.clone(),
263 )));
264
265 ioc.register_service::<Arc<dyn TestingViewMutationCaptor>>(Arc::new(
266 ViewInlineTestingMutationCapture {
267 engine: engine.clone(),
268 catalog: engine.catalog(),
269 event_bus: engine.event_bus().clone(),
270 clock: runtime.clock().clone(),
271 custom_operators: custom_operators.clone(),
272 },
273 ));
274
275 let poll_config =
276 PollConsumerConfig::new(consumer_id, "flow-cdc-poll", Duration::from_millis(10), Some(100));
277
278 let dispatcher = FlowConsumeDispatcher {
280 coordinator: consume_ref,
281 registrar,
282 flow_catalog: flow_catalog.clone(),
283 engine: engine.clone(),
284 };
285
286 let consumer = PollConsumer::new(poll_config, engine, dispatcher, cdc_store, actor_system);
287
288 Self {
289 consumer,
290 worker_handles,
291 pool_handle: Some(pool_handle),
292 coordinator_handle: Some(coordinator_handle),
293 transactional_flow_engine: transactional_flow_engine_for_self,
294 running: false,
295 }
296 }
297}
298
299impl Subsystem for FlowSubsystem {
300 fn name(&self) -> &'static str {
301 "sub-flow"
302 }
303
304 fn start(&mut self) -> Result<()> {
305 if self.running {
306 return Ok(());
307 }
308
309 self.consumer.start()?;
310 self.running = true;
311 Ok(())
312 }
313
314 fn shutdown(&mut self) -> Result<()> {
315 if !self.running {
316 return Ok(());
317 }
318
319 self.consumer.stop()?;
320
321 if let Some(handle) = self.coordinator_handle.take() {
322 let _ = handle.join();
323 }
324
325 if let Some(handle) = self.pool_handle.take() {
326 let _ = handle.join();
327 }
328
329 for handle in self.worker_handles.drain(..) {
330 let _ = handle.join();
331 }
332
333 if let Ok(mut engine) = self.transactional_flow_engine.write() {
336 engine.clear();
337 }
338
339 self.running = false;
340 Ok(())
341 }
342
343 fn is_running(&self) -> bool {
344 self.running
345 }
346
347 fn health_status(&self) -> HealthStatus {
348 if self.is_running() {
349 HealthStatus::Healthy
350 } else {
351 HealthStatus::Unknown
352 }
353 }
354
355 fn as_any(&self) -> &dyn Any {
356 self
357 }
358
359 fn as_any_mut(&mut self) -> &mut dyn Any {
360 self
361 }
362}
363
364impl HasVersion for FlowSubsystem {
365 fn version(&self) -> SystemVersion {
366 SystemVersion {
367 name: env!("CARGO_PKG_NAME")
368 .strip_prefix("reifydb-")
369 .unwrap_or(env!("CARGO_PKG_NAME"))
370 .to_string(),
371 version: env!("CARGO_PKG_VERSION").to_string(),
372 description: "Data flow and stream processing subsystem".to_string(),
373 r#type: ComponentType::Subsystem,
374 }
375 }
376}
377
378impl Drop for FlowSubsystem {
379 fn drop(&mut self) {
380 if self.running {
381 let _ = self.shutdown();
382 }
383 }
384}