// reifydb_sub_flow/worker/mod.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4pub mod parallel;
5pub mod same;
6
7pub use parallel::ParallelWorkerPool;
8use reifydb_core::{CommitVersion, interface::FlowId};
9use reifydb_engine::StandardCommandTransaction;
10use reifydb_flow_operator_sdk::FlowChange;
11pub use same::SameThreadedWorker;
12
13use crate::FlowEngine;
14
/// Trait for different worker pool implementations
///
/// An implementation receives a batch of pending work, grouped per flow, and
/// drives it through the supplied [`FlowEngine`] within the given parent
/// transaction.
pub trait WorkerPool {
	/// Process a batch of units of work grouped by flow
	///
	/// Each flow's units are ordered by version and must be processed sequentially.
	/// Different flows can be processed in parallel.
	///
	/// # Arguments
	/// * `txn` - Parent transaction for creating FlowTransactions; borrowed mutably for the duration of the call
	/// * `units` - Units of work grouped by flow; passed by value, so the batch is consumed by this call
	/// * `engine` - Engine for processing flows; only a shared reference is required
	///
	/// # Returns
	/// Ok(()) if all units processed successfully, Err if any failed
	fn process(
		&self,
		txn: &mut StandardCommandTransaction,
		units: UnitsOfWork,
		engine: &FlowEngine,
	) -> crate::Result<()>;

	/// Get a name for this worker implementation (for logging)
	///
	/// The returned string slice borrows from `self`.
	fn name(&self) -> &str;
}
39
/// A unit of work representing a flow and all its source changes
///
/// One value pairs a single flow with a single commit version and the
/// changes to process for that pair. All fields are public, and the type is
/// `Clone`, which clones the `source_changes` vector as well.
#[derive(Debug, Clone)]
pub struct UnitOfWork {
	/// The flow to process
	pub flow_id: FlowId,

	/// The commit version for this unit of work
	pub version: CommitVersion,

	/// All source changes this flow needs to process
	/// Multiple entries if flow subscribes to multiple sources (e.g., joins)
	pub source_changes: Vec<FlowChange>,
}
53
54impl UnitOfWork {
55	/// Create a new unit of work
56	pub fn new(flow_id: FlowId, version: CommitVersion, source_changes: Vec<FlowChange>) -> Self {
57		Self {
58			flow_id,
59			version,
60			source_changes,
61		}
62	}
63}
64
/// A collection of units of work grouped by flow
///
/// Each inner vector represents one flow's units, ordered by version.
/// The flow's units must be processed sequentially to maintain version ordering.
/// Different flows (outer vector) can be processed in parallel.
///
/// The wrapped vector is private: it is read through `as_inner`, taken out
/// with `into_inner`, or consumed group by group via `IntoIterator`.
/// The type itself does not check or enforce the ordering described above.
#[derive(Debug, Clone)]
pub struct UnitsOfWork(Vec<Vec<UnitOfWork>>);
72
73impl UnitsOfWork {
74	/// Create a new UnitsOfWork from a vector of flow units
75	pub fn new(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
76		Self(flow_units)
77	}
78
79	/// Create an empty UnitsOfWork
80	pub fn empty() -> Self {
81		Self(Vec::new())
82	}
83
84	/// Check if there are no units of work
85	pub fn is_empty(&self) -> bool {
86		self.0.is_empty()
87	}
88
89	/// Get the number of flows
90	pub fn flow_count(&self) -> usize {
91		self.0.len()
92	}
93
94	/// Get the total number of units across all flows
95	pub fn total_units(&self) -> usize {
96		self.0.iter().map(|units| units.len()).sum()
97	}
98
99	/// Get a reference to the inner vector
100	pub fn as_inner(&self) -> &Vec<Vec<UnitOfWork>> {
101		&self.0
102	}
103
104	/// Consume self and return the inner vector
105	pub fn into_inner(self) -> Vec<Vec<UnitOfWork>> {
106		self.0
107	}
108}
109
110impl From<Vec<Vec<UnitOfWork>>> for UnitsOfWork {
111	fn from(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
112		Self(flow_units)
113	}
114}
115
116impl IntoIterator for UnitsOfWork {
117	type Item = Vec<UnitOfWork>;
118	type IntoIter = std::vec::IntoIter<Vec<UnitOfWork>>;
119
120	fn into_iter(self) -> Self::IntoIter {
121		self.0.into_iter()
122	}
123}