Skip to main content

reifydb_sub_flow/engine/
process.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::interface::{
7	catalog::flow::FlowId,
8	change::{Change, ChangeOrigin},
9};
10use reifydb_rql::flow::{
11	flow::FlowDag,
12	node::{FlowNode, FlowNodeType::SourceInlineData},
13};
14use reifydb_type::{Result, value::datetime::DateTime};
15use tracing::{Span, field, instrument};
16
17use crate::{engine::FlowEngine, transaction::FlowTransaction};
18
19impl FlowEngine {
20	#[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
21		flow_id = ?flow_id,
22		origin = ?change.origin,
23		version = change.version.0,
24		diff_count = change.diffs.len(),
25		nodes_processed = field::Empty
26	))]
27	pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> Result<()> {
28		let mut nodes_processed = 0;
29
30		match change.origin {
31			ChangeOrigin::Shape(source) => {
32				let node_registrations = self.sources.get(&source).cloned();
33
34				if let Some(node_registrations) = node_registrations {
35					for (registered_flow_id, node_id) in node_registrations {
36						// Only process the flow that was passed as parameter
37						if registered_flow_id != flow_id {
38							continue;
39						}
40
41						let flow_and_node =
42							self.flows.get(&registered_flow_id).and_then(|flow| {
43								flow.get_node(&node_id)
44									.map(|node| (flow.clone(), node.clone()))
45							});
46
47						if let Some((flow, node)) = flow_and_node {
48							self.process_change(
49								txn,
50								&flow,
51								&node,
52								Arc::new(Change::from_flow(
53									node_id,
54									change.version,
55									change.diffs.clone(),
56									change.changed_at,
57								)),
58							)?;
59							nodes_processed += 1;
60						}
61					}
62				}
63			}
64			ChangeOrigin::Flow(node_id) => {
65				// Internal changes are already scoped to a specific node
66				// This path is used by the partition logic to directly process a node's changes
67				// Use the flow_id parameter for direct lookup instead of iterating all flows
68				let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
69					flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
70				});
71
72				if let Some((flow, node)) = flow_and_node {
73					self.process_change(txn, &flow, &node, Arc::new(change))?;
74					nodes_processed += 1;
75				}
76			}
77		}
78
79		Span::current().record("nodes_processed", nodes_processed);
80		Ok(())
81	}
82
83	#[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
84		node_id = ?node.id,
85		node_type = ?node.ty,
86		input_diffs = change.diffs.len(),
87		output_diffs = field::Empty,
88		lock_wait_us = field::Empty,
89		apply_time_us = field::Empty
90	))]
91	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Arc<Change>) -> Result<Change> {
92		let lock_start = self.runtime_context.clock.instant();
93		let operator = self.operators.get(&node.id).unwrap().clone();
94		Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
95
96		// Single-consumer path: try to take ownership of the Change without cloning.
97		// If another consumer still holds the Arc, fall back to a deep clone.
98		let owned = Arc::try_unwrap(change).unwrap_or_else(|arc| (*arc).clone());
99
100		let apply_start = self.runtime_context.clock.instant();
101		let result = operator.apply(txn, owned)?;
102		Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
103		Span::current().record("output_diffs", result.diffs.len());
104		Ok(result)
105	}
106
107	#[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
108		flow_id = ?flow.id,
109		node_id = ?node.id,
110		input_diffs = change.diffs.len(),
111		output_diffs = field::Empty,
112		downstream_count = node.outputs.len(),
113		propagation_time_us = field::Empty
114	))]
115	fn process_change(
116		&self,
117		txn: &mut FlowTransaction,
118		flow: &FlowDag,
119		node: &FlowNode,
120		change: Arc<Change>,
121	) -> Result<()> {
122		let node_type = &node.ty;
123		let changes = &node.outputs;
124
125		let change: Arc<Change> = match &node_type {
126			SourceInlineData {} => unimplemented!(),
127			_ => {
128				let result = self.apply(txn, node, change)?;
129				Span::current().record("output_diffs", result.diffs.len());
130				Arc::new(result)
131			}
132		};
133
134		let propagation_start = self.runtime_context.clock.instant();
135		if changes.is_empty() {
136		} else if changes.len() == 1 {
137			let output_id = changes[0];
138			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
139		} else {
140			let (last, rest) = changes.split_last().unwrap();
141			for output_id in rest {
142				// Fan-out: cheap Arc::clone (refcount bump) rather than deep Vec<Diff>::clone.
143				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), Arc::clone(&change))?;
144			}
145			// Last consumer takes the original Arc; if no one else retained it,
146			// `apply`'s `try_unwrap` succeeds and avoids the deep clone entirely.
147			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
148		}
149		Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
150
151		Ok(())
152	}
153
154	#[instrument(name = "flow::engine::process_tick", level = "debug", skip(self, txn), fields(
155		flow_id = ?flow_id,
156		timestamp = %timestamp
157	))]
158	pub fn process_tick(&self, txn: &mut FlowTransaction, flow_id: FlowId, timestamp: DateTime) -> Result<()> {
159		let flow = match self.flows.get(&flow_id) {
160			Some(f) => f.clone(),
161			None => return Ok(()),
162		};
163
164		for node_id in flow.topological_order()? {
165			let operator = match self.operators.get(&node_id) {
166				Some(op) => op.clone(),
167				None => continue,
168			};
169
170			if let Some(change) = operator.tick(txn, timestamp)? {
171				let node = flow.get_node(&node_id).unwrap();
172				let outputs = &node.outputs;
173				if outputs.is_empty() {
174				} else if outputs.len() == 1 {
175					self.process_change(
176						txn,
177						&flow,
178						flow.get_node(&outputs[0]).unwrap(),
179						Arc::new(change),
180					)?;
181				} else {
182					let arc = Arc::new(change);
183					let (last, rest) = outputs.split_last().unwrap();
184					for output_id in rest {
185						self.process_change(
186							txn,
187							&flow,
188							flow.get_node(output_id).unwrap(),
189							Arc::clone(&arc),
190						)?;
191					}
192					self.process_change(txn, &flow, flow.get_node(last).unwrap(), arc)?;
193				}
194			}
195		}
196		Ok(())
197	}
198}