Skip to main content

reifydb_sub_flow/engine/
process.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::interface::{
5	catalog::flow::FlowId,
6	change::{Change, ChangeOrigin},
7};
8use reifydb_rql::flow::{
9	flow::FlowDag,
10	node::{FlowNode, FlowNodeType::SourceInlineData},
11};
12use reifydb_type::{Result, value::datetime::DateTime};
13use tracing::{Span, field, instrument};
14
15use crate::{engine::FlowEngine, transaction::FlowTransaction};
16
17impl FlowEngine {
18	#[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
19		flow_id = ?flow_id,
20		origin = ?change.origin,
21		version = change.version.0,
22		diff_count = change.diffs.len(),
23		nodes_processed = field::Empty
24	))]
25	pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> Result<()> {
26		let mut nodes_processed = 0;
27
28		match change.origin {
29			ChangeOrigin::Schema(source) => {
30				let node_registrations = self.sources.get(&source).cloned();
31
32				if let Some(node_registrations) = node_registrations {
33					for (registered_flow_id, node_id) in node_registrations {
34						// Only process the flow that was passed as parameter
35						if registered_flow_id != flow_id {
36							continue;
37						}
38
39						let flow_and_node =
40							self.flows.get(&registered_flow_id).and_then(|flow| {
41								flow.get_node(&node_id)
42									.map(|node| (flow.clone(), node.clone()))
43							});
44
45						if let Some((flow, node)) = flow_and_node {
46							self.process_change(
47								txn,
48								&flow,
49								&node,
50								Change::from_flow(
51									node_id,
52									change.version,
53									change.diffs.clone(),
54								),
55							)?;
56							nodes_processed += 1;
57						}
58					}
59				}
60			}
61			ChangeOrigin::Flow(node_id) => {
62				// Internal changes are already scoped to a specific node
63				// This path is used by the partition logic to directly process a node's changes
64				// Use the flow_id parameter for direct lookup instead of iterating all flows
65				let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
66					flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
67				});
68
69				if let Some((flow, node)) = flow_and_node {
70					self.process_change(txn, &flow, &node, change)?;
71					nodes_processed += 1;
72				}
73			}
74		}
75
76		Span::current().record("nodes_processed", nodes_processed);
77		Ok(())
78	}
79
80	#[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
81		node_id = ?node.id,
82		node_type = ?node.ty,
83		input_diffs = change.diffs.len(),
84		output_diffs = field::Empty,
85		lock_wait_us = field::Empty,
86		apply_time_us = field::Empty
87	))]
88	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Change) -> Result<Change> {
89		let lock_start = self.runtime_context.clock.instant();
90		let operator = self.operators.get(&node.id).unwrap().clone();
91		Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
92
93		let apply_start = self.runtime_context.clock.instant();
94		let result = operator.apply(txn, change)?;
95		Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
96		Span::current().record("output_diffs", result.diffs.len());
97		Ok(result)
98	}
99
100	#[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
101		flow_id = ?flow.id,
102		node_id = ?node.id,
103		input_diffs = change.diffs.len(),
104		output_diffs = field::Empty,
105		downstream_count = node.outputs.len(),
106		propagation_time_us = field::Empty
107	))]
108	fn process_change(
109		&self,
110		txn: &mut FlowTransaction,
111		flow: &FlowDag,
112		node: &FlowNode,
113		change: Change,
114	) -> Result<()> {
115		let node_type = &node.ty;
116		let changes = &node.outputs;
117
118		let change = match &node_type {
119			SourceInlineData {} => unimplemented!(),
120			_ => {
121				let result = self.apply(txn, node, change)?;
122				Span::current().record("output_diffs", result.diffs.len());
123				result
124			}
125		};
126
127		let propagation_start = self.runtime_context.clock.instant();
128		if changes.is_empty() {
129		} else if changes.len() == 1 {
130			let output_id = changes[0];
131			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
132		} else {
133			let (last, rest) = changes.split_last().unwrap();
134			for output_id in rest {
135				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
136			}
137			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
138		}
139		Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
140
141		Ok(())
142	}
143
144	#[instrument(name = "flow::engine::process_tick", level = "debug", skip(self, txn), fields(
145		flow_id = ?flow_id,
146		timestamp = %timestamp
147	))]
148	pub fn process_tick(&self, txn: &mut FlowTransaction, flow_id: FlowId, timestamp: DateTime) -> Result<()> {
149		let flow = match self.flows.get(&flow_id) {
150			Some(f) => f.clone(),
151			None => return Ok(()),
152		};
153
154		for node_id in flow.topological_order()? {
155			let operator = match self.operators.get(&node_id) {
156				Some(op) => op.clone(),
157				None => continue,
158			};
159
160			if let Some(change) = operator.tick(txn, timestamp)? {
161				let node = flow.get_node(&node_id).unwrap();
162				for &output_id in &node.outputs {
163					self.process_change(
164						txn,
165						&flow,
166						flow.get_node(&output_id).unwrap(),
167						change.clone(),
168					)?;
169				}
170			}
171		}
172		Ok(())
173	}
174}