reifydb_sub_flow/engine/
process.rs1use reifydb_core::interface::FlowId;
5use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
6use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
7use tracing::instrument;
8
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
11impl FlowEngine {
12 #[instrument(level = "debug", skip(self, txn), fields(flow_id = ?flow_id, origin = ?change.origin, version = change.version.0, diff_count = change.diffs.len()))]
13 pub fn process(&self, txn: &mut FlowTransaction, change: FlowChange, flow_id: FlowId) -> crate::Result<()> {
14 match change.origin {
15 FlowChangeOrigin::External(source) => {
16 let sources = self.inner.sources.read();
17 if let Some(node_registrations) = sources.get(&source) {
18 let node_registrations = node_registrations.clone();
20 drop(sources);
21
22 for (flow_id, node_id) in node_registrations {
23 let flows = self.inner.flows.read();
24 if let Some(flow) = flows.get(&flow_id) {
25 if let Some(node) = flow.get_node(&node_id) {
26 let flow = flow.clone();
27 let node = node.clone();
28 drop(flows);
29
30 self.process_change(
31 txn,
32 &flow,
33 &node,
34 FlowChange::internal(
35 node_id,
36 change.version,
37 change.diffs.clone(),
38 ),
39 )?;
40 } else {
41 drop(flows);
42 }
43 } else {
44 drop(flows);
45 }
46 }
47 }
48 }
49 FlowChangeOrigin::Internal(node_id) => {
50 let flows = self.inner.flows.read();
54 if let Some(flow) = flows.get(&flow_id) {
55 if let Some(node) = flow.get_node(&node_id) {
56 let flow = flow.clone();
57 let node = node.clone();
58 drop(flows);
59
60 self.process_change(txn, &flow, &node, change)?;
61 }
62 }
63 }
64 }
65 Ok(())
66 }
67
68 #[instrument(level = "trace", skip(self, txn), fields(node_id = ?node.id, input_diffs = change.diffs.len(), output_diffs))]
69 fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: FlowChange) -> crate::Result<FlowChange> {
70 let operator = self.inner.operators.read().get(&node.id).unwrap().clone();
71 let result = operator.apply(txn, change, &self.inner.evaluator)?;
72 tracing::Span::current().record("output_diffs", result.diffs.len());
73 Ok(result)
74 }
75
76 #[instrument(level = "trace", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, input_diffs = change.diffs.len(), output_diffs))]
77 fn process_change(
78 &self,
79 txn: &mut FlowTransaction,
80 flow: &Flow,
81 node: &FlowNode,
82 change: FlowChange,
83 ) -> crate::Result<()> {
84 let node_type = &node.ty;
85 let changes = &node.outputs;
86
87 let change = match &node_type {
88 SourceInlineData {} => unimplemented!(),
89 _ => {
90 let result = self.apply(txn, node, change)?;
91 tracing::Span::current().record("output_diffs", result.diffs.len());
92 result
93 }
94 };
95
96 if changes.is_empty() {
97 } else if changes.len() == 1 {
98 let output_id = changes[0];
99 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
100 } else {
101 let (last, rest) = changes.split_last().unwrap();
102 for output_id in rest {
103 self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
104 }
105 self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
106 }
107
108 Ok(())
109 }
110}