reifydb_sub_flow/worker/
same.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::VecDeque;
5
6use async_trait::async_trait;
7use reifydb_engine::StandardCommandTransaction;
8use tracing::trace_span;
9
10use super::{UnitOfWork, UnitsOfWork, WorkerPool};
11use crate::{engine::FlowEngine, transaction::FlowTransaction};
12
13/// Same threaded worker that processes units of work sequentially from a deque
14///
15/// This is the initial implementation that can be easily replaced
16/// with parallel implementations later.
17pub struct SameThreadedWorker;
18
19impl SameThreadedWorker {
20	pub fn new() -> Self {
21		Self {}
22	}
23}
24
25#[async_trait]
26impl WorkerPool for SameThreadedWorker {
27	async fn process(
28		&self,
29		txn: &mut StandardCommandTransaction,
30		units: UnitsOfWork,
31		engine: &FlowEngine,
32	) -> crate::Result<()> {
33		// Flatten all flow units into a single queue for sequential processing
34		let mut queue: VecDeque<UnitOfWork> = units.into_iter().flat_map(|units| units.into_iter()).collect();
35
36		if queue.is_empty() {
37			return Ok(());
38		}
39
40		let first_unit = queue.front().unwrap();
41		let mut flow_txn = FlowTransaction::new(txn, first_unit.version).await;
42
43		{
44			let _loop_span = trace_span!("flow::process_all_units", unit_count = queue.len()).entered();
45		}
46
47		while let Some(unit_of_work) = queue.pop_front() {
48			{
49				let _unit_span = trace_span!(
50					"flow::process_unit",
51					flow_id = ?unit_of_work.flow_id,
52					version = unit_of_work.version.0,
53					change_count = unit_of_work.source_changes.len()
54				)
55				.entered();
56			}
57
58			if flow_txn.version() != unit_of_work.version {
59				flow_txn.update_version(unit_of_work.version).await;
60			}
61
62			let flow_id = unit_of_work.flow_id;
63			for change in unit_of_work.source_changes {
64				engine.process(&mut flow_txn, change, flow_id).await?;
65			}
66		}
67		flow_txn.commit(txn).await?;
68
69		Ok(())
70	}
71
72	fn name(&self) -> &str {
73		"single-threaded"
74	}
75}