reifydb_sub_flow/worker/
parallel.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use crossbeam_channel::bounded;
5use reifydb_engine::StandardCommandTransaction;
6use reifydb_sub_api::{SchedulerService, TaskContext, task_once};
7
8use super::{UnitOfWork, UnitsOfWork, WorkerPool};
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
/// Parallel worker pool that uses the sub-worker thread pool for execution
///
/// Each flow's units are submitted as a separate high-priority task to the
/// worker pool. Different flows can execute in parallel, but each flow's
/// units are processed sequentially to maintain version ordering.
pub struct ParallelWorkerPool {
	// Shared scheduler used to submit one high-priority task per flow;
	// the pool itself owns no threads.
	scheduler: SchedulerService,
}
19
20impl ParallelWorkerPool {
21	/// Create a new parallel worker pool
22	pub fn new(scheduler: SchedulerService) -> Self {
23		Self {
24			scheduler,
25		}
26	}
27}
28
29impl WorkerPool for ParallelWorkerPool {
30	fn process(
31		&self,
32		txn: &mut StandardCommandTransaction,
33		units: UnitsOfWork,
34		engine: &FlowEngine,
35	) -> crate::Result<()> {
36		if units.is_empty() {
37			return Ok(());
38		}
39
40		let units_of_work = units.into_inner();
41		let mut txns: Vec<(Vec<UnitOfWork>, FlowTransaction)> = Vec::with_capacity(units_of_work.len());
42
43		for flow_units in units_of_work {
44			if !flow_units.is_empty() {
45				// INVARIANT: Validate that all units in this Vec are for the same flow_id
46				let flow_id = flow_units[0].flow_id;
47				for unit in &flow_units {
48					assert_eq!(
49						unit.flow_id, flow_id,
50						"INVARIANT VIOLATED: Flow units contain mixed flow_ids - expected {:?}, got {:?}. \
51						Each Vec should contain units for exactly one flow.",
52						flow_id, unit.flow_id
53					);
54				}
55
56				let first_version = flow_units[0].version;
57				let flow_txn = FlowTransaction::new(txn, first_version);
58				txns.push((flow_units, flow_txn));
59			}
60		}
61
62		// INVARIANT: Validate that no flow_id appears in multiple tasks
63		// This is critical to prevent keyspace overlap between parallel FlowTransactions
64		{
65			use std::collections::HashSet;
66			let mut flow_ids_in_tasks = HashSet::new();
67
68			for (flow_units, _) in &txns {
69				let flow_id = flow_units[0].flow_id;
70				assert!(
71					!flow_ids_in_tasks.contains(&flow_id),
72					"INVARIANT VIOLATED: flow_id {:?} will be processed by multiple parallel tasks. \
73					This will cause keyspace overlap as multiple FlowTransactions write to the same keys.",
74					flow_id
75				);
76				flow_ids_in_tasks.insert(flow_id);
77			}
78		}
79
80		let (result_tx, result_rx) = bounded(txns.len());
81
82		for (flow_units, mut flow_txn) in txns {
83			let result_tx = result_tx.clone();
84			let engine = engine.clone();
85
86			let task = task_once!(
87				"flow-processing",
88				High,
89				move |_ctx: &TaskContext| -> reifydb_core::Result<()> {
90					process(&mut flow_txn, flow_units, &engine)?;
91					let _ = result_tx.send(Ok(flow_txn));
92					Ok(())
93				}
94			);
95
96			self.scheduler.once(task)?;
97		}
98
99		// Drop our copy of sender so channel closes when all tasks complete
100		drop(result_tx);
101
102		let mut completed = Vec::new();
103		while let Ok(result) = result_rx.recv() {
104			match result {
105				Ok(flow_txn) => completed.push(flow_txn),
106				Err(e) => return e,
107			}
108		}
109
110		// Commit all FlowTransactions sequentially back to parent
111		for mut flow in completed {
112			flow.commit(txn)?;
113		}
114
115		Ok(())
116	}
117
118	fn name(&self) -> &str {
119		"parallel-worker-pool"
120	}
121}
122
123/// Process all units for a single flow in parallel worker, returning completed FlowTransaction
124fn process(flow_txn: &mut FlowTransaction, flow_units: Vec<UnitOfWork>, engine: &FlowEngine) -> crate::Result<()> {
125	// Process all units for this flow sequentially
126	for unit in flow_units {
127		// Update version if needed
128		if flow_txn.version() != unit.version {
129			flow_txn.update_version(unit.version)?;
130		}
131
132		// Process all source changes for this unit
133		for change in unit.source_changes {
134			engine.process(flow_txn, change)?;
135		}
136	}
137
138	Ok(())
139}