reifydb_sub_flow/engine/
process.rs1use std::{future::Future, pin::Pin};
5
6use reifydb_core::interface::FlowId;
7use reifydb_flow_operator_sdk::{FlowChange, FlowChangeOrigin};
8use reifydb_rql::flow::{Flow, FlowNode, FlowNodeType::SourceInlineData};
9use tracing::{instrument, trace_span};
10
11use crate::{engine::FlowEngine, transaction::FlowTransaction};
12
13impl FlowEngine {
14 #[instrument(name = "flow::process", level = "debug", skip(self, txn), fields(flow_id = ?flow_id, origin = ?change.origin, version = change.version.0, diff_count = change.diffs.len()))]
15 pub async fn process(
16 &self,
17 txn: &mut FlowTransaction,
18 change: FlowChange,
19 flow_id: FlowId,
20 ) -> crate::Result<()> {
21 match change.origin {
22 FlowChangeOrigin::External(source) => {
23 let node_registrations = {
24 let sources = self.inner.sources.read().await;
25 sources.get(&source).cloned()
26 };
27
28 if let Some(node_registrations) = node_registrations {
29 for (flow_id, node_id) in node_registrations {
30 let flow_and_node = {
31 let flows = self.inner.flows.read().await;
32 flows.get(&flow_id).and_then(|flow| {
33 flow.get_node(&node_id)
34 .map(|node| (flow.clone(), node.clone()))
35 })
36 };
37
38 if let Some((flow, node)) = flow_and_node {
39 self.process_change(
40 txn,
41 &flow,
42 &node,
43 FlowChange::internal(
44 node_id,
45 change.version,
46 change.diffs.clone(),
47 ),
48 )
49 .await?;
50 }
51 }
52 }
53 }
54 FlowChangeOrigin::Internal(node_id) => {
55 let flow_and_node = {
59 let flows = self.inner.flows.read().await;
60 flows.get(&flow_id).and_then(|flow| {
61 flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
62 })
63 };
64
65 if let Some((flow, node)) = flow_and_node {
66 self.process_change(txn, &flow, &node, change).await?;
67 }
68 }
69 }
70 Ok(())
71 }
72
73 #[instrument(name = "flow::apply", level = "trace", skip(self, txn), fields(node_id = ?node.id, input_diffs = change.diffs.len(), output_diffs))]
74 async fn apply(
75 &self,
76 txn: &mut FlowTransaction,
77 node: &FlowNode,
78 change: FlowChange,
79 ) -> crate::Result<FlowChange> {
80 let operator = self.inner.operators.read().await.get(&node.id).unwrap().clone();
81 {
82 let _span = trace_span!("flow::operator_apply", node_id = ?node.id, operator_type = ?node.ty)
83 .entered();
84 }
85 let result = operator.apply(txn, change, &self.inner.evaluator).await?;
86 tracing::Span::current().record("output_diffs", result.diffs.len());
87 Ok(result)
88 }
89
90 #[instrument(name = "flow::process::change", level = "trace", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, input_diffs = change.diffs.len(), output_diffs))]
91 fn process_change<'a>(
92 &'a self,
93 txn: &'a mut FlowTransaction,
94 flow: &'a Flow,
95 node: &'a FlowNode,
96 change: FlowChange,
97 ) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
98 Box::pin(async move {
99 let node_type = &node.ty;
100 let changes = &node.outputs;
101
102 let change = match &node_type {
103 SourceInlineData {} => unimplemented!(),
104 _ => {
105 let result = self.apply(txn, node, change).await?;
106 tracing::Span::current().record("output_diffs", result.diffs.len());
107 result
108 }
109 };
110
111 if changes.is_empty() {
112 } else if changes.len() == 1 {
113 let output_id = changes[0];
114 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change).await?;
115 } else {
116 let (last, rest) = changes.split_last().unwrap();
117 for output_id in rest {
118 self.process_change(
119 txn,
120 flow,
121 flow.get_node(output_id).unwrap(),
122 change.clone(),
123 )
124 .await?;
125 }
126 self.process_change(txn, flow, flow.get_node(last).unwrap(), change).await?;
127 }
128
129 Ok(())
130 })
131 }
132}