1use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceFlow, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{
8 CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view,
9 transaction::CatalogFlowQueryOperations,
10};
11use reifydb_core::{
12 CommitVersion, Error,
13 interface::{FlowId, FlowNodeId, SourceId},
14};
15use reifydb_engine::StandardCommandTransaction;
16use reifydb_rql::flow::{
17 Flow, FlowNode, FlowNodeType,
18 FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Merge, Sort, Take, Window},
19};
20use reifydb_type::internal;
21use tracing::instrument;
22
23use super::eval::evaluate_operator_config;
24use crate::{
25 engine::FlowEngine,
26 operator::{
27 ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator,
28 MergeOperator, Operators, SinkViewOperator, SortOperator, SourceFlowOperator, SourceTableOperator,
29 SourceViewOperator, TakeOperator, WindowOperator,
30 },
31};
32
33impl FlowEngine {
34 #[instrument(name = "flow::register::without_backfill", level = "info", skip(self, txn), fields(flow_id = ?flow.id))]
35 pub async fn register_without_backfill(
36 &self,
37 txn: &mut StandardCommandTransaction,
38 flow: Flow,
39 ) -> crate::Result<()> {
40 self.register(txn, flow, None).await
41 }
42
43 #[instrument(name = "flow::register::with_backfill", level = "info", skip(self, txn), fields(flow_id = ?flow.id, backfill_version = flow_creation_version.0))]
44 pub async fn register_with_backfill(
45 &self,
46 txn: &mut StandardCommandTransaction,
47 flow: Flow,
48 flow_creation_version: CommitVersion,
49 ) -> crate::Result<()> {
50 self.register(txn, flow, Some(flow_creation_version)).await
51 }
52
53 #[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id, has_backfill = flow_creation_version.is_some()))]
54 async fn register(
55 &self,
56 txn: &mut StandardCommandTransaction,
57 flow: Flow,
58 flow_creation_version: Option<CommitVersion>,
59 ) -> crate::Result<()> {
60 debug_assert!(!self.inner.flows.read().await.contains_key(&flow.id), "Flow already registered");
61
62 for node_id in flow.topological_order()? {
63 let node = flow.get_node(&node_id).unwrap();
64 self.add(txn, &flow, node).await?;
65 }
66
67 if let Some(flow_creation_version) = flow_creation_version {
68 self.inner.flow_creation_versions.write().await.insert(flow.id, flow_creation_version);
69
70 if let Err(e) = self.load_initial_data(txn, &flow, flow_creation_version).await {
71 self.inner.flow_creation_versions.write().await.remove(&flow.id);
72 return Err(e);
73 }
74 }
75
76 self.inner.analyzer.write().await.add(flow.clone());
78 self.inner.flows.write().await.insert(flow.id, flow);
79
80 Ok(())
81 }
82
83 #[instrument(name = "flow::register::add_node", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?std::mem::discriminant(&node.ty)))]
84 async fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
85 debug_assert!(!self.inner.operators.read().await.contains_key(&node.id), "Operator already registered");
86 let node = node.clone();
87
88 match node.ty {
89 SourceInlineData {
90 ..
91 } => {
92 unimplemented!()
93 }
94 SourceTable {
95 table,
96 } => {
97 let table = txn.get_table(table).await?;
98
99 self.add_source(flow.id, node.id, SourceId::table(table.id)).await;
100 self.inner.operators.write().await.insert(
101 node.id,
102 Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
103 );
104 }
105 SourceView {
106 view,
107 } => {
108 let view = txn.get_view(view).await?;
109 self.add_source(flow.id, node.id, SourceId::view(view.id)).await;
110 self.inner.operators.write().await.insert(
111 node.id,
112 Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
113 );
114 }
115 SourceFlow {
116 flow: source_flow,
117 } => {
118 let source_flow_def = txn.get_flow(source_flow).await?;
119 self.add_source(flow.id, node.id, SourceId::flow(source_flow_def.id)).await;
120 self.inner.operators.write().await.insert(
121 node.id,
122 Arc::new(Operators::SourceFlow(SourceFlowOperator::new(
123 node.id,
124 source_flow_def,
125 ))),
126 );
127 }
128 SinkView {
129 view,
130 } => {
131 let parent = self
132 .inner
133 .operators
134 .read()
135 .await
136 .get(&node.inputs[0])
137 .ok_or_else(|| Error(internal!("Parent operator not found")))?
138 .clone();
139
140 self.add_sink(flow.id, node.id, SourceId::view(*view)).await;
141 let resolved = resolve_view(txn, view).await?;
142 self.inner.operators.write().await.insert(
143 node.id,
144 Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
145 );
146 }
147 Filter {
148 conditions,
149 } => {
150 let parent = self
151 .inner
152 .operators
153 .read()
154 .await
155 .get(&node.inputs[0])
156 .ok_or_else(|| Error(internal!("Parent operator not found")))?
157 .clone();
158 self.inner.operators.write().await.insert(
159 node.id,
160 Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
161 );
162 }
163 Map {
164 expressions,
165 } => {
166 let parent = self
167 .inner
168 .operators
169 .read()
170 .await
171 .get(&node.inputs[0])
172 .ok_or_else(|| Error(internal!("Parent operator not found")))?
173 .clone();
174 self.inner.operators.write().await.insert(
175 node.id,
176 Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
177 );
178 }
179 Extend {
180 expressions,
181 } => {
182 let parent = self
183 .inner
184 .operators
185 .read()
186 .await
187 .get(&node.inputs[0])
188 .ok_or_else(|| Error(internal!("Parent operator not found")))?
189 .clone();
190 self.inner.operators.write().await.insert(
191 node.id,
192 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
193 );
194 }
195 Sort {
196 by: _,
197 } => {
198 let parent = self
199 .inner
200 .operators
201 .read()
202 .await
203 .get(&node.inputs[0])
204 .ok_or_else(|| Error(internal!("Parent operator not found")))?
205 .clone();
206 self.inner.operators.write().await.insert(
207 node.id,
208 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
209 );
210 }
211 Take {
212 limit,
213 } => {
214 let parent = self
215 .inner
216 .operators
217 .read()
218 .await
219 .get(&node.inputs[0])
220 .ok_or_else(|| Error(internal!("Parent operator not found")))?
221 .clone();
222 self.inner.operators.write().await.insert(
223 node.id,
224 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
225 );
226 }
227 Join {
228 join_type,
229 left,
230 right,
231 alias,
232 } => {
233 if node.inputs.len() != 2 {
236 return Err(Error(internal!("Join node must have exactly 2 inputs")));
237 }
238
239 let left_node = node.inputs[0];
240 let right_node = node.inputs[1];
241
242 let operators = self.inner.operators.read().await;
243 let left_parent = operators
244 .get(&left_node)
245 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
246 .clone();
247
248 let right_parent = operators
249 .get(&right_node)
250 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
251 .clone();
252 drop(operators);
253
254 self.inner.operators.write().await.insert(
255 node.id,
256 Arc::new(Operators::Join(JoinOperator::new(
257 left_parent,
258 right_parent,
259 node.id,
260 join_type,
261 left_node,
262 right_node,
263 left,
264 right,
265 alias,
266 self.inner.executor.clone(),
267 ))),
268 );
269 }
270 Distinct {
271 expressions,
272 } => {
273 let parent = self
274 .inner
275 .operators
276 .read()
277 .await
278 .get(&node.inputs[0])
279 .ok_or_else(|| Error(internal!("Parent operator not found")))?
280 .clone();
281 self.inner.operators.write().await.insert(
282 node.id,
283 Arc::new(Operators::Distinct(DistinctOperator::new(
284 parent,
285 node.id,
286 expressions,
287 ))),
288 );
289 }
290 Merge {} => {
291 if node.inputs.len() < 2 {
293 return Err(Error(internal!("Merge node must have at least 2 inputs")));
294 }
295
296 let operators = self.inner.operators.read().await;
297 let mut parents = Vec::with_capacity(node.inputs.len());
298
299 for input_node_id in &node.inputs {
300 let parent = operators
301 .get(input_node_id)
302 .ok_or_else(|| {
303 Error(internal!(
304 "Parent operator not found for input {:?}",
305 input_node_id
306 ))
307 })?
308 .clone();
309 parents.push(parent);
310 }
311 drop(operators);
312
313 self.inner.operators.write().await.insert(
314 node.id,
315 Arc::new(Operators::Merge(MergeOperator::new(
316 node.id,
317 parents,
318 node.inputs.clone(),
319 ))),
320 );
321 }
322 Apply {
323 operator,
324 expressions,
325 } => {
326 let parent = self
327 .inner
328 .operators
329 .read()
330 .await
331 .get(&node.inputs[0])
332 .ok_or_else(|| Error(internal!("Parent operator not found")))?
333 .clone();
334
335 let operator = if self.is_ffi_operator(operator.as_str()) {
337 let config = evaluate_operator_config(
338 expressions.as_slice(),
339 &self.inner.evaluator,
340 )?;
341 self.create_ffi_operator(operator.as_str(), node.id, &config)?
342 } else {
343 self.inner.registry.create_operator(
345 operator.as_str(),
346 node.id,
347 expressions.as_slice(),
348 )?
349 };
350
351 self.inner.operators.write().await.insert(
352 node.id,
353 Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
354 );
355 }
356 Aggregate {
357 ..
358 } => unimplemented!(),
359 Window {
360 window_type,
361 size,
362 slide,
363 group_by,
364 aggregations,
365 min_events,
366 max_window_count,
367 max_window_age,
368 } => {
369 let parent = self
370 .inner
371 .operators
372 .read()
373 .await
374 .get(&node.inputs[0])
375 .ok_or_else(|| Error(internal!("Parent operator not found")))?
376 .clone();
377 let operator = WindowOperator::new(
378 parent,
379 node.id,
380 window_type.clone(),
381 size.clone(),
382 slide.clone(),
383 group_by.clone(),
384 aggregations.clone(),
385 min_events.clone(),
386 max_window_count.clone(),
387 max_window_age.clone(),
388 );
389 self.inner
390 .operators
391 .write()
392 .await
393 .insert(node.id, Arc::new(Operators::Window(operator)));
394 }
395 }
396
397 Ok(())
398 }
399
400 async fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
401 let mut sources = self.inner.sources.write().await;
402 let nodes = sources.entry(source).or_insert_with(Vec::new);
403
404 let entry = (flow, node);
405 if !nodes.contains(&entry) {
406 nodes.push(entry);
407 }
408 }
409
410 async fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
411 let mut sinks = self.inner.sinks.write().await;
412 let nodes = sinks.entry(sink).or_insert_with(Vec::new);
413
414 let entry = (flow, node);
415 if !nodes.contains(&entry) {
416 nodes.push(entry);
417 }
418 }
419}