reifydb_sub_flow/engine/
process.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{future::Future, pin::Pin};
5
6use reifydb_core::interface::FlowId;
7use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
8use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
9use tracing::{instrument, trace_span};
10
11use crate::{engine::FlowEngine, transaction::FlowTransaction};
12
13impl FlowEngine {
14	#[instrument(name = "flow::process", level = "debug", skip(self, txn), fields(flow_id = ?flow_id, origin = ?change.origin, version = change.version.0, diff_count = change.diffs.len()))]
15	pub async fn process(
16		&self,
17		txn: &mut FlowTransaction,
18		change: FlowChange,
19		flow_id: FlowId,
20	) -> crate::Result<()> {
21		match change.origin {
22			FlowChangeOrigin::External(source) => {
23				let node_registrations = {
24					let sources = self.inner.sources.read().await;
25					sources.get(&source).cloned()
26				};
27
28				if let Some(node_registrations) = node_registrations {
29					for (flow_id, node_id) in node_registrations {
30						let flow_and_node = {
31							let flows = self.inner.flows.read().await;
32							flows.get(&flow_id).and_then(|flow| {
33								flow.get_node(&node_id)
34									.map(|node| (flow.clone(), node.clone()))
35							})
36						};
37
38						if let Some((flow, node)) = flow_and_node {
39							self.process_change(
40								txn,
41								&flow,
42								&node,
43								FlowChange::internal(
44									node_id,
45									change.version,
46									change.diffs.clone(),
47								),
48							)
49							.await?;
50						}
51					}
52				}
53			}
54			FlowChangeOrigin::Internal(node_id) => {
55				// Internal changes are already scoped to a specific node
56				// This path is used by the partition logic to directly process a node's changes
57				// Use the flow_id parameter for direct lookup instead of iterating all flows
58				let flow_and_node = {
59					let flows = self.inner.flows.read().await;
60					flows.get(&flow_id).and_then(|flow| {
61						flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
62					})
63				};
64
65				if let Some((flow, node)) = flow_and_node {
66					self.process_change(txn, &flow, &node, change).await?;
67				}
68			}
69		}
70		Ok(())
71	}
72
73	#[instrument(name = "flow::apply", level = "trace", skip(self, txn), fields(node_id = ?node.id, input_diffs = change.diffs.len(), output_diffs))]
74	async fn apply(
75		&self,
76		txn: &mut FlowTransaction,
77		node: &FlowNode,
78		change: FlowChange,
79	) -> crate::Result<FlowChange> {
80		let operator = self.inner.operators.read().await.get(&node.id).unwrap().clone();
81		{
82			let _span = trace_span!("flow::operator_apply", node_id = ?node.id, operator_type = ?node.ty)
83				.entered();
84		}
85		let result = operator.apply(txn, change, &self.inner.evaluator).await?;
86		tracing::Span::current().record("output_diffs", result.diffs.len());
87		Ok(result)
88	}
89
90	#[instrument(name = "flow::process::change", level = "trace", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, input_diffs = change.diffs.len(), output_diffs))]
91	fn process_change<'a>(
92		&'a self,
93		txn: &'a mut FlowTransaction,
94		flow: &'a Flow,
95		node: &'a FlowNode,
96		change: FlowChange,
97	) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
98		Box::pin(async move {
99			let node_type = &node.ty;
100			let changes = &node.outputs;
101
102			let change = match &node_type {
103				SourceInlineData {} => unimplemented!(),
104				_ => {
105					let result = self.apply(txn, node, change).await?;
106					tracing::Span::current().record("output_diffs", result.diffs.len());
107					result
108				}
109			};
110
111			if changes.is_empty() {
112			} else if changes.len() == 1 {
113				let output_id = changes[0];
114				self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change).await?;
115			} else {
116				let (last, rest) = changes.split_last().unwrap();
117				for output_id in rest {
118					self.process_change(
119						txn,
120						flow,
121						flow.get_node(output_id).unwrap(),
122						change.clone(),
123					)
124					.await?;
125				}
126				self.process_change(txn, flow, flow.get_node(last).unwrap(), change).await?;
127			}
128
129			Ok(())
130		})
131	}
132}