reifydb_sub_flow/worker/
same.rs1use std::collections::VecDeque;
5
6use reifydb_engine::StandardCommandTransaction;
7
8use super::{UnitOfWork, UnitsOfWork, WorkerPool};
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
11pub struct SameThreadedWorker;
16
17impl SameThreadedWorker {
18 pub fn new() -> Self {
19 Self {}
20 }
21}
22
23impl WorkerPool for SameThreadedWorker {
24 fn process(
25 &self,
26 txn: &mut StandardCommandTransaction,
27 units: UnitsOfWork,
28 engine: &FlowEngine,
29 ) -> crate::Result<()> {
30 let mut queue: VecDeque<UnitOfWork> = units.into_iter().flat_map(|units| units.into_iter()).collect();
32
33 if queue.is_empty() {
34 return Ok(());
35 }
36
37 let first_unit = queue.front().unwrap();
38 let mut flow_txn = FlowTransaction::new(txn, first_unit.version);
39
40 while let Some(unit_of_work) = queue.pop_front() {
41 if flow_txn.version() != unit_of_work.version {
42 flow_txn.update_version(unit_of_work.version)?;
43 }
44
45 for change in unit_of_work.source_changes {
46 engine.process(&mut flow_txn, change)?;
47 }
48 }
49
50 flow_txn.commit(txn)?;
51
52 Ok(())
53 }
54
55 fn name(&self) -> &str {
56 "single-threaded"
57 }
58}