1use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceFlow, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{
8 CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view,
9 transaction::CatalogFlowQueryOperations,
10};
11use reifydb_core::{
12 CommitVersion, Error,
13 interface::{FlowId, FlowNodeId, SourceId},
14};
15use reifydb_engine::StandardCommandTransaction;
16use reifydb_rql::flow::{
17 Flow, FlowNode, FlowNodeType,
18 FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Sort, Take, Union, Window},
19};
20use reifydb_type::internal;
21
22use super::eval::evaluate_operator_config;
23use crate::{
24 engine::FlowEngine,
25 operator::{
26 ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator, Operators,
27 SinkViewOperator, SortOperator, SourceFlowOperator, SourceTableOperator, SourceViewOperator,
28 TakeOperator, WindowOperator,
29 },
30};
31
32impl FlowEngine {
33 pub fn register_without_backfill(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
34 self.register(txn, flow, None)
35 }
36
37 pub fn register_with_backfill(
38 &self,
39 txn: &mut StandardCommandTransaction,
40 flow: Flow,
41 flow_creation_version: CommitVersion,
42 ) -> crate::Result<()> {
43 self.register(txn, flow, Some(flow_creation_version))
44 }
45
46 fn register(
47 &self,
48 txn: &mut StandardCommandTransaction,
49 flow: Flow,
50 flow_creation_version: Option<CommitVersion>,
51 ) -> crate::Result<()> {
52 debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
53
54 for node_id in flow.topological_order()? {
55 let node = flow.get_node(&node_id).unwrap();
56 self.add(txn, &flow, node)?;
57 }
58
59 if let Some(flow_creation_version) = flow_creation_version {
60 self.inner.flow_creation_versions.write().insert(flow.id, flow_creation_version);
61
62 if let Err(e) = self.load_initial_data(txn, &flow, flow_creation_version) {
63 self.inner.flow_creation_versions.write().remove(&flow.id);
64 return Err(e);
65 }
66 }
67
68 self.inner.analyzer.write().add(flow.clone());
70 self.inner.flows.write().insert(flow.id, flow);
71
72 Ok(())
73 }
74
75 fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
76 debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
77 let node = node.clone();
78
79 match node.ty {
80 SourceInlineData {
81 ..
82 } => {
83 unimplemented!()
84 }
85 SourceTable {
86 table,
87 } => {
88 let table = txn.get_table(table)?;
89
90 self.add_source(flow.id, node.id, SourceId::table(table.id));
91 self.inner.operators.write().insert(
92 node.id,
93 Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
94 );
95 }
96 SourceView {
97 view,
98 } => {
99 let view = txn.get_view(view)?;
100 self.add_source(flow.id, node.id, SourceId::view(view.id));
101 self.inner.operators.write().insert(
102 node.id,
103 Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
104 );
105 }
106 SourceFlow {
107 flow: source_flow,
108 } => {
109 let source_flow_def = txn.get_flow(source_flow)?;
110 self.add_source(flow.id, node.id, SourceId::flow(source_flow_def.id));
111 self.inner.operators.write().insert(
112 node.id,
113 Arc::new(Operators::SourceFlow(SourceFlowOperator::new(
114 node.id,
115 source_flow_def,
116 ))),
117 );
118 }
119 SinkView {
120 view,
121 } => {
122 let parent = self
123 .inner
124 .operators
125 .read()
126 .get(&node.inputs[0])
127 .ok_or_else(|| Error(internal!("Parent operator not found")))?
128 .clone();
129
130 self.add_sink(flow.id, node.id, SourceId::view(*view));
131 self.inner.operators.write().insert(
132 node.id,
133 Arc::new(Operators::SinkView(SinkViewOperator::new(
134 parent,
135 node.id,
136 resolve_view(txn, view)?,
137 ))),
138 );
139 }
140 Filter {
141 conditions,
142 } => {
143 let parent = self
144 .inner
145 .operators
146 .read()
147 .get(&node.inputs[0])
148 .ok_or_else(|| Error(internal!("Parent operator not found")))?
149 .clone();
150 self.inner.operators.write().insert(
151 node.id,
152 Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
153 );
154 }
155 Map {
156 expressions,
157 } => {
158 let parent = self
159 .inner
160 .operators
161 .read()
162 .get(&node.inputs[0])
163 .ok_or_else(|| Error(internal!("Parent operator not found")))?
164 .clone();
165 self.inner.operators.write().insert(
166 node.id,
167 Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
168 );
169 }
170 Extend {
171 expressions,
172 } => {
173 let parent = self
174 .inner
175 .operators
176 .read()
177 .get(&node.inputs[0])
178 .ok_or_else(|| Error(internal!("Parent operator not found")))?
179 .clone();
180 self.inner.operators.write().insert(
181 node.id,
182 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
183 );
184 }
185 Sort {
186 by: _,
187 } => {
188 let parent = self
189 .inner
190 .operators
191 .read()
192 .get(&node.inputs[0])
193 .ok_or_else(|| Error(internal!("Parent operator not found")))?
194 .clone();
195 self.inner.operators.write().insert(
196 node.id,
197 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
198 );
199 }
200 Take {
201 limit,
202 } => {
203 let parent = self
204 .inner
205 .operators
206 .read()
207 .get(&node.inputs[0])
208 .ok_or_else(|| Error(internal!("Parent operator not found")))?
209 .clone();
210 self.inner.operators.write().insert(
211 node.id,
212 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
213 );
214 }
215 Join {
216 join_type,
217 left,
218 right,
219 alias,
220 } => {
221 if node.inputs.len() != 2 {
224 return Err(Error(internal!("Join node must have exactly 2 inputs")));
225 }
226
227 let left_node = node.inputs[0];
228 let right_node = node.inputs[1];
229
230 let operators = self.inner.operators.read();
231 let left_parent = operators
232 .get(&left_node)
233 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
234 .clone();
235
236 let right_parent = operators
237 .get(&right_node)
238 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
239 .clone();
240 drop(operators);
241
242 self.inner.operators.write().insert(
243 node.id,
244 Arc::new(Operators::Join(JoinOperator::new(
245 left_parent,
246 right_parent,
247 node.id,
248 join_type,
249 left_node,
250 right_node,
251 left,
252 right,
253 alias,
254 self.inner.executor.clone(),
255 ))),
256 );
257 }
258 Distinct {
259 expressions,
260 } => {
261 let parent = self
262 .inner
263 .operators
264 .read()
265 .get(&node.inputs[0])
266 .ok_or_else(|| Error(internal!("Parent operator not found")))?
267 .clone();
268 self.inner.operators.write().insert(
269 node.id,
270 Arc::new(Operators::Distinct(DistinctOperator::new(
271 parent,
272 node.id,
273 expressions,
274 ))),
275 );
276 }
277 Union {} => unimplemented!(),
279 Apply {
280 operator_name,
281 expressions,
282 } => {
283 let parent = self
284 .inner
285 .operators
286 .read()
287 .get(&node.inputs[0])
288 .ok_or_else(|| Error(internal!("Parent operator not found")))?
289 .clone();
290
291 let operator = if self.is_ffi_operator(operator_name.as_str()) {
293 let config = evaluate_operator_config(
294 expressions.as_slice(),
295 &self.inner.evaluator,
296 )?;
297 self.create_ffi_operator(operator_name.as_str(), node.id, &config)?
298 } else {
299 self.inner.registry.create_operator(
301 operator_name.as_str(),
302 node.id,
303 expressions.as_slice(),
304 )?
305 };
306
307 self.inner.operators.write().insert(
308 node.id,
309 Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
310 );
311 }
312 Aggregate {
313 ..
314 } => unimplemented!(),
315 Window {
316 window_type,
317 size,
318 slide,
319 group_by,
320 aggregations,
321 min_events,
322 max_window_count,
323 max_window_age,
324 } => {
325 let parent = self
326 .inner
327 .operators
328 .read()
329 .get(&node.inputs[0])
330 .ok_or_else(|| Error(internal!("Parent operator not found")))?
331 .clone();
332 let operator = WindowOperator::new(
333 parent,
334 node.id,
335 window_type.clone(),
336 size.clone(),
337 slide.clone(),
338 group_by.clone(),
339 aggregations.clone(),
340 min_events.clone(),
341 max_window_count.clone(),
342 max_window_age.clone(),
343 );
344 self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
345 }
346 }
347
348 Ok(())
349 }
350
351 fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
352 let mut sources = self.inner.sources.write();
353 let nodes = sources.entry(source).or_insert_with(Vec::new);
354
355 let entry = (flow, node);
356 if !nodes.contains(&entry) {
357 nodes.push(entry);
358 }
359 }
360
361 fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
362 let mut sinks = self.inner.sinks.write();
363 let nodes = sinks.entry(sink).or_insert_with(Vec::new);
364
365 let entry = (flow, node);
366 if !nodes.contains(&entry) {
367 nodes.push(entry);
368 }
369 }
370}