Skip to main content

reifydb_rql/flow/
node.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::{JoinType, WindowKind},
6	interface::catalog::{
7		flow::{FlowEdgeId, FlowId, FlowNodeId},
8		id::{RingBufferId, SeriesId, SubscriptionId, TableId, ViewId},
9		series::SeriesKey,
10		shape::ShapeId,
11	},
12	sort::SortKey,
13};
14use serde::{Deserialize, Serialize};
15
16use crate::expression::Expression;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum FlowNodeType {
20	SourceInlineData {},
21	SourceTable {
22		table: TableId,
23	},
24	SourceView {
25		view: ViewId,
26	},
27	SourceFlow {
28		flow: FlowId,
29	},
30	SourceRingBuffer {
31		ringbuffer: RingBufferId,
32	},
33	SourceSeries {
34		series: SeriesId,
35	},
36	Filter {
37		conditions: Vec<Expression>,
38	},
39	Gate {
40		conditions: Vec<Expression>,
41	},
42	Map {
43		expressions: Vec<Expression>,
44	},
45	Extend {
46		expressions: Vec<Expression>,
47	},
48	Join {
49		join_type: JoinType,
50		left: Vec<Expression>,
51		right: Vec<Expression>,
52		alias: Option<String>,
53	},
54	Aggregate {
55		by: Vec<Expression>,
56		map: Vec<Expression>,
57	},
58	Append,
59	Sort {
60		by: Vec<SortKey>,
61	},
62	Take {
63		limit: usize,
64	},
65	Distinct {
66		expressions: Vec<Expression>,
67	},
68	Apply {
69		operator: String,
70		expressions: Vec<Expression>,
71	},
72	SinkTableView {
73		view: ViewId,
74		table: TableId,
75	},
76	SinkRingBufferView {
77		view: ViewId,
78		ringbuffer: RingBufferId,
79		capacity: u64,
80		propagate_evictions: bool,
81	},
82	SinkSeriesView {
83		view: ViewId,
84		series: SeriesId,
85		key: SeriesKey,
86	},
87	SinkSubscription {
88		subscription: SubscriptionId,
89	},
90	Window {
91		kind: WindowKind,
92		group_by: Vec<Expression>,
93		aggregations: Vec<Expression>,
94		ts: Option<String>,
95	},
96}
97
98impl FlowNodeType {
99	/// Returns a discriminator value for this node type variant.
100	/// Must match indices in FLOW_NODE_TYPE_NAMES in catalog/vtable/system/flow_node_types.rs
101	pub fn discriminator(&self) -> u8 {
102		match self {
103			FlowNodeType::SourceInlineData {
104				..
105			} => 0,
106			FlowNodeType::SourceTable {
107				..
108			} => 1,
109			FlowNodeType::SourceView {
110				..
111			} => 2,
112			FlowNodeType::SourceFlow {
113				..
114			} => 3,
115			FlowNodeType::Filter {
116				..
117			} => 4,
118			FlowNodeType::Map {
119				..
120			} => 5,
121			FlowNodeType::Extend {
122				..
123			} => 6,
124			FlowNodeType::Join {
125				..
126			} => 7,
127			FlowNodeType::Aggregate {
128				..
129			} => 8,
130			FlowNodeType::Append => 9,
131			FlowNodeType::Sort {
132				..
133			} => 10,
134			FlowNodeType::Take {
135				..
136			} => 11,
137			FlowNodeType::Distinct {
138				..
139			} => 12,
140			FlowNodeType::Apply {
141				..
142			} => 13,
143			FlowNodeType::SinkSubscription {
144				..
145			} => 14,
146			FlowNodeType::Window {
147				..
148			} => 15,
149			FlowNodeType::SourceRingBuffer {
150				..
151			} => 16,
152			FlowNodeType::SourceSeries {
153				..
154			} => 17,
155			FlowNodeType::Gate {
156				..
157			} => 18,
158			FlowNodeType::SinkTableView {
159				..
160			} => 19,
161			FlowNodeType::SinkRingBufferView {
162				..
163			} => 20,
164			FlowNodeType::SinkSeriesView {
165				..
166			} => 21,
167		}
168	}
169
170	/// If this node is a primitive data source (table, ring buffer, or series),
171	/// returns its [`ShapeId`]. Returns `None` for all other node types.
172	///
173	/// Uses an exhaustive match so that adding a new variant to [`FlowNodeType`]
174	/// produces a compiler error, forcing the author to decide whether the new
175	/// variant is a primitive source.
176	pub fn primitive_source_shape_id(&self) -> Option<ShapeId> {
177		match self {
178			FlowNodeType::SourceTable {
179				table,
180			} => Some(ShapeId::table(*table)),
181			FlowNodeType::SourceRingBuffer {
182				ringbuffer,
183			} => Some(ShapeId::ringbuffer(*ringbuffer)),
184			FlowNodeType::SourceSeries {
185				series,
186			} => Some(ShapeId::series(*series)),
187			FlowNodeType::SourceInlineData {
188				..
189			}
190			| FlowNodeType::SourceView {
191				..
192			}
193			| FlowNodeType::SourceFlow {
194				..
195			}
196			| FlowNodeType::Filter {
197				..
198			}
199			| FlowNodeType::Gate {
200				..
201			}
202			| FlowNodeType::Map {
203				..
204			}
205			| FlowNodeType::Extend {
206				..
207			}
208			| FlowNodeType::Join {
209				..
210			}
211			| FlowNodeType::Aggregate {
212				..
213			}
214			| FlowNodeType::Append
215			| FlowNodeType::Sort {
216				..
217			}
218			| FlowNodeType::Take {
219				..
220			}
221			| FlowNodeType::Distinct {
222				..
223			}
224			| FlowNodeType::Apply {
225				..
226			}
227			| FlowNodeType::SinkTableView {
228				..
229			}
230			| FlowNodeType::SinkRingBufferView {
231				..
232			}
233			| FlowNodeType::SinkSeriesView {
234				..
235			}
236			| FlowNodeType::SinkSubscription {
237				..
238			}
239			| FlowNodeType::Window {
240				..
241			} => None,
242		}
243	}
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct FlowNode {
248	pub id: FlowNodeId,
249	pub ty: FlowNodeType,
250	pub inputs: Vec<FlowNodeId>,
251	pub outputs: Vec<FlowNodeId>,
252}
253
254impl FlowNode {
255	pub fn new(id: impl Into<FlowNodeId>, ty: FlowNodeType) -> Self {
256		Self {
257			id: id.into(),
258			ty,
259			inputs: Vec::new(),
260			outputs: Vec::new(),
261		}
262	}
263}
264
265#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
266pub struct FlowEdge {
267	pub id: FlowEdgeId,
268	pub source: FlowNodeId,
269	pub target: FlowNodeId,
270}
271
272impl FlowEdge {
273	pub fn new(id: impl Into<FlowEdgeId>, source: impl Into<FlowNodeId>, target: impl Into<FlowNodeId>) -> Self {
274		Self {
275			id: id.into(),
276			source: source.into(),
277			target: target.into(),
278		}
279	}
280}