reifydb_sub_flow/worker/
mod.rs1pub mod parallel;
5pub mod same;
6
7pub use parallel::ParallelWorkerPool;
8use reifydb_core::{CommitVersion, interface::FlowId};
9use reifydb_engine::StandardCommandTransaction;
10use reifydb_flow_operator_sdk::FlowChange;
11pub use same::SameThreadedWorker;
12
13use crate::FlowEngine;
14
15pub trait WorkerPool {
17 fn process(
30 &self,
31 txn: &mut StandardCommandTransaction,
32 units: UnitsOfWork,
33 engine: &FlowEngine,
34 ) -> crate::Result<()>;
35
36 fn name(&self) -> &str;
38}
39
40#[derive(Debug, Clone)]
42pub struct UnitOfWork {
43 pub flow_id: FlowId,
45
46 pub version: CommitVersion,
48
49 pub source_changes: Vec<FlowChange>,
52}
53
54impl UnitOfWork {
55 pub fn new(flow_id: FlowId, version: CommitVersion, source_changes: Vec<FlowChange>) -> Self {
57 Self {
58 flow_id,
59 version,
60 source_changes,
61 }
62 }
63}
64
65#[derive(Debug, Clone)]
71pub struct UnitsOfWork(Vec<Vec<UnitOfWork>>);
72
73impl UnitsOfWork {
74 pub fn new(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
76 Self(flow_units)
77 }
78
79 pub fn empty() -> Self {
81 Self(Vec::new())
82 }
83
84 pub fn is_empty(&self) -> bool {
86 self.0.is_empty()
87 }
88
89 pub fn flow_count(&self) -> usize {
91 self.0.len()
92 }
93
94 pub fn total_units(&self) -> usize {
96 self.0.iter().map(|units| units.len()).sum()
97 }
98
99 pub fn as_inner(&self) -> &Vec<Vec<UnitOfWork>> {
101 &self.0
102 }
103
104 pub fn into_inner(self) -> Vec<Vec<UnitOfWork>> {
106 self.0
107 }
108}
109
110impl From<Vec<Vec<UnitOfWork>>> for UnitsOfWork {
111 fn from(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
112 Self(flow_units)
113 }
114}
115
116impl IntoIterator for UnitsOfWork {
117 type Item = Vec<UnitOfWork>;
118 type IntoIter = std::vec::IntoIter<Vec<UnitOfWork>>;
119
120 fn into_iter(self) -> Self::IntoIter {
121 self.0.into_iter()
122 }
123}