1use std::sync::Arc;
5
6use reifydb_core::interface::{
7 catalog::flow::FlowId,
8 change::{Change, ChangeOrigin},
9};
10use reifydb_rql::flow::{
11 flow::FlowDag,
12 node::{FlowNode, FlowNodeType::SourceInlineData},
13};
14use reifydb_type::{Result, value::datetime::DateTime};
15use tracing::{Span, field, instrument};
16
17use crate::{engine::FlowEngine, transaction::FlowTransaction};
18
19impl FlowEngine {
20 #[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
21 flow_id = ?flow_id,
22 origin = ?change.origin,
23 version = change.version.0,
24 diff_count = change.diffs.len(),
25 nodes_processed = field::Empty
26 ))]
27 pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> Result<()> {
28 let mut nodes_processed = 0;
29
30 match change.origin {
31 ChangeOrigin::Shape(source) => {
32 let node_registrations = self.sources.get(&source).cloned();
33
34 if let Some(node_registrations) = node_registrations {
35 for (registered_flow_id, node_id) in node_registrations {
36 if registered_flow_id != flow_id {
38 continue;
39 }
40
41 let flow_and_node =
42 self.flows.get(®istered_flow_id).and_then(|flow| {
43 flow.get_node(&node_id)
44 .map(|node| (flow.clone(), node.clone()))
45 });
46
47 if let Some((flow, node)) = flow_and_node {
48 self.process_change(
49 txn,
50 &flow,
51 &node,
52 Arc::new(Change::from_flow(
53 node_id,
54 change.version,
55 change.diffs.clone(),
56 change.changed_at,
57 )),
58 )?;
59 nodes_processed += 1;
60 }
61 }
62 }
63 }
64 ChangeOrigin::Flow(node_id) => {
65 let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
69 flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
70 });
71
72 if let Some((flow, node)) = flow_and_node {
73 self.process_change(txn, &flow, &node, Arc::new(change))?;
74 nodes_processed += 1;
75 }
76 }
77 }
78
79 Span::current().record("nodes_processed", nodes_processed);
80 Ok(())
81 }
82
83 #[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
84 node_id = ?node.id,
85 node_type = ?node.ty,
86 input_diffs = change.diffs.len(),
87 output_diffs = field::Empty,
88 lock_wait_us = field::Empty,
89 apply_time_us = field::Empty
90 ))]
91 fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Arc<Change>) -> Result<Change> {
92 let lock_start = self.runtime_context.clock.instant();
93 let operator = self.operators.get(&node.id).unwrap().clone();
94 Span::current().record("lock_wait_us", lock_start.elapsed().as_micros() as u64);
95
96 let owned = Arc::try_unwrap(change).unwrap_or_else(|arc| (*arc).clone());
99
100 let apply_start = self.runtime_context.clock.instant();
101 let result = operator.apply(txn, owned)?;
102 Span::current().record("apply_time_us", apply_start.elapsed().as_micros() as u64);
103 Span::current().record("output_diffs", result.diffs.len());
104 Ok(result)
105 }
106
107 #[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
108 flow_id = ?flow.id,
109 node_id = ?node.id,
110 input_diffs = change.diffs.len(),
111 output_diffs = field::Empty,
112 downstream_count = node.outputs.len(),
113 propagation_time_us = field::Empty
114 ))]
115 fn process_change(
116 &self,
117 txn: &mut FlowTransaction,
118 flow: &FlowDag,
119 node: &FlowNode,
120 change: Arc<Change>,
121 ) -> Result<()> {
122 let node_type = &node.ty;
123 let changes = &node.outputs;
124
125 let change: Arc<Change> = match &node_type {
126 SourceInlineData {} => unimplemented!(),
127 _ => {
128 let result = self.apply(txn, node, change)?;
129 Span::current().record("output_diffs", result.diffs.len());
130 Arc::new(result)
131 }
132 };
133
134 let propagation_start = self.runtime_context.clock.instant();
135 if changes.is_empty() {
136 } else if changes.len() == 1 {
137 let output_id = changes[0];
138 self.process_change(txn, flow, flow.get_node(&output_id).unwrap(), change)?;
139 } else {
140 let (last, rest) = changes.split_last().unwrap();
141 for output_id in rest {
142 self.process_change(txn, flow, flow.get_node(output_id).unwrap(), Arc::clone(&change))?;
144 }
145 self.process_change(txn, flow, flow.get_node(last).unwrap(), change)?;
148 }
149 Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);
150
151 Ok(())
152 }
153
154 #[instrument(name = "flow::engine::process_tick", level = "debug", skip(self, txn), fields(
155 flow_id = ?flow_id,
156 timestamp = %timestamp
157 ))]
158 pub fn process_tick(&self, txn: &mut FlowTransaction, flow_id: FlowId, timestamp: DateTime) -> Result<()> {
159 let flow = match self.flows.get(&flow_id) {
160 Some(f) => f.clone(),
161 None => return Ok(()),
162 };
163
164 for node_id in flow.topological_order()? {
165 let operator = match self.operators.get(&node_id) {
166 Some(op) => op.clone(),
167 None => continue,
168 };
169
170 if let Some(change) = operator.tick(txn, timestamp)? {
171 let node = flow.get_node(&node_id).unwrap();
172 let outputs = &node.outputs;
173 if outputs.is_empty() {
174 } else if outputs.len() == 1 {
175 self.process_change(
176 txn,
177 &flow,
178 flow.get_node(&outputs[0]).unwrap(),
179 Arc::new(change),
180 )?;
181 } else {
182 let arc = Arc::new(change);
183 let (last, rest) = outputs.split_last().unwrap();
184 for output_id in rest {
185 self.process_change(
186 txn,
187 &flow,
188 flow.get_node(output_id).unwrap(),
189 Arc::clone(&arc),
190 )?;
191 }
192 self.process_change(txn, &flow, flow.get_node(last).unwrap(), arc)?;
193 }
194 }
195 }
196 Ok(())
197 }
198}