Skip to main content

reifydb_rql/flow/
node.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2026 ReifyDB
3
4use reifydb_core::{
5	common::{JoinType, WindowKind},
6	interface::catalog::{
7		flow::{FlowEdgeId, FlowId, FlowNodeId},
8		id::{RingBufferId, SeriesId, SubscriptionId, TableId, ViewId},
9		series::SeriesKey,
10		shape::ShapeId,
11	},
12	sort::SortKey,
13};
14use serde::{Deserialize, Serialize};
15
16use crate::expression::Expression;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum FlowNodeType {
20	SourceInlineData {},
21	SourceTable {
22		table: TableId,
23	},
24	SourceView {
25		view: ViewId,
26	},
27	SourceFlow {
28		flow: FlowId,
29	},
30	SourceRingBuffer {
31		ringbuffer: RingBufferId,
32	},
33	SourceSeries {
34		series: SeriesId,
35	},
36	Filter {
37		conditions: Vec<Expression>,
38	},
39	Gate {
40		conditions: Vec<Expression>,
41	},
42	Map {
43		expressions: Vec<Expression>,
44	},
45	Extend {
46		expressions: Vec<Expression>,
47	},
48	Join {
49		join_type: JoinType,
50		left: Vec<Expression>,
51		right: Vec<Expression>,
52		alias: Option<String>,
53		#[serde(default)]
54		snapshot: bool,
55	},
56	Aggregate {
57		by: Vec<Expression>,
58		map: Vec<Expression>,
59	},
60	Append {},
61	Sort {
62		by: Vec<SortKey>,
63	},
64	Take {
65		limit: usize,
66	},
67	Distinct {
68		expressions: Vec<Expression>,
69	},
70	Apply {
71		operator: String,
72		expressions: Vec<Expression>,
73	},
74	SinkTableView {
75		view: ViewId,
76		table: TableId,
77	},
78	SinkRingBufferView {
79		view: ViewId,
80		ringbuffer: RingBufferId,
81		capacity: u64,
82		propagate_evictions: bool,
83	},
84	SinkSeriesView {
85		view: ViewId,
86		series: SeriesId,
87		key: SeriesKey,
88	},
89	SinkSubscription {
90		subscription: SubscriptionId,
91	},
92	Window {
93		kind: WindowKind,
94		group_by: Vec<Expression>,
95		aggregations: Vec<Expression>,
96		ts: Option<String>,
97	},
98}
99
100impl FlowNodeType {
101	pub fn ticks(&self) -> bool {
102		matches!(
103			self,
104			FlowNodeType::Append { .. }
105				| FlowNodeType::Distinct { .. }
106				| FlowNodeType::Window { .. }
107				| FlowNodeType::Apply { .. }
108		)
109	}
110
111	pub fn label(&self) -> String {
112		match self {
113			FlowNodeType::SourceInlineData {
114				..
115			} => "SourceInlineData".into(),
116			FlowNodeType::SourceTable {
117				..
118			} => "SourceTable".into(),
119			FlowNodeType::SourceView {
120				..
121			} => "SourceView".into(),
122			FlowNodeType::SourceFlow {
123				..
124			} => "SourceFlow".into(),
125			FlowNodeType::SourceRingBuffer {
126				..
127			} => "SourceRingBuffer".into(),
128			FlowNodeType::SourceSeries {
129				..
130			} => "SourceSeries".into(),
131			FlowNodeType::Filter {
132				..
133			} => "Filter".into(),
134			FlowNodeType::Gate {
135				..
136			} => "Gate".into(),
137			FlowNodeType::Map {
138				..
139			} => "Map".into(),
140			FlowNodeType::Extend {
141				..
142			} => "Extend".into(),
143			FlowNodeType::Join {
144				..
145			} => "Join".into(),
146			FlowNodeType::Aggregate {
147				..
148			} => "Aggregate".into(),
149			FlowNodeType::Append {
150				..
151			} => "Append".into(),
152			FlowNodeType::Sort {
153				..
154			} => "Sort".into(),
155			FlowNodeType::Take {
156				..
157			} => "Take".into(),
158			FlowNodeType::Distinct {
159				..
160			} => "Distinct".into(),
161			FlowNodeType::Apply {
162				operator,
163				..
164			} => format!("Apply({})", operator),
165			FlowNodeType::SinkTableView {
166				..
167			} => "SinkTableView".into(),
168			FlowNodeType::SinkRingBufferView {
169				..
170			} => "SinkRingBufferView".into(),
171			FlowNodeType::SinkSeriesView {
172				..
173			} => "SinkSeriesView".into(),
174			FlowNodeType::SinkSubscription {
175				..
176			} => "SinkSubscription".into(),
177			FlowNodeType::Window {
178				..
179			} => "Window".into(),
180		}
181	}
182
183	pub fn discriminator(&self) -> u8 {
184		match self {
185			FlowNodeType::SourceInlineData {
186				..
187			} => 0,
188			FlowNodeType::SourceTable {
189				..
190			} => 1,
191			FlowNodeType::SourceView {
192				..
193			} => 2,
194			FlowNodeType::SourceFlow {
195				..
196			} => 3,
197			FlowNodeType::Filter {
198				..
199			} => 4,
200			FlowNodeType::Map {
201				..
202			} => 5,
203			FlowNodeType::Extend {
204				..
205			} => 6,
206			FlowNodeType::Join {
207				..
208			} => 7,
209			FlowNodeType::Aggregate {
210				..
211			} => 8,
212			FlowNodeType::Append {
213				..
214			} => 9,
215			FlowNodeType::Sort {
216				..
217			} => 10,
218			FlowNodeType::Take {
219				..
220			} => 11,
221			FlowNodeType::Distinct {
222				..
223			} => 12,
224			FlowNodeType::Apply {
225				..
226			} => 13,
227			FlowNodeType::SinkSubscription {
228				..
229			} => 14,
230			FlowNodeType::Window {
231				..
232			} => 15,
233			FlowNodeType::SourceRingBuffer {
234				..
235			} => 16,
236			FlowNodeType::SourceSeries {
237				..
238			} => 17,
239			FlowNodeType::Gate {
240				..
241			} => 18,
242			FlowNodeType::SinkTableView {
243				..
244			} => 19,
245			FlowNodeType::SinkRingBufferView {
246				..
247			} => 20,
248			FlowNodeType::SinkSeriesView {
249				..
250			} => 21,
251		}
252	}
253
254	pub fn primitive_source_shape_id(&self) -> Option<ShapeId> {
255		match self {
256			FlowNodeType::SourceTable {
257				table,
258			} => Some(ShapeId::table(*table)),
259			FlowNodeType::SourceRingBuffer {
260				ringbuffer,
261			} => Some(ShapeId::ringbuffer(*ringbuffer)),
262			FlowNodeType::SourceSeries {
263				series,
264			} => Some(ShapeId::series(*series)),
265			FlowNodeType::SourceInlineData {
266				..
267			}
268			| FlowNodeType::SourceView {
269				..
270			}
271			| FlowNodeType::SourceFlow {
272				..
273			}
274			| FlowNodeType::Filter {
275				..
276			}
277			| FlowNodeType::Gate {
278				..
279			}
280			| FlowNodeType::Map {
281				..
282			}
283			| FlowNodeType::Extend {
284				..
285			}
286			| FlowNodeType::Join {
287				..
288			}
289			| FlowNodeType::Aggregate {
290				..
291			}
292			| FlowNodeType::Append {
293				..
294			}
295			| FlowNodeType::Sort {
296				..
297			}
298			| FlowNodeType::Take {
299				..
300			}
301			| FlowNodeType::Distinct {
302				..
303			}
304			| FlowNodeType::Apply {
305				..
306			}
307			| FlowNodeType::SinkTableView {
308				..
309			}
310			| FlowNodeType::SinkRingBufferView {
311				..
312			}
313			| FlowNodeType::SinkSeriesView {
314				..
315			}
316			| FlowNodeType::SinkSubscription {
317				..
318			}
319			| FlowNodeType::Window {
320				..
321			} => None,
322		}
323	}
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize)]
327pub struct FlowNode {
328	pub id: FlowNodeId,
329	pub ty: FlowNodeType,
330	pub inputs: Vec<FlowNodeId>,
331	pub outputs: Vec<FlowNodeId>,
332}
333
334impl FlowNode {
335	pub fn new(id: impl Into<FlowNodeId>, ty: FlowNodeType) -> Self {
336		Self {
337			id: id.into(),
338			ty,
339			inputs: Vec::new(),
340			outputs: Vec::new(),
341		}
342	}
343}
344
345#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
346pub struct FlowEdge {
347	pub id: FlowEdgeId,
348	pub source: FlowNodeId,
349	pub target: FlowNodeId,
350}
351
352impl FlowEdge {
353	pub fn new(id: impl Into<FlowEdgeId>, source: impl Into<FlowNodeId>, target: impl Into<FlowNodeId>) -> Self {
354		Self {
355			id: id.into(),
356			source: source.into(),
357			target: target.into(),
358		}
359	}
360}
361
#[cfg(test)]
mod tests {
	use reifydb_core::common::JoinType;

	use super::FlowNodeType;

	/// Minimal inner-join node: no key expressions, no alias, snapshot off.
	fn inner_join_node() -> FlowNodeType {
		FlowNodeType::Join {
			join_type: JoinType::Inner,
			left: Vec::new(),
			right: Vec::new(),
			alias: None,
			snapshot: false,
		}
	}

	#[test]
	fn join_never_requests_ticks() {
		// Join state TTL is not reclaimed on the flow tick path: the background operator GC
		// actor handles it (per side, via OperatorSettings). A Join node therefore never
		// asks for ticks.
		let node = inner_join_node();
		assert!(!node.ticks());
	}

	#[test]
	fn apply_always_requests_ticks() {
		// An Apply node registers for flow ticks no matter what the underlying operator can
		// do. The graph-level gate has no view of the runtime operator, so registration is
		// unconditional, and the runtime operator decides whether tick() actually runs (an FFI
		// operator without CAPABILITY_TICK reports no interval and is skipped). Without this
		// registration a tick-capable custom operator could never be ticked.
		let node = FlowNodeType::Apply {
			operator: String::from("compute_swap_volumes"),
			expressions: Vec::new(),
		};
		assert!(node.ticks());
	}

	#[test]
	fn append_and_distinct_always_request_ticks() {
		// The TTL for these nodes now lives in OperatorSettings rather than on the node, and is
		// reclaimed on tick when configured. The graph-level gate cannot see that setting, so
		// both kinds request ticks unconditionally and the runtime operator decides whether
		// tick() actually runs.
		let append = FlowNodeType::Append {};
		assert!(append.ticks());

		let distinct = FlowNodeType::Distinct {
			expressions: Vec::new(),
		};
		assert!(distinct.ticks());
	}

	#[test]
	fn stateless_nodes_do_not_request_ticks() {
		let map = FlowNodeType::Map {
			expressions: Vec::new(),
		};
		assert!(!map.ticks());

		let filter = FlowNodeType::Filter {
			conditions: Vec::new(),
		};
		assert!(!filter.ticks());
	}
}