reifydb_sub_flow/engine/
process.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::FlowId;
5use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
6use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
7use tracing::instrument;
8
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
11impl FlowEngine {
12	#[instrument(level = "debug", skip(self, txn), fields(flow_id = ?flow_id, origin = ?change.origin, version = change.version.0, diff_count = change.diffs.len()))]
13	pub fn process(&self, txn: &mut FlowTransaction, change: FlowChange, flow_id: FlowId) -> crate::Result<()> {
14		match change.origin {
15			FlowChangeOrigin::External(source) => {
16				let sources = self.inner.sources.read();
17				if let Some(node_registrations) = sources.get(&source) {
18					// Clone the node registrations to avoid holding the lock while processing
19					let node_registrations = node_registrations.clone();
20					drop(sources);
21
22					for (flow_id, node_id) in node_registrations {
23						let flows = self.inner.flows.read();
24						if let Some(flow) = flows.get(&flow_id) {
25							if let Some(node) = flow.get_node(&node_id) {
26								let flow = flow.clone();
27								let node = node.clone();
28								drop(flows);
29
30								self.process_change(
31									txn,
32									&flow,
33									&node,
34									FlowChange::internal(
35										node_id,
36										change.version,
37										change.diffs.clone(),
38									),
39								)?;
40							} else {
41								drop(flows);
42							}
43						} else {
44							drop(flows);
45						}
46					}
47				}
48			}
49			FlowChangeOrigin::Internal(node_id) => {
50				// Internal changes are already scoped to a specific node
51				// This path is used by the partition logic to directly process a node's changes
52				// Use the flow_id parameter for direct lookup instead of iterating all flows
53				let flows = self.inner.flows.read();
54				if let Some(flow) = flows.get(&flow_id) {
55					if let Some(node) = flow.get_node(&node_id) {
56						let flow = flow.clone();
57						let node = node.clone();
58						drop(flows);
59
60						self.process_change(txn, &flow, &node, change)?;
61					}
62				}
63			}
64		}
65		Ok(())
66	}
67
68	#[instrument(level = "trace", skip(self, txn), fields(node_id = ?node.id))]
69	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: FlowChange) -> crate::Result<FlowChange> {
70		let operator = self.inner.operators.read().get(&node.id).unwrap().clone();
71		let result = operator.apply(txn, change, &self.inner.evaluator)?;
72		Ok(result)
73	}
74
75	#[instrument(level = "trace", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, input_diffs = change.diffs.len()))]
76	fn process_change(
77		&self,
78		txn: &mut FlowTransaction,
79		flow: &Flow,
80		node: &FlowNode,
81		change: FlowChange,
82	) -> crate::Result<()> {
83		let node_type = &node.ty;
84		let changes = &node.outputs;
85
86		let change = match &node_type {
87			SourceInlineData {} => unimplemented!(),
88			_ => self.apply(txn, node, change)?,
89		};
90
91		if changes.is_empty() {
92		} else if changes.len() == 1 {
93			let output_id = changes[0];
94			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
95		} else {
96			let (last, rest) = changes.split_last().unwrap();
97			for output_id in rest {
98				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
99			}
100			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
101		}
102
103		Ok(())
104	}
105}