1use reifydb_core::interface::{
5 catalog::flow::FlowId,
6 change::{Change, ChangeOrigin},
7};
8use reifydb_rql::flow::{
9 flow::FlowDag,
10 node::{FlowNode, FlowNodeType::SourceInlineData},
11};
12use reifydb_type::{Result, value::datetime::DateTime};
13use tracing::{Span, field, instrument};
14
15use crate::{engine::FlowEngine, transaction::FlowTransaction};
16
17impl FlowEngine {
18 #[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
19 flow_id = ?flow_id,
20 origin = ?change.origin,
21 version = change.version.0,
22 diff_count = change.diffs.len(),
23 nodes_processed = field::Empty
24 ))]
25 pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> Result<()> {
26 let mut nodes_processed = 0;
27
28 match change.origin {
29 ChangeOrigin::Schema(source) => {
30 let node_registrations = self.sources.get(&source).cloned();
31
32 if let Some(node_registrations) = node_registrations {
33 for (registered_flow_id, node_id) in node_registrations {
34 if registered_flow_id != flow_id {
36 continue;
37 }
38
39 let flow_and_node =
40 self.flows.get(®istered_flow_id).and_then(|flow| {
41 flow.get_node(&node_id)
42 .map(|node| (flow.clone(), node.clone()))
43 });
44
45 if let Some((flow, node)) = flow_and_node {
46 self.process_change(
47 txn,
48 &flow,
49 &node,
50 Change::from_flow(
51 node_id,
52 change.version,
53 change.diffs.clone(),
54 ),
55 )?;
56 nodes_processed += 1;
57 }
58 }
59 }
60 }
61 ChangeOrigin::Flow(node_id) => {
62 let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
66 flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
67 });
68
69 if let Some((flow, node)) = flow_and_node {
70 self.process_change(txn, &flow, &node, change)?;
71 nodes_processed += 1;
72 }
73 }
74 }
75
76 Span::current().record("nodes_processed", nodes_processed);
77 Ok(())
78 }
79
80 #[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
81 node_id = ?node.id,
82 node_type = ?node.ty,
83 input_diffs = change.diffs.len(),
84 output_diffs = field::Empty,
85 lock_wait_us = field::Empty,
86 apply_time_us = field::Empty
87 ))]
88 fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Change) -> Result<Change> {
89 let lock_start = self.runtime_context.clock.instant();
90 let operator = self.operators.get(&node.id).unwrap().clone();
91 Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
92
93 let apply_start = self.runtime_context.clock.instant();
94 let result = operator.apply(txn, change)?;
95 Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
96 Span::current().record("output_diffs", result.diffs.len());
97 Ok(result)
98 }
99
100 #[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
101 flow_id = ?flow.id,
102 node_id = ?node.id,
103 input_diffs = change.diffs.len(),
104 output_diffs = field::Empty,
105 downstream_count = node.outputs.len(),
106 propagation_time_us = field::Empty
107 ))]
108 fn process_change(
109 &self,
110 txn: &mut FlowTransaction,
111 flow: &FlowDag,
112 node: &FlowNode,
113 change: Change,
114 ) -> Result<()> {
115 let node_type = &node.ty;
116 let changes = &node.outputs;
117
118 let change = match &node_type {
119 SourceInlineData {} => unimplemented!(),
120 _ => {
121 let result = self.apply(txn, node, change)?;
122 Span::current().record("output_diffs", result.diffs.len());
123 result
124 }
125 };
126
127 let propagation_start = self.runtime_context.clock.instant();
128 if changes.is_empty() {
129 } else if changes.len() == 1 {
130 let output_id = changes[0];
131 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
132 } else {
133 let (last, rest) = changes.split_last().unwrap();
134 for output_id in rest {
135 self.process_change(txn, flow, flow.get_node(output_id).unwrap(), change.clone())?;
136 }
137 self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
138 }
139 Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
140
141 Ok(())
142 }
143
144 #[instrument(name = "flow::engine::process_tick", level = "debug", skip(self, txn), fields(
145 flow_id = ?flow_id,
146 timestamp = %timestamp
147 ))]
148 pub fn process_tick(&self, txn: &mut FlowTransaction, flow_id: FlowId, timestamp: DateTime) -> Result<()> {
149 let flow = match self.flows.get(&flow_id) {
150 Some(f) => f.clone(),
151 None => return Ok(()),
152 };
153
154 for node_id in flow.topological_order()? {
155 let operator = match self.operators.get(&node_id) {
156 Some(op) => op.clone(),
157 None => continue,
158 };
159
160 if let Some(change) = operator.tick(txn, timestamp)? {
161 let node = flow.get_node(&node_id).unwrap();
162 for &output_id in &node.outputs {
163 self.process_change(
164 txn,
165 &flow,
166 flow.get_node(&output_id).unwrap(),
167 change.clone(),
168 )?;
169 }
170 }
171 }
172 Ok(())
173 }
174}