// reifydb_sub_flow/engine/process.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
5
6use crate::{
7	engine::FlowEngine,
8	flow::{FlowChange, FlowChangeOrigin},
9	transaction::FlowTransaction,
10};
11
12impl FlowEngine {
13	pub fn process(&self, txn: &mut FlowTransaction, change: FlowChange) -> crate::Result<()> {
14		match change.origin {
15			FlowChangeOrigin::External(source) => {
16				let sources = self.inner.sources.read();
17				if let Some(node_registrations) = sources.get(&source) {
18					// Clone the node registrations to avoid holding the lock while processing
19					let node_registrations = node_registrations.clone();
20					drop(sources);
21
22					for (flow_id, node_id) in node_registrations {
23						let flows = self.inner.flows.read();
24						if let Some(flow) = flows.get(&flow_id) {
25							if let Some(node) = flow.get_node(&node_id) {
26								let flow = flow.clone();
27								let node = node.clone();
28								drop(flows);
29
30								self.process_change(
31									txn,
32									&flow,
33									&node,
34									FlowChange::internal(
35										node_id,
36										change.version,
37										change.diffs.clone(),
38									),
39								)?;
40							} else {
41								drop(flows);
42							}
43						} else {
44							drop(flows);
45						}
46					}
47				}
48			}
49			FlowChangeOrigin::Internal(node_id) => {
50				// Internal changes are already scoped to a specific node
51				// This path is used by the partition logic to directly process a node's changes
52				// Find which flow this node belongs to by checking all registered flows
53				let flows = self.inner.flows.read();
54				for (_flow_id, flow) in flows.iter() {
55					if let Some(node) = flow.get_node(&node_id) {
56						let flow = flow.clone();
57						let node = node.clone();
58						drop(flows);
59
60						self.process_change(txn, &flow, &node, change)?;
61						return Ok(());
62					}
63				}
64				drop(flows);
65			}
66		}
67		Ok(())
68	}
69
70	fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: FlowChange) -> crate::Result<FlowChange> {
71		let operator = self.inner.operators.read().get(&node.id).unwrap().clone();
72		let result = operator.apply(txn, change, &self.inner.evaluator)?;
73		Ok(result)
74	}
75
76	fn process_change(
77		&self,
78		txn: &mut FlowTransaction,
79		flow: &Flow,
80		node: &FlowNode,
81		change: FlowChange,
82	) -> crate::Result<()> {
83		let node_type = &node.ty;
84		let changes = &node.outputs;
85
86		let change = match &node_type {
87			SourceInlineData {} => unimplemented!(),
88			_ => self.apply(txn, node, change)?,
89		};
90
91		if changes.is_empty() {
92		} else if changes.len() == 1 {
93			let output_id = changes[0];
94			self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
95		} else {
96			let (last, rest) = changes.split_last().unwrap();
97			for output_id in rest {
98				self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
99			}
100			self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
101		}
102
103		Ok(())
104	}
105}