1use reifydb_core::{
5 common::{JoinType, WindowKind},
6 interface::catalog::{
7 flow::{FlowEdgeId, FlowId, FlowNodeId},
8 id::{RingBufferId, SeriesId, SubscriptionId, TableId, ViewId},
9 series::SeriesKey,
10 shape::ShapeId,
11 },
12 sort::SortKey,
13};
14use serde::{Deserialize, Serialize};
15
16use crate::expression::Expression;
17
/// The kind of a flow node together with the data that kind carries.
///
/// Variants fall into three naming groups: `Source*`, `Sink*`, and the
/// remaining variants in between. Each variant also has a numeric tag,
/// returned by `FlowNodeType::discriminator`.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`, so variant
/// names, variant order and field names may be part of a serialized
/// representation depending on the format in use — confirm before renaming
/// or reordering anything here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FlowNodeType {
    // ---- `Source*` variants -------------------------------------------

    /// Source variant that carries no fields. It is declared as an empty
    /// struct-like variant (`{}`), not a unit variant.
    SourceInlineData {},
    /// Source variant identified by a `TableId`.
    SourceTable {
        table: TableId,
    },
    /// Source variant identified by a `ViewId`.
    SourceView {
        view: ViewId,
    },
    /// Source variant identified by a `FlowId`.
    SourceFlow {
        flow: FlowId,
    },
    /// Source variant identified by a `RingBufferId`.
    SourceRingBuffer {
        ringbuffer: RingBufferId,
    },
    /// Source variant identified by a `SeriesId`.
    SourceSeries {
        series: SeriesId,
    },

    // ---- Variants that are neither `Source*` nor `Sink*` ----------------

    /// Carries a list of condition expressions.
    Filter {
        conditions: Vec<Expression>,
    },
    /// Carries a list of condition expressions.
    ///
    /// NOTE(review): same payload shape as `Filter`; how the two differ
    /// semantically is not visible in this file.
    Gate {
        conditions: Vec<Expression>,
    },
    /// Carries a list of expressions.
    Map {
        expressions: Vec<Expression>,
    },
    /// Carries a list of expressions.
    ///
    /// NOTE(review): same payload shape as `Map`; how the two differ
    /// semantically is not visible in this file.
    Extend {
        expressions: Vec<Expression>,
    },
    /// Carries a join type, one expression list per side and an optional
    /// alias.
    Join {
        join_type: JoinType,
        // presumably the key expressions for each side of the join —
        // TODO confirm against the code that builds this variant
        left: Vec<Expression>,
        right: Vec<Expression>,
        alias: Option<String>,
    },
    /// Carries two expression lists, `by` and `map`.
    Aggregate {
        by: Vec<Expression>,
        map: Vec<Expression>,
    },
    /// Unit variant; carries no data.
    Append,
    /// Carries the list of sort keys.
    Sort {
        by: Vec<SortKey>,
    },
    /// Carries a `usize` limit.
    Take {
        limit: usize,
    },
    /// Carries a list of expressions.
    Distinct {
        expressions: Vec<Expression>,
    },
    /// Carries an operator given as a string, plus a list of expressions.
    Apply {
        operator: String,
        expressions: Vec<Expression>,
    },

    // ---- `Sink*` variants ---------------------------------------------

    /// Sink variant carrying a `ViewId` and a `TableId`.
    SinkTableView {
        view: ViewId,
        table: TableId,
    },
    /// Sink variant carrying a `ViewId`, a `RingBufferId`, a capacity and
    /// an eviction-propagation flag.
    SinkRingBufferView {
        view: ViewId,
        ringbuffer: RingBufferId,
        // NOTE(review): unit of `capacity` (rows? bytes?) is not shown here
        capacity: u64,
        propagate_evictions: bool,
    },
    /// Sink variant carrying a `ViewId`, a `SeriesId` and a `SeriesKey`.
    SinkSeriesView {
        view: ViewId,
        series: SeriesId,
        key: SeriesKey,
    },
    /// Sink variant identified by a `SubscriptionId`.
    SinkSubscription {
        subscription: SubscriptionId,
    },

    /// Carries a window kind, two expression lists and an optional `ts`
    /// string.
    Window {
        kind: WindowKind,
        group_by: Vec<Expression>,
        aggregations: Vec<Expression>,
        // presumably names a timestamp column — TODO confirm
        ts: Option<String>,
    },
}
97
98impl FlowNodeType {
99 pub fn discriminator(&self) -> u8 {
102 match self {
103 FlowNodeType::SourceInlineData {
104 ..
105 } => 0,
106 FlowNodeType::SourceTable {
107 ..
108 } => 1,
109 FlowNodeType::SourceView {
110 ..
111 } => 2,
112 FlowNodeType::SourceFlow {
113 ..
114 } => 3,
115 FlowNodeType::Filter {
116 ..
117 } => 4,
118 FlowNodeType::Map {
119 ..
120 } => 5,
121 FlowNodeType::Extend {
122 ..
123 } => 6,
124 FlowNodeType::Join {
125 ..
126 } => 7,
127 FlowNodeType::Aggregate {
128 ..
129 } => 8,
130 FlowNodeType::Append => 9,
131 FlowNodeType::Sort {
132 ..
133 } => 10,
134 FlowNodeType::Take {
135 ..
136 } => 11,
137 FlowNodeType::Distinct {
138 ..
139 } => 12,
140 FlowNodeType::Apply {
141 ..
142 } => 13,
143 FlowNodeType::SinkSubscription {
144 ..
145 } => 14,
146 FlowNodeType::Window {
147 ..
148 } => 15,
149 FlowNodeType::SourceRingBuffer {
150 ..
151 } => 16,
152 FlowNodeType::SourceSeries {
153 ..
154 } => 17,
155 FlowNodeType::Gate {
156 ..
157 } => 18,
158 FlowNodeType::SinkTableView {
159 ..
160 } => 19,
161 FlowNodeType::SinkRingBufferView {
162 ..
163 } => 20,
164 FlowNodeType::SinkSeriesView {
165 ..
166 } => 21,
167 }
168 }
169
170 pub fn primitive_source_shape_id(&self) -> Option<ShapeId> {
177 match self {
178 FlowNodeType::SourceTable {
179 table,
180 } => Some(ShapeId::table(*table)),
181 FlowNodeType::SourceRingBuffer {
182 ringbuffer,
183 } => Some(ShapeId::ringbuffer(*ringbuffer)),
184 FlowNodeType::SourceSeries {
185 series,
186 } => Some(ShapeId::series(*series)),
187 FlowNodeType::SourceInlineData {
188 ..
189 }
190 | FlowNodeType::SourceView {
191 ..
192 }
193 | FlowNodeType::SourceFlow {
194 ..
195 }
196 | FlowNodeType::Filter {
197 ..
198 }
199 | FlowNodeType::Gate {
200 ..
201 }
202 | FlowNodeType::Map {
203 ..
204 }
205 | FlowNodeType::Extend {
206 ..
207 }
208 | FlowNodeType::Join {
209 ..
210 }
211 | FlowNodeType::Aggregate {
212 ..
213 }
214 | FlowNodeType::Append
215 | FlowNodeType::Sort {
216 ..
217 }
218 | FlowNodeType::Take {
219 ..
220 }
221 | FlowNodeType::Distinct {
222 ..
223 }
224 | FlowNodeType::Apply {
225 ..
226 }
227 | FlowNodeType::SinkTableView {
228 ..
229 }
230 | FlowNodeType::SinkRingBufferView {
231 ..
232 }
233 | FlowNodeType::SinkSeriesView {
234 ..
235 }
236 | FlowNodeType::SinkSubscription {
237 ..
238 }
239 | FlowNodeType::Window {
240 ..
241 } => None,
242 }
243 }
244}
245
/// A node in a flow: its id, its type, and the ids of the nodes listed as
/// its inputs and outputs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowNode {
    /// Identifier of this node.
    pub id: FlowNodeId,
    /// The node's kind and the data that kind carries.
    pub ty: FlowNodeType,
    /// Ids of this node's input nodes. `FlowNode::new` leaves this empty.
    pub inputs: Vec<FlowNodeId>,
    /// Ids of this node's output nodes. `FlowNode::new` leaves this empty.
    pub outputs: Vec<FlowNodeId>,
}
253
254impl FlowNode {
255 pub fn new(id: impl Into<FlowNodeId>, ty: FlowNodeType) -> Self {
256 Self {
257 id: id.into(),
258 ty,
259 inputs: Vec::new(),
260 outputs: Vec::new(),
261 }
262 }
263}
264
/// An edge between two flow nodes, identified by its own id.
///
/// Equality and hashing are derived, so they take all three fields into
/// account.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FlowEdge {
    /// Identifier of this edge.
    pub id: FlowEdgeId,
    /// Id of the node at the source end of the edge.
    pub source: FlowNodeId,
    /// Id of the node at the target end of the edge.
    pub target: FlowNodeId,
}
271
272impl FlowEdge {
273 pub fn new(id: impl Into<FlowEdgeId>, source: impl Into<FlowNodeId>, target: impl Into<FlowNodeId>) -> Self {
274 Self {
275 id: id.into(),
276 source: source.into(),
277 target: target.into(),
278 }
279 }
280}