reifydb_sub_flow/engine/
process.rs1use reifydb_core::interface::{
5 catalog::flow::FlowId,
6 change::{Change, ChangeOrigin},
7};
8use reifydb_rql::flow::{
9 flow::FlowDag,
10 node::{FlowNode, FlowNodeType::SourceInlineData},
11};
12use tracing::{Span, instrument};
13
14use crate::{engine::FlowEngine, transaction::FlowTransaction};
15
16impl FlowEngine {
17 #[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
18 flow_id = ?flow_id,
19 origin = ?change.origin,
20 version = change.version.0,
21 diff_count = change.diffs.len(),
22 nodes_processed = tracing::field::Empty
23 ))]
24 pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> reifydb_type::Result<()> {
25 let mut nodes_processed = 0;
26
27 match change.origin {
28 ChangeOrigin::Primitive(source) => {
29 let node_registrations = self.sources.get(&source).cloned();
30
31 if let Some(node_registrations) = node_registrations {
32 for (registered_flow_id, node_id) in node_registrations {
33 if registered_flow_id != flow_id {
35 continue;
36 }
37
38 let flow_and_node =
39 self.flows.get(®istered_flow_id).and_then(|flow| {
40 flow.get_node(&node_id)
41 .map(|node| (flow.clone(), node.clone()))
42 });
43
44 if let Some((flow, node)) = flow_and_node {
45 self.process_change(
46 txn,
47 &flow,
48 &node,
49 Change::from_flow(
50 node_id,
51 change.version,
52 change.diffs.clone(),
53 ),
54 )?;
55 nodes_processed += 1;
56 }
57 }
58 }
59 }
60 ChangeOrigin::Flow(node_id) => {
61 let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
65 flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
66 });
67
68 if let Some((flow, node)) = flow_and_node {
69 self.process_change(txn, &flow, &node, change)?;
70 nodes_processed += 1;
71 }
72 }
73 }
74
75 Span::current().record("nodes_processed", nodes_processed);
76 Ok(())
77 }
78
79 #[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
80 node_id = ?node.id,
81 node_type = ?node.ty,
82 input_diffs = change.diffs.len(),
83 output_diffs = tracing::field::Empty,
84 lock_wait_us = tracing::field::Empty,
85 apply_time_us = tracing::field::Empty
86 ))]
87 fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Change) -> reifydb_type::Result<Change> {
88 let lock_start = self.clock.instant();
89 let operator = self.operators.get(&node.id).unwrap().clone();
90 Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
91
92 let apply_start = self.clock.instant();
93 let result = operator.apply(txn, change)?;
94 Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
95 Span::current().record("output_diffs", result.diffs.len());
96 Ok(result)
97 }
98
99 #[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
100 flow_id = ?flow.id,
101 node_id = ?node.id,
102 input_diffs = change.diffs.len(),
103 output_diffs = tracing::field::Empty,
104 downstream_count = node.outputs.len(),
105 propagation_time_us = tracing::field::Empty
106 ))]
107 fn process_change(
108 &self,
109 txn: &mut FlowTransaction,
110 flow: &FlowDag,
111 node: &FlowNode,
112 change: Change,
113 ) -> reifydb_type::Result<()> {
114 let node_type = &node.ty;
115 let changes = &node.outputs;
116
117 let change = match &node_type {
118 SourceInlineData {} => unimplemented!(),
119 _ => {
120 let result = self.apply(txn, node, change)?;
121 Span::current().record("output_diffs", result.diffs.len());
122 result
123 }
124 };
125
126 let propagation_start = self.clock.instant();
127 if changes.is_empty() {
128 } else if changes.len() == 1 {
129 let output_id = changes[0];
130 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
131 } else {
132 let (last, rest) = changes.split_last().unwrap();
133 for output_id in rest {
134 self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
135 }
136 self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
137 }
138 Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
139
140 Ok(())
141 }
142}