Skip to main content

reifydb_sub_flow/engine/
process.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::{
5	catalog::flow::FlowId,
6	change::{Change, ChangeOrigin},
7};
8use reifydb_rql::flow::{
9	flow::FlowDag,
10	node::{FlowNode, FlowNodeType::SourceInlineData},
11};
12use reifydb_type::Result;
13use tracing::{Span, field, instrument};
14
15use crate::{engine::FlowEngine, transaction::FlowTransaction};
16
17impl FlowEngine {
18	#[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
19		flow_id = ?flow_id,
20		origin = ?change.origin,
21		version = change.version.0,
22		diff_count = change.diffs.len(),
23		nodes_processed = field::Empty
24	))]
25	pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> Result<()> {
26		let mut nodes_processed = 0;
27
28		match change.origin {
29			ChangeOrigin::Primitive(source) => {
30				let node_registrations = self.sources.get(&source).cloned();
31
32				if let Some(node_registrations) = node_registrations {
33					for (registered_flow_id, node_id) in node_registrations {
34						// Only process the flow that was passed as parameter
35						if registered_flow_id != flow_id {
36							continue;
37						}
38
39						let flow_and_node =
40							self.flows.get(&registered_flow_id).and_then(|flow| {
41								flow.get_node(&node_id)
42									.map(|node| (flow.clone(), node.clone()))
43							});
44
45						if let Some((flow, node)) = flow_and_node {
46							self.process_change(
47								txn,
48								&flow,
49								&node,
50								Change::from_flow(
51									node_id,
52									change.version,
53									change.diffs.clone(),
54								),
55							)?;
56							nodes_processed += 1;
57						}
58					}
59				}
60			}
61			ChangeOrigin::Flow(node_id) => {
62				// Internal changes are already scoped to a specific node
63				// This path is used by the partition logic to directly process a node's changes
64				// Use the flow_id parameter for direct lookup instead of iterating all flows
65				let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
66					flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
67				});
68
69				if let Some((flow, node)) = flow_and_node {
70					self.process_change(txn, &flow, &node, change)?;
71					nodes_processed += 1;
72				}
73			}
74		}
75
76		Span::current().record("nodes_processed", nodes_processed);
77		Ok(())
78	}
79
80	#[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
81		node_id = ?node.id,
82		node_type = ?node.ty,
83		input_diffs = change.diffs.len(),
84		output_diffs = field::Empty,
85		lock_wait_us = field::Empty,
86		apply_time_us = field::Empty
87	))]
88	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Change) -> Result<Change> {
89		let lock_start = self.clock.instant();
90		let operator = self.operators.get(&node.id).unwrap().clone();
91		Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
92
93		let apply_start = self.clock.instant();
94		let result = operator.apply(txn, change)?;
95		Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
96		Span::current().record("output_diffs", result.diffs.len());
97		Ok(result)
98	}
99
100	#[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
101		flow_id = ?flow.id,
102		node_id = ?node.id,
103		input_diffs = change.diffs.len(),
104		output_diffs = field::Empty,
105		downstream_count = node.outputs.len(),
106		propagation_time_us = field::Empty
107	))]
108	fn process_change(
109		&self,
110		txn: &mut FlowTransaction,
111		flow: &FlowDag,
112		node: &FlowNode,
113		change: Change,
114	) -> Result<()> {
115		let node_type = &node.ty;
116		let changes = &node.outputs;
117
118		let change = match &node_type {
119			SourceInlineData {} => unimplemented!(),
120			_ => {
121				let result = self.apply(txn, node, change)?;
122				Span::current().record("output_diffs", result.diffs.len());
123				result
124			}
125		};
126
127		let propagation_start = self.clock.instant();
128		if changes.is_empty() {
129		} else if changes.len() == 1 {
130			let output_id = changes[0];
131			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
132		} else {
133			let (last, rest) = changes.split_last().unwrap();
134			for output_id in rest {
135				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
136			}
137			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
138		}
139		Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
140
141		Ok(())
142	}
143}