// reifydb_sub_flow/worker/mod.rs
pub mod same;
5
6use async_trait::async_trait;
7use reifydb_core::{CommitVersion, interface::FlowId};
8use reifydb_engine::StandardCommandTransaction;
9use reifydb_flow_operator_sdk::FlowChange;
10pub use same::SameThreadedWorker;
11
12use crate::FlowEngine;
13
14#[async_trait]
16pub trait WorkerPool {
17 async fn process(
30 &self,
31 txn: &mut StandardCommandTransaction,
32 units: UnitsOfWork,
33 engine: &FlowEngine,
34 ) -> crate::Result<()>;
35
36 fn name(&self) -> &str;
38}
39
40#[derive(Debug, Clone)]
42pub struct UnitOfWork {
43 pub flow_id: FlowId,
45
46 pub version: CommitVersion,
48
49 pub source_changes: Vec<FlowChange>,
52}
53
54impl UnitOfWork {
55 pub fn new(flow_id: FlowId, version: CommitVersion, source_changes: Vec<FlowChange>) -> Self {
57 Self {
58 flow_id,
59 version,
60 source_changes,
61 }
62 }
63}
64
65#[derive(Debug, Clone)]
71pub struct UnitsOfWork(Vec<Vec<UnitOfWork>>);
72
73impl UnitsOfWork {
74 pub fn new(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
76 Self(flow_units)
77 }
78
79 pub fn empty() -> Self {
81 Self(Vec::new())
82 }
83
84 pub fn is_empty(&self) -> bool {
86 self.0.is_empty()
87 }
88
89 pub fn flow_count(&self) -> usize {
91 self.0.len()
92 }
93
94 pub fn total_units(&self) -> usize {
96 self.0.iter().map(|units| units.len()).sum()
97 }
98
99 pub fn as_inner(&self) -> &Vec<Vec<UnitOfWork>> {
101 &self.0
102 }
103
104 pub fn into_inner(self) -> Vec<Vec<UnitOfWork>> {
106 self.0
107 }
108}
109
110impl From<Vec<Vec<UnitOfWork>>> for UnitsOfWork {
111 fn from(flow_units: Vec<Vec<UnitOfWork>>) -> Self {
112 Self(flow_units)
113 }
114}
115
116impl IntoIterator for UnitsOfWork {
117 type Item = Vec<UnitOfWork>;
118 type IntoIter = std::vec::IntoIter<Vec<UnitOfWork>>;
119
120 fn into_iter(self) -> Self::IntoIter {
121 self.0.into_iter()
122 }
123}