1use postcard::from_bytes;
5use reifydb_core::{
6 common::{JoinType, WindowKind},
7 internal,
8 sort::SortKey,
9 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::routine::{Function, FunctionKind, Routine, RoutineInfo, context::FunctionContext, error::RoutineError};
17
18#[derive(Debug, Clone, Serialize)]
19#[serde(rename_all = "snake_case")]
20pub enum JsonFlowNodeType {
21 SourceInlineData {},
22 SourceTable {
23 table: u64,
24 },
25 SourceView {
26 view: u64,
27 },
28 SourceFlow {
29 flow: u64,
30 },
31 SourceRingBuffer {
32 ringbuffer: u64,
33 },
34 SourceSeries {
35 series: u64,
36 },
37 Filter {
38 conditions: Vec<JsonExpression>,
39 },
40 Gate {
41 conditions: Vec<JsonExpression>,
42 },
43 Map {
44 expressions: Vec<JsonExpression>,
45 },
46 Extend {
47 expressions: Vec<JsonExpression>,
48 },
49 Join {
50 join_type: JoinType,
51 left: Vec<JsonExpression>,
52 right: Vec<JsonExpression>,
53 alias: Option<String>,
54 },
55 Aggregate {
56 by: Vec<JsonExpression>,
57 map: Vec<JsonExpression>,
58 },
59 Append,
60 Sort {
61 by: Vec<SortKey>,
62 },
63 Take {
64 limit: usize,
65 },
66 Distinct {
67 expressions: Vec<JsonExpression>,
68 },
69 Apply {
70 operator: String,
71 expressions: Vec<JsonExpression>,
72 },
73 SinkView {
74 view: u64,
75 },
76 SinkSubscription {
77 subscription: String,
78 },
79 Window {
80 kind: WindowKind,
81 group_by: Vec<JsonExpression>,
82 aggregations: Vec<JsonExpression>,
83 ts: Option<String>,
84 },
85}
86
87impl From<&FlowNodeType> for JsonFlowNodeType {
88 fn from(node_type: &FlowNodeType) -> Self {
89 match node_type {
90 FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
91 FlowNodeType::SourceTable {
92 table,
93 } => JsonFlowNodeType::SourceTable {
94 table: table.0,
95 },
96 FlowNodeType::SourceView {
97 view,
98 } => JsonFlowNodeType::SourceView {
99 view: view.0,
100 },
101 FlowNodeType::SourceFlow {
102 flow,
103 } => JsonFlowNodeType::SourceFlow {
104 flow: flow.0,
105 },
106 FlowNodeType::SourceRingBuffer {
107 ringbuffer,
108 } => JsonFlowNodeType::SourceRingBuffer {
109 ringbuffer: ringbuffer.0,
110 },
111 FlowNodeType::SourceSeries {
112 series,
113 } => JsonFlowNodeType::SourceSeries {
114 series: series.0,
115 },
116 FlowNodeType::Filter {
117 conditions,
118 } => JsonFlowNodeType::Filter {
119 conditions: conditions.iter().map(|e| e.into()).collect(),
120 },
121 FlowNodeType::Gate {
122 conditions,
123 } => JsonFlowNodeType::Gate {
124 conditions: conditions.iter().map(|e| e.into()).collect(),
125 },
126 FlowNodeType::Map {
127 expressions,
128 } => JsonFlowNodeType::Map {
129 expressions: expressions.iter().map(|e| e.into()).collect(),
130 },
131 FlowNodeType::Extend {
132 expressions,
133 } => JsonFlowNodeType::Extend {
134 expressions: expressions.iter().map(|e| e.into()).collect(),
135 },
136 FlowNodeType::Join {
137 join_type,
138 left,
139 right,
140 alias,
141 ttl: _,
142 } => JsonFlowNodeType::Join {
143 join_type: *join_type,
144 left: left.iter().map(|e| e.into()).collect(),
145 right: right.iter().map(|e| e.into()).collect(),
146 alias: alias.clone(),
147 },
148 FlowNodeType::Aggregate {
149 by,
150 map,
151 } => JsonFlowNodeType::Aggregate {
152 by: by.iter().map(|e| e.into()).collect(),
153 map: map.iter().map(|e| e.into()).collect(),
154 },
155 FlowNodeType::Append => JsonFlowNodeType::Append,
156 FlowNodeType::Sort {
157 by,
158 } => JsonFlowNodeType::Sort {
159 by: by.clone(),
160 },
161 FlowNodeType::Take {
162 limit,
163 } => JsonFlowNodeType::Take {
164 limit: *limit,
165 },
166 FlowNodeType::Distinct {
167 expressions,
168 ttl: _,
169 } => JsonFlowNodeType::Distinct {
170 expressions: expressions.iter().map(|e| e.into()).collect(),
171 },
172 FlowNodeType::Apply {
173 operator,
174 expressions,
175 ttl: _,
176 } => JsonFlowNodeType::Apply {
177 operator: operator.clone(),
178 expressions: expressions.iter().map(|e| e.into()).collect(),
179 },
180 FlowNodeType::SinkTableView {
181 view,
182 ..
183 }
184 | FlowNodeType::SinkRingBufferView {
185 view,
186 ..
187 }
188 | FlowNodeType::SinkSeriesView {
189 view,
190 ..
191 } => JsonFlowNodeType::SinkView {
192 view: view.0,
193 },
194 FlowNodeType::SinkSubscription {
195 subscription,
196 } => JsonFlowNodeType::SinkSubscription {
197 subscription: subscription.0.to_string(),
198 },
199 FlowNodeType::Window {
200 kind,
201 group_by,
202 aggregations,
203 ts,
204 } => JsonFlowNodeType::Window {
205 kind: kind.clone(),
206 group_by: group_by.iter().map(|e| e.into()).collect(),
207 aggregations: aggregations.iter().map(|e| e.into()).collect(),
208 ts: ts.clone(),
209 },
210 }
211 }
212}
213
214pub struct FlowNodeToJson {
215 info: RoutineInfo,
216}
217
218impl Default for FlowNodeToJson {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224impl FlowNodeToJson {
225 pub fn new() -> Self {
226 Self {
227 info: RoutineInfo::new("flow_node::to_json"),
228 }
229 }
230}
231
232impl<'a> Routine<FunctionContext<'a>> for FlowNodeToJson {
233 fn info(&self) -> &RoutineInfo {
234 &self.info
235 }
236
237 fn return_type(&self, _input_types: &[Type]) -> Type {
238 Type::Utf8
239 }
240
241 fn execute(&self, ctx: &mut FunctionContext<'a>, args: &Columns) -> Result<Columns, RoutineError> {
242 if args.is_empty() {
243 return Ok(Columns::new(vec![ColumnWithName::new(
244 ctx.fragment.clone(),
245 ColumnBuffer::utf8(Vec::<String>::new()),
246 )]));
247 }
248
249 if args.len() != 1 {
250 return Err(RoutineError::FunctionArityMismatch {
251 function: ctx.fragment.clone(),
252 expected: 1,
253 actual: args.len(),
254 });
255 }
256
257 let column = &args[0];
258 let (data, bitvec) = column.unwrap_option();
259 let row_count = data.len();
260
261 match data {
262 ColumnBuffer::Blob {
263 container,
264 ..
265 } => {
266 let mut result_data = Vec::with_capacity(row_count);
267
268 for i in 0..row_count {
269 if container.is_defined(i) {
270 let bytes = match container.get(i) {
271 Some(b) => b,
272 None => continue,
273 };
274
275 let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
276 Error(Box::new(internal!(
277 "Failed to deserialize FlowNodeType: {}",
278 e
279 )))
280 })?;
281
282 let json_node_type: JsonFlowNodeType = (&node_type).into();
283
284 let json_value = to_value(&json_node_type).map_err(|e| {
285 Error(Box::new(internal!(
286 "Failed to serialize FlowNodeType to JSON: {}",
287 e
288 )))
289 })?;
290
291 let inner_value = match json_value {
292 JsonValue::Object(map) if map.len() == 1 => map
293 .into_iter()
294 .next()
295 .map(|(_, v)| v)
296 .unwrap_or(JsonValue::Null),
297 JsonValue::String(_) => JsonValue::Null,
298 other => other,
299 };
300
301 let json = to_string(&inner_value).map_err(|e| {
302 Error(Box::new(internal!(
303 "Failed to serialize FlowNodeType to JSON: {}",
304 e
305 )))
306 })?;
307
308 result_data.push(json);
309 } else {
310 result_data.push(String::new());
311 }
312 }
313
314 let result_col_data = ColumnBuffer::utf8(result_data);
315 let final_data = match bitvec {
316 Some(bv) => ColumnBuffer::Option {
317 inner: Box::new(result_col_data),
318 bitvec: bv.clone(),
319 },
320 None => result_col_data,
321 };
322 Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), final_data)]))
323 }
324 _ => Err(RoutineError::FunctionExecutionFailed {
325 function: ctx.fragment.clone(),
326 reason: "flow_node::to_json only supports Blob input".to_string(),
327 }),
328 }
329 }
330}
331
332impl Function for FlowNodeToJson {
333 fn kinds(&self) -> &[FunctionKind] {
334 &[FunctionKind::Scalar]
335 }
336}