reifydb_sub_flow/worker/
same.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::VecDeque;
5
6use reifydb_engine::StandardCommandTransaction;
7
8use super::{UnitOfWork, UnitsOfWork, WorkerPool};
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
/// Worker pool that processes units of work sequentially, one after another,
/// on the thread that calls it.
///
/// This is the initial implementation; it can be replaced with a parallel
/// implementation later.
#[derive(Debug, Default)]
pub struct SameThreadedWorker;
16
17impl SameThreadedWorker {
18	pub fn new() -> Self {
19		Self {}
20	}
21}
22
23impl WorkerPool for SameThreadedWorker {
24	fn process(
25		&self,
26		txn: &mut StandardCommandTransaction,
27		units: UnitsOfWork,
28		engine: &FlowEngine,
29	) -> crate::Result<()> {
30		// Flatten all flow units into a single queue for sequential processing
31		let mut queue: VecDeque<UnitOfWork> = units.into_iter().flat_map(|units| units.into_iter()).collect();
32
33		if queue.is_empty() {
34			return Ok(());
35		}
36
37		let first_unit = queue.front().unwrap();
38		let mut flow_txn = FlowTransaction::new(txn, first_unit.version);
39
40		while let Some(unit_of_work) = queue.pop_front() {
41			if flow_txn.version() != unit_of_work.version {
42				flow_txn.update_version(unit_of_work.version)?;
43			}
44
45			for change in unit_of_work.source_changes {
46				engine.process(&mut flow_txn, change)?;
47			}
48		}
49
50		flow_txn.commit(txn)?;
51
52		Ok(())
53	}
54
55	fn name(&self) -> &str {
56		"single-threaded"
57	}
58}