reifydb_sub_flow/worker/
same.rs1use std::collections::VecDeque;
5
6use async_trait::async_trait;
7use reifydb_engine::StandardCommandTransaction;
8use tracing::trace_span;
9
10use super::{UnitOfWork, UnitsOfWork, WorkerPool};
11use crate::{engine::FlowEngine, transaction::FlowTransaction};
12
13pub struct SameThreadedWorker;
18
19impl SameThreadedWorker {
20 pub fn new() -> Self {
21 Self {}
22 }
23}
24
25#[async_trait]
26impl WorkerPool for SameThreadedWorker {
27 async fn process(
28 &self,
29 txn: &mut StandardCommandTransaction,
30 units: UnitsOfWork,
31 engine: &FlowEngine,
32 ) -> crate::Result<()> {
33 let mut queue: VecDeque<UnitOfWork> = units.into_iter().flat_map(|units| units.into_iter()).collect();
35
36 if queue.is_empty() {
37 return Ok(());
38 }
39
40 let first_unit = queue.front().unwrap();
41 let mut flow_txn = FlowTransaction::new(txn, first_unit.version).await;
42
43 {
44 let _loop_span = trace_span!("flow::process_all_units", unit_count = queue.len()).entered();
45 }
46
47 while let Some(unit_of_work) = queue.pop_front() {
48 {
49 let _unit_span = trace_span!(
50 "flow::process_unit",
51 flow_id = ?unit_of_work.flow_id,
52 version = unit_of_work.version.0,
53 change_count = unit_of_work.source_changes.len()
54 )
55 .entered();
56 }
57
58 if flow_txn.version() != unit_of_work.version {
59 flow_txn.update_version(unit_of_work.version).await;
60 }
61
62 let flow_id = unit_of_work.flow_id;
63 for change in unit_of_work.source_changes {
64 engine.process(&mut flow_txn, change, flow_id).await?;
65 }
66 }
67 flow_txn.commit(txn).await?;
68
69 Ok(())
70 }
71
72 fn name(&self) -> &str {
73 "single-threaded"
74 }
75}