Skip to main content

reifydb_sub_flow/engine/
process.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::{
5	catalog::flow::FlowId,
6	change::{Change, ChangeOrigin},
7};
8use reifydb_rql::flow::{
9	flow::FlowDag,
10	node::{FlowNode, FlowNodeType::SourceInlineData},
11};
12use tracing::{Span, instrument};
13
14use crate::{engine::FlowEngine, transaction::FlowTransaction};
15
16impl FlowEngine {
17	#[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
18		flow_id = ?flow_id,
19		origin = ?change.origin,
20		version = change.version.0,
21		diff_count = change.diffs.len(),
22		nodes_processed = tracing::field::Empty
23	))]
24	pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> reifydb_type::Result<()> {
25		let mut nodes_processed = 0;
26
27		match change.origin {
28			ChangeOrigin::Primitive(source) => {
29				let node_registrations = self.sources.get(&source).cloned();
30
31				if let Some(node_registrations) = node_registrations {
32					for (registered_flow_id, node_id) in node_registrations {
33						// Only process the flow that was passed as parameter
34						if registered_flow_id != flow_id {
35							continue;
36						}
37
38						let flow_and_node =
39							self.flows.get(&registered_flow_id).and_then(|flow| {
40								flow.get_node(&node_id)
41									.map(|node| (flow.clone(), node.clone()))
42							});
43
44						if let Some((flow, node)) = flow_and_node {
45							self.process_change(
46								txn,
47								&flow,
48								&node,
49								Change::from_flow(
50									node_id,
51									change.version,
52									change.diffs.clone(),
53								),
54							)?;
55							nodes_processed += 1;
56						}
57					}
58				}
59			}
60			ChangeOrigin::Flow(node_id) => {
61				// Internal changes are already scoped to a specific node
62				// This path is used by the partition logic to directly process a node's changes
63				// Use the flow_id parameter for direct lookup instead of iterating all flows
64				let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
65					flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
66				});
67
68				if let Some((flow, node)) = flow_and_node {
69					self.process_change(txn, &flow, &node, change)?;
70					nodes_processed += 1;
71				}
72			}
73		}
74
75		Span::current().record("nodes_processed", nodes_processed);
76		Ok(())
77	}
78
79	#[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
80		node_id = ?node.id,
81		node_type = ?node.ty,
82		input_diffs = change.diffs.len(),
83		output_diffs = tracing::field::Empty,
84		lock_wait_us = tracing::field::Empty,
85		apply_time_us = tracing::field::Empty
86	))]
87	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Change) -> reifydb_type::Result<Change> {
88		let lock_start = self.clock.instant();
89		let operator = self.operators.get(&node.id).unwrap().clone();
90		Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
91
92		let apply_start = self.clock.instant();
93		let result = operator.apply(txn, change)?;
94		Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
95		Span::current().record("output_diffs", result.diffs.len());
96		Ok(result)
97	}
98
99	#[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
100		flow_id = ?flow.id,
101		node_id = ?node.id,
102		input_diffs = change.diffs.len(),
103		output_diffs = tracing::field::Empty,
104		downstream_count = node.outputs.len(),
105		propagation_time_us = tracing::field::Empty
106	))]
107	fn process_change(
108		&self,
109		txn: &mut FlowTransaction,
110		flow: &FlowDag,
111		node: &FlowNode,
112		change: Change,
113	) -> reifydb_type::Result<()> {
114		let node_type = &node.ty;
115		let changes = &node.outputs;
116
117		let change = match &node_type {
118			SourceInlineData {} => unimplemented!(),
119			_ => {
120				let result = self.apply(txn, node, change)?;
121				Span::current().record("output_diffs", result.diffs.len());
122				result
123			}
124		};
125
126		let propagation_start = self.clock.instant();
127		if changes.is_empty() {
128		} else if changes.len() == 1 {
129			let output_id = changes[0];
130			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
131		} else {
132			let (last, rest) = changes.split_last().unwrap();
133			for output_id in rest {
134				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
135			}
136			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
137		}
138		Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
139
140		Ok(())
141	}
142}