reifydb_sub_flow/worker/
parallel.rs1use crossbeam_channel::bounded;
5use reifydb_engine::StandardCommandTransaction;
6use reifydb_sub_api::{SchedulerService, TaskContext, task_once};
7
8use super::{UnitOfWork, UnitsOfWork, WorkerPool};
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
11pub struct ParallelWorkerPool {
17 scheduler: SchedulerService,
18}
19
20impl ParallelWorkerPool {
21 pub fn new(scheduler: SchedulerService) -> Self {
23 Self {
24 scheduler,
25 }
26 }
27}
28
29impl WorkerPool for ParallelWorkerPool {
30 fn process(
31 &self,
32 txn: &mut StandardCommandTransaction,
33 units: UnitsOfWork,
34 engine: &FlowEngine,
35 ) -> crate::Result<()> {
36 if units.is_empty() {
37 return Ok(());
38 }
39
40 let units_of_work = units.into_inner();
41 let mut txns: Vec<(Vec<UnitOfWork>, FlowTransaction)> = Vec::with_capacity(units_of_work.len());
42
43 for flow_units in units_of_work {
44 if !flow_units.is_empty() {
45 let flow_id = flow_units[0].flow_id;
47 for unit in &flow_units {
48 assert_eq!(
49 unit.flow_id, flow_id,
50 "INVARIANT VIOLATED: Flow units contain mixed flow_ids - expected {:?}, got {:?}. \
51 Each Vec should contain units for exactly one flow.",
52 flow_id, unit.flow_id
53 );
54 }
55
56 let first_version = flow_units[0].version;
57 let flow_txn = FlowTransaction::new(txn, first_version);
58 txns.push((flow_units, flow_txn));
59 }
60 }
61
62 {
65 use std::collections::HashSet;
66 let mut flow_ids_in_tasks = HashSet::new();
67
68 for (flow_units, _) in &txns {
69 let flow_id = flow_units[0].flow_id;
70 assert!(
71 !flow_ids_in_tasks.contains(&flow_id),
72 "INVARIANT VIOLATED: flow_id {:?} will be processed by multiple parallel tasks. \
73 This will cause keyspace overlap as multiple FlowTransactions write to the same keys.",
74 flow_id
75 );
76 flow_ids_in_tasks.insert(flow_id);
77 }
78 }
79
80 let (result_tx, result_rx) = bounded(txns.len());
81
82 for (flow_units, mut flow_txn) in txns {
83 let result_tx = result_tx.clone();
84 let engine = engine.clone();
85
86 let task = task_once!(
87 "flow-processing",
88 High,
89 move |_ctx: &TaskContext| -> reifydb_core::Result<()> {
90 process(&mut flow_txn, flow_units, &engine)?;
91 let _ = result_tx.send(Ok(flow_txn));
92 Ok(())
93 }
94 );
95
96 self.scheduler.once(task)?;
97 }
98
99 drop(result_tx);
101
102 let mut completed = Vec::new();
103 while let Ok(result) = result_rx.recv() {
104 match result {
105 Ok(flow_txn) => completed.push(flow_txn),
106 Err(e) => return e,
107 }
108 }
109
110 for mut flow in completed {
112 flow.commit(txn)?;
113 }
114
115 Ok(())
116 }
117
118 fn name(&self) -> &str {
119 "parallel-worker-pool"
120 }
121}
122
123fn process(flow_txn: &mut FlowTransaction, flow_units: Vec<UnitOfWork>, engine: &FlowEngine) -> crate::Result<()> {
125 for unit in flow_units {
127 if flow_txn.version() != unit.version {
129 flow_txn.update_version(unit.version)?;
130 }
131
132 for change in unit.source_changes {
134 engine.process(flow_txn, change)?;
135 }
136 }
137
138 Ok(())
139}