reifydb_sub_flow/engine/
process.rs1use reifydb_core::interface::FlowId;
5use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
6use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
7use tracing::instrument;
8
9use crate::{engine::FlowEngine, transaction::FlowTransaction};
10
11impl FlowEngine {
12 #[instrument(level = "debug", skip(self, txn), fields(flow_id = ?flow_id, origin = ?change.origin, version = change.version.0, diff_count = change.diffs.len()))]
13 pub fn process(&self, txn: &mut FlowTransaction, change: FlowChange, flow_id: FlowId) -> crate::Result<()> {
14 match change.origin {
15 FlowChangeOrigin::External(source) => {
16 let sources = self.inner.sources.read();
17 if let Some(node_registrations) = sources.get(&source) {
18 let node_registrations = node_registrations.clone();
20 drop(sources);
21
22 for (flow_id, node_id) in node_registrations {
23 let flows = self.inner.flows.read();
24 if let Some(flow) = flows.get(&flow_id) {
25 if let Some(node) = flow.get_node(&node_id) {
26 let flow = flow.clone();
27 let node = node.clone();
28 drop(flows);
29
30 self.process_change(
31 txn,
32 &flow,
33 &node,
34 FlowChange::internal(
35 node_id,
36 change.version,
37 change.diffs.clone(),
38 ),
39 )?;
40 } else {
41 drop(flows);
42 }
43 } else {
44 drop(flows);
45 }
46 }
47 }
48 }
49 FlowChangeOrigin::Internal(node_id) => {
50 let flows = self.inner.flows.read();
54 if let Some(flow) = flows.get(&flow_id) {
55 if let Some(node) = flow.get_node(&node_id) {
56 let flow = flow.clone();
57 let node = node.clone();
58 drop(flows);
59
60 self.process_change(txn, &flow, &node, change)?;
61 }
62 }
63 }
64 }
65 Ok(())
66 }
67
68 #[instrument(level = "trace", skip(self, txn), fields(node_id = ?node.id))]
69 fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: FlowChange) -> crate::Result<FlowChange> {
70 let operator = self.inner.operators.read().get(&node.id).unwrap().clone();
71 let result = operator.apply(txn, change, &self.inner.evaluator)?;
72 Ok(result)
73 }
74
75 #[instrument(level = "trace", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, input_diffs = change.diffs.len()))]
76 fn process_change(
77 &self,
78 txn: &mut FlowTransaction,
79 flow: &Flow,
80 node: &FlowNode,
81 change: FlowChange,
82 ) -> crate::Result<()> {
83 let node_type = &node.ty;
84 let changes = &node.outputs;
85
86 let change = match &node_type {
87 SourceInlineData {} => unimplemented!(),
88 _ => self.apply(txn, node, change)?,
89 };
90
91 if changes.is_empty() {
92 } else if changes.len() == 1 {
93 let output_id = changes[0];
94 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
95 } else {
96 let (last, rest) = changes.split_last().unwrap();
97 for output_id in rest {
98 self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
99 }
100 self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
101 }
102
103 Ok(())
104 }
105}