reifydb_sub_flow/engine/
process.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
5use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
6
7use crate::{engine::FlowEngine, transaction::FlowTransaction};
8
9impl FlowEngine {
10	pub fn process(&self, txn: &mut FlowTransaction, change: FlowChange) -> crate::Result<()> {
11		match change.origin {
12			FlowChangeOrigin::External(source) => {
13				let sources = self.inner.sources.read();
14				if let Some(node_registrations) = sources.get(&source) {
15					// Clone the node registrations to avoid holding the lock while processing
16					let node_registrations = node_registrations.clone();
17					drop(sources);
18
19					for (flow_id, node_id) in node_registrations {
20						let flows = self.inner.flows.read();
21						if let Some(flow) = flows.get(&flow_id) {
22							if let Some(node) = flow.get_node(&node_id) {
23								let flow = flow.clone();
24								let node = node.clone();
25								drop(flows);
26
27								self.process_change(
28									txn,
29									&flow,
30									&node,
31									FlowChange::internal(
32										node_id,
33										change.version,
34										change.diffs.clone(),
35									),
36								)?;
37							} else {
38								drop(flows);
39							}
40						} else {
41							drop(flows);
42						}
43					}
44				}
45			}
46			FlowChangeOrigin::Internal(node_id) => {
47				// Internal changes are already scoped to a specific node
48				// This path is used by the partition logic to directly process a node's changes
49				// Find which flow this node belongs to by checking all registered flows
50				let flows = self.inner.flows.read();
51				for (_flow_id, flow) in flows.iter() {
52					if let Some(node) = flow.get_node(&node_id) {
53						let flow = flow.clone();
54						let node = node.clone();
55						drop(flows);
56
57						self.process_change(txn, &flow, &node, change)?;
58						return Ok(());
59					}
60				}
61				drop(flows);
62			}
63		}
64		Ok(())
65	}
66
67	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: FlowChange) -> crate::Result<FlowChange> {
68		let operator = self.inner.operators.read().get(&node.id).unwrap().clone();
69		let result = operator.apply(txn, change, &self.inner.evaluator)?;
70		Ok(result)
71	}
72
73	fn process_change(
74		&self,
75		txn: &mut FlowTransaction,
76		flow: &Flow,
77		node: &FlowNode,
78		change: FlowChange,
79	) -> crate::Result<()> {
80		let node_type = &node.ty;
81		let changes = &node.outputs;
82
83		let change = match &node_type {
84			SourceInlineData {} => unimplemented!(),
85			_ => self.apply(txn, node, change)?,
86		};
87
88		if changes.is_empty() {
89		} else if changes.len() == 1 {
90			let output_id = changes[0];
91			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
92		} else {
93			let (last, rest) = changes.split_last().unwrap();
94			for output_id in rest {
95				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
96			}
97			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
98		}
99
100		Ok(())
101	}
102}