reifydb_sub_flow/worker/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4pub mod parallel;
5pub mod processor;
6pub mod same;
7
8pub use parallel::ParallelWorkerPool;
9pub use processor::WorkerPool;
10use reifydb_core::{CommitVersion, interface::FlowId};
11pub use same::SameThreadedWorker;
12
13use crate::flow::FlowChange;
14
15/// A unit of work representing a flow and all its source changes
16#[derive(Debug, Clone)]
17pub struct UnitOfWork {
18	/// The flow to process
19	pub flow_id: FlowId,
20
21	/// The commit version for this unit of work
22	pub version: CommitVersion,
23
24	/// All source changes this flow needs to process
25	/// Multiple entries if flow subscribes to multiple sources (e.g., joins)
26	pub source_changes: Vec<FlowChange>,
27}
28
29impl UnitOfWork {
30	/// Create a new unit of work
31	pub fn new(flow_id: FlowId, version: CommitVersion, source_changes: Vec<FlowChange>) -> Self {
32		Self {
33			flow_id,
34			version,
35			source_changes,
36		}
37	}
38}
39
40/// A collection of units of work grouped by flow
41///
42/// Each inner vector represents one flow's units, ordered by version.
43/// The flow's units must be processed sequentially to maintain version ordering.
44/// Different flows (outer vector) can be processed in parallel.
45#[derive(Debug, Clone)]
46pub struct UnitsOfWork(Vec<Vec<UnitOfWork>>);
47
48impl UnitsOfWork {
49	/// Create a new UnitsOfWork from a vector of flow units
50	pub fn new(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
51		Self(flow_units)
52	}
53
54	/// Create an empty UnitsOfWork
55	pub fn empty() -> Self {
56		Self(Vec::new())
57	}
58
59	/// Check if there are no units of work
60	pub fn is_empty(&self) -> bool {
61		self.0.is_empty()
62	}
63
64	/// Get the number of flows
65	pub fn flow_count(&self) -> usize {
66		self.0.len()
67	}
68
69	/// Get the total number of units across all flows
70	pub fn total_units(&self) -> usize {
71		self.0.iter().map(|units| units.len()).sum()
72	}
73
74	/// Get a reference to the inner vector
75	pub fn as_inner(&self) -> &Vec<Vec<UnitOfWork>> {
76		&self.0
77	}
78
79	/// Consume self and return the inner vector
80	pub fn into_inner(self) -> Vec<Vec<UnitOfWork>> {
81		self.0
82	}
83}
84
85impl From<Vec<Vec<UnitOfWork>>> for UnitsOfWork {
86	fn from(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
87		Self(flow_units)
88	}
89}
90
91impl IntoIterator for UnitsOfWork {
92	type Item = Vec<UnitOfWork>;
93	type IntoIter = std::vec::IntoIter<Vec<UnitOfWork>>;
94
95	fn into_iter(self) -> Self::IntoIter {
96		self.0.into_iter()
97	}
98}