1use reifydb_core::{
5 common::{JoinType, WindowKind},
6 interface::catalog::{
7 flow::{FlowEdgeId, FlowId, FlowNodeId},
8 id::{RingBufferId, SeriesId, SubscriptionId, TableId, ViewId},
9 series::SeriesKey,
10 shape::ShapeId,
11 },
12 sort::SortKey,
13};
14use serde::{Deserialize, Serialize};
15
16use crate::expression::Expression;
17
/// The kind of a flow node together with the payload specific to that kind.
///
/// Every variant is struct-like (even the payload-free ones, written `{}`),
/// and the type derives `Serialize`/`Deserialize`.
// NOTE(review): the variant order and field names feed the serde derive, so
// reordering or renaming may change the serialized form — confirm against the
// serializer in use before touching the layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FlowNodeType {
    /// Source variant that carries no identifier or other payload.
    SourceInlineData {},
    /// Source variant identified by a [`TableId`].
    SourceTable {
        table: TableId,
    },
    /// Source variant identified by a [`ViewId`].
    SourceView {
        view: ViewId,
    },
    /// Source variant identified by a [`FlowId`].
    SourceFlow {
        flow: FlowId,
    },
    /// Source variant identified by a [`RingBufferId`].
    SourceRingBuffer {
        ringbuffer: RingBufferId,
    },
    /// Source variant identified by a [`SeriesId`].
    SourceSeries {
        series: SeriesId,
    },
    /// Carries a list of condition expressions.
    Filter {
        conditions: Vec<Expression>,
    },
    /// Carries a list of condition expressions, like [`FlowNodeType::Filter`].
    // NOTE(review): how `Gate` differs from `Filter` at runtime is not
    // visible in this file.
    Gate {
        conditions: Vec<Expression>,
    },
    /// Carries a list of expressions.
    // NOTE(review): presumably a projection that replaces the input columns —
    // confirm against the operator implementation.
    Map {
        expressions: Vec<Expression>,
    },
    /// Carries a list of expressions.
    // NOTE(review): presumably adds columns to the input rather than
    // replacing them — confirm against the operator implementation.
    Extend {
        expressions: Vec<Expression>,
    },
    /// Join description: join type, one expression list per side, and an
    /// optional alias.
    Join {
        join_type: JoinType,
        left: Vec<Expression>,
        right: Vec<Expression>,
        alias: Option<String>,
        /// Falls back to `bool::default()` (`false`) when the field is
        /// missing from the input being deserialized.
        #[serde(default)]
        snapshot: bool,
    },
    /// Carries two expression lists, `by` and `map`.
    // NOTE(review): presumably grouping keys and aggregate projections
    // respectively — confirm.
    Aggregate {
        by: Vec<Expression>,
        map: Vec<Expression>,
    },
    /// Payload-free variant; [`FlowNodeType::ticks`] reports `true` for it.
    Append {},
    /// Carries the list of sort keys.
    Sort {
        by: Vec<SortKey>,
    },
    /// Carries a `usize` limit.
    // NOTE(review): presumably the maximum number of rows kept — confirm.
    Take {
        limit: usize,
    },
    /// Carries a list of expressions; [`FlowNodeType::ticks`] reports `true`
    /// for it.
    Distinct {
        expressions: Vec<Expression>,
    },
    /// Names an operator by string and carries a list of expressions.
    /// The operator name is embedded in [`FlowNodeType::label`], and
    /// [`FlowNodeType::ticks`] reports `true` for this variant.
    Apply {
        operator: String,
        expressions: Vec<Expression>,
    },
    /// Sink variant carrying a [`ViewId`] and a [`TableId`].
    SinkTableView {
        view: ViewId,
        table: TableId,
    },
    /// Sink variant carrying a [`ViewId`], a [`RingBufferId`], a `u64`
    /// capacity and an eviction-propagation flag.
    // NOTE(review): the unit of `capacity` and the exact effect of
    // `propagate_evictions` are not visible in this file.
    SinkRingBufferView {
        view: ViewId,
        ringbuffer: RingBufferId,
        capacity: u64,
        propagate_evictions: bool,
    },
    /// Sink variant carrying a [`ViewId`], a [`SeriesId`] and a
    /// [`SeriesKey`].
    SinkSeriesView {
        view: ViewId,
        series: SeriesId,
        key: SeriesKey,
    },
    /// Sink variant identified by a [`SubscriptionId`].
    SinkSubscription {
        subscription: SubscriptionId,
    },
    /// Window description: window kind, grouping expressions, aggregation
    /// expressions and an optional `ts` string. [`FlowNodeType::ticks`]
    /// reports `true` for this variant.
    // NOTE(review): `ts` presumably names the timestamp column — confirm.
    Window {
        kind: WindowKind,
        group_by: Vec<Expression>,
        aggregations: Vec<Expression>,
        ts: Option<String>,
    },
}
99
100impl FlowNodeType {
101 pub fn ticks(&self) -> bool {
102 matches!(
103 self,
104 FlowNodeType::Append { .. }
105 | FlowNodeType::Distinct { .. }
106 | FlowNodeType::Window { .. }
107 | FlowNodeType::Apply { .. }
108 )
109 }
110
111 pub fn label(&self) -> String {
112 match self {
113 FlowNodeType::SourceInlineData {
114 ..
115 } => "SourceInlineData".into(),
116 FlowNodeType::SourceTable {
117 ..
118 } => "SourceTable".into(),
119 FlowNodeType::SourceView {
120 ..
121 } => "SourceView".into(),
122 FlowNodeType::SourceFlow {
123 ..
124 } => "SourceFlow".into(),
125 FlowNodeType::SourceRingBuffer {
126 ..
127 } => "SourceRingBuffer".into(),
128 FlowNodeType::SourceSeries {
129 ..
130 } => "SourceSeries".into(),
131 FlowNodeType::Filter {
132 ..
133 } => "Filter".into(),
134 FlowNodeType::Gate {
135 ..
136 } => "Gate".into(),
137 FlowNodeType::Map {
138 ..
139 } => "Map".into(),
140 FlowNodeType::Extend {
141 ..
142 } => "Extend".into(),
143 FlowNodeType::Join {
144 ..
145 } => "Join".into(),
146 FlowNodeType::Aggregate {
147 ..
148 } => "Aggregate".into(),
149 FlowNodeType::Append {
150 ..
151 } => "Append".into(),
152 FlowNodeType::Sort {
153 ..
154 } => "Sort".into(),
155 FlowNodeType::Take {
156 ..
157 } => "Take".into(),
158 FlowNodeType::Distinct {
159 ..
160 } => "Distinct".into(),
161 FlowNodeType::Apply {
162 operator,
163 ..
164 } => format!("Apply({})", operator),
165 FlowNodeType::SinkTableView {
166 ..
167 } => "SinkTableView".into(),
168 FlowNodeType::SinkRingBufferView {
169 ..
170 } => "SinkRingBufferView".into(),
171 FlowNodeType::SinkSeriesView {
172 ..
173 } => "SinkSeriesView".into(),
174 FlowNodeType::SinkSubscription {
175 ..
176 } => "SinkSubscription".into(),
177 FlowNodeType::Window {
178 ..
179 } => "Window".into(),
180 }
181 }
182
183 pub fn discriminator(&self) -> u8 {
184 match self {
185 FlowNodeType::SourceInlineData {
186 ..
187 } => 0,
188 FlowNodeType::SourceTable {
189 ..
190 } => 1,
191 FlowNodeType::SourceView {
192 ..
193 } => 2,
194 FlowNodeType::SourceFlow {
195 ..
196 } => 3,
197 FlowNodeType::Filter {
198 ..
199 } => 4,
200 FlowNodeType::Map {
201 ..
202 } => 5,
203 FlowNodeType::Extend {
204 ..
205 } => 6,
206 FlowNodeType::Join {
207 ..
208 } => 7,
209 FlowNodeType::Aggregate {
210 ..
211 } => 8,
212 FlowNodeType::Append {
213 ..
214 } => 9,
215 FlowNodeType::Sort {
216 ..
217 } => 10,
218 FlowNodeType::Take {
219 ..
220 } => 11,
221 FlowNodeType::Distinct {
222 ..
223 } => 12,
224 FlowNodeType::Apply {
225 ..
226 } => 13,
227 FlowNodeType::SinkSubscription {
228 ..
229 } => 14,
230 FlowNodeType::Window {
231 ..
232 } => 15,
233 FlowNodeType::SourceRingBuffer {
234 ..
235 } => 16,
236 FlowNodeType::SourceSeries {
237 ..
238 } => 17,
239 FlowNodeType::Gate {
240 ..
241 } => 18,
242 FlowNodeType::SinkTableView {
243 ..
244 } => 19,
245 FlowNodeType::SinkRingBufferView {
246 ..
247 } => 20,
248 FlowNodeType::SinkSeriesView {
249 ..
250 } => 21,
251 }
252 }
253
254 pub fn primitive_source_shape_id(&self) -> Option<ShapeId> {
255 match self {
256 FlowNodeType::SourceTable {
257 table,
258 } => Some(ShapeId::table(*table)),
259 FlowNodeType::SourceRingBuffer {
260 ringbuffer,
261 } => Some(ShapeId::ringbuffer(*ringbuffer)),
262 FlowNodeType::SourceSeries {
263 series,
264 } => Some(ShapeId::series(*series)),
265 FlowNodeType::SourceInlineData {
266 ..
267 }
268 | FlowNodeType::SourceView {
269 ..
270 }
271 | FlowNodeType::SourceFlow {
272 ..
273 }
274 | FlowNodeType::Filter {
275 ..
276 }
277 | FlowNodeType::Gate {
278 ..
279 }
280 | FlowNodeType::Map {
281 ..
282 }
283 | FlowNodeType::Extend {
284 ..
285 }
286 | FlowNodeType::Join {
287 ..
288 }
289 | FlowNodeType::Aggregate {
290 ..
291 }
292 | FlowNodeType::Append {
293 ..
294 }
295 | FlowNodeType::Sort {
296 ..
297 }
298 | FlowNodeType::Take {
299 ..
300 }
301 | FlowNodeType::Distinct {
302 ..
303 }
304 | FlowNodeType::Apply {
305 ..
306 }
307 | FlowNodeType::SinkTableView {
308 ..
309 }
310 | FlowNodeType::SinkRingBufferView {
311 ..
312 }
313 | FlowNodeType::SinkSeriesView {
314 ..
315 }
316 | FlowNodeType::SinkSubscription {
317 ..
318 }
319 | FlowNodeType::Window {
320 ..
321 } => None,
322 }
323 }
324}
325
/// A single node of a flow: its identifier, its kind, and the identifiers of
/// the nodes recorded as its inputs and outputs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowNode {
    /// Identifier of this node.
    pub id: FlowNodeId,
    /// Kind of the node and its kind-specific payload.
    pub ty: FlowNodeType,
    /// Identifiers of the nodes recorded as inputs; empty after [`FlowNode::new`].
    // NOTE(review): presumably the upstream nodes feeding this one — confirm.
    pub inputs: Vec<FlowNodeId>,
    /// Identifiers of the nodes recorded as outputs; empty after [`FlowNode::new`].
    // NOTE(review): presumably the downstream nodes fed by this one — confirm.
    pub outputs: Vec<FlowNodeId>,
}
333
334impl FlowNode {
335 pub fn new(id: impl Into<FlowNodeId>, ty: FlowNodeType) -> Self {
336 Self {
337 id: id.into(),
338 ty,
339 inputs: Vec::new(),
340 outputs: Vec::new(),
341 }
342 }
343}
344
/// An edge of a flow, recording its own identifier plus a `source` and a
/// `target` node identifier.
///
/// Derives `PartialEq`, `Eq` and `Hash`, so edges can be compared and used as
/// keys in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FlowEdge {
    /// Identifier of this edge.
    pub id: FlowEdgeId,
    /// Identifier of the node stored as the edge's source.
    pub source: FlowNodeId,
    /// Identifier of the node stored as the edge's target.
    pub target: FlowNodeId,
}
351
352impl FlowEdge {
353 pub fn new(id: impl Into<FlowEdgeId>, source: impl Into<FlowNodeId>, target: impl Into<FlowNodeId>) -> Self {
354 Self {
355 id: id.into(),
356 source: source.into(),
357 target: target.into(),
358 }
359 }
360}
361
#[cfg(test)]
mod tests {
    use reifydb_core::common::JoinType;

    use super::FlowNodeType;

    /// Builds an inner join with empty key lists, no alias and `snapshot`
    /// switched off.
    fn join() -> FlowNodeType {
        FlowNodeType::Join {
            join_type: JoinType::Inner,
            left: Vec::new(),
            right: Vec::new(),
            alias: None,
            snapshot: false,
        }
    }

    #[test]
    fn join_never_requests_ticks() {
        let node = join();
        assert!(!node.ticks());
    }

    #[test]
    fn apply_always_requests_ticks() {
        // `ticks` does not inspect the payload, so any operator name works.
        let node = FlowNodeType::Apply {
            operator: String::from("compute_swap_volumes"),
            expressions: Vec::new(),
        };
        assert!(node.ticks());
    }

    #[test]
    fn append_and_distinct_always_request_ticks() {
        let append = FlowNodeType::Append {};
        assert!(append.ticks());

        let distinct = FlowNodeType::Distinct {
            expressions: Vec::new(),
        };
        assert!(distinct.ticks());
    }

    #[test]
    fn stateless_nodes_do_not_request_ticks() {
        let map = FlowNodeType::Map {
            expressions: Vec::new(),
        };
        assert!(!map.ticks());

        let filter = FlowNodeType::Filter {
            conditions: Vec::new(),
        };
        assert!(!filter.ticks());
    }
}