reifydb_sub_flow/engine/
process.rs1use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
5use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
6
7use crate::{engine::FlowEngine, transaction::FlowTransaction};
8
9impl FlowEngine {
10 pub fn process(&self, txn: &mut FlowTransaction, change: FlowChange) -> crate::Result<()> {
11 match change.origin {
12 FlowChangeOrigin::External(source) => {
13 let sources = self.inner.sources.read();
14 if let Some(node_registrations) = sources.get(&source) {
15 let node_registrations = node_registrations.clone();
17 drop(sources);
18
19 for (flow_id, node_id) in node_registrations {
20 let flows = self.inner.flows.read();
21 if let Some(flow) = flows.get(&flow_id) {
22 if let Some(node) = flow.get_node(&node_id) {
23 let flow = flow.clone();
24 let node = node.clone();
25 drop(flows);
26
27 self.process_change(
28 txn,
29 &flow,
30 &node,
31 FlowChange::internal(
32 node_id,
33 change.version,
34 change.diffs.clone(),
35 ),
36 )?;
37 } else {
38 drop(flows);
39 }
40 } else {
41 drop(flows);
42 }
43 }
44 }
45 }
46 FlowChangeOrigin::Internal(node_id) => {
47 let flows = self.inner.flows.read();
51 for (_flow_id, flow) in flows.iter() {
52 if let Some(node) = flow.get_node(&node_id) {
53 let flow = flow.clone();
54 let node = node.clone();
55 drop(flows);
56
57 self.process_change(txn, &flow, &node, change)?;
58 return Ok(());
59 }
60 }
61 drop(flows);
62 }
63 }
64 Ok(())
65 }
66
67 fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: FlowChange) -> crate::Result<FlowChange> {
68 let operator = self.inner.operators.read().get(&node.id).unwrap().clone();
69 let result = operator.apply(txn, change, &self.inner.evaluator)?;
70 Ok(result)
71 }
72
73 fn process_change(
74 &self,
75 txn: &mut FlowTransaction,
76 flow: &Flow,
77 node: &FlowNode,
78 change: FlowChange,
79 ) -> crate::Result<()> {
80 let node_type = &node.ty;
81 let changes = &node.outputs;
82
83 let change = match &node_type {
84 SourceInlineData {} => unimplemented!(),
85 _ => self.apply(txn, node, change)?,
86 };
87
88 if changes.is_empty() {
89 } else if changes.len() == 1 {
90 let output_id = changes[0];
91 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
92 } else {
93 let (last, rest) = changes.split_last().unwrap();
94 for output_id in rest {
95 self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
96 }
97 self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
98 }
99
100 Ok(())
101 }
102}