Skip to main content

reifydb_routine/function/flow/
to_json.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::from_bytes;
5use reifydb_core::{
6	common::{JoinType, WindowKind},
7	internal,
8	sort::SortKey,
9	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::routine::{Function, FunctionKind, Routine, RoutineInfo, context::FunctionContext, error::RoutineError};
17
/// JSON-serializable version of FlowNodeType that uses JsonExpression
/// for clean expression serialization without Fragment metadata.
///
/// Variants mirror those of `FlowNodeType`; identifiers are flattened to their
/// raw inner values and expressions are converted to `JsonExpression` by the
/// `From<&FlowNodeType>` impl in this file. Variant names are emitted in
/// `snake_case` when serialized (see the `serde` attribute below).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonFlowNodeType {
	/// Inline data source; carries no fields.
	SourceInlineData {},
	/// Table source, identified by the raw `u64` taken from the table id.
	SourceTable {
		table: u64,
	},
	/// View source, identified by the raw `u64` taken from the view id.
	SourceView {
		view: u64,
	},
	/// Flow source, identified by the raw `u64` taken from the flow id.
	SourceFlow {
		flow: u64,
	},
	/// Ring buffer source, identified by the raw `u64` taken from the ring buffer id.
	SourceRingBuffer {
		ringbuffer: u64,
	},
	/// Series source, identified by the raw `u64` taken from the series id.
	SourceSeries {
		series: u64,
	},
	/// Filter node with its condition expressions.
	Filter {
		conditions: Vec<JsonExpression>,
	},
	/// Gate node with its condition expressions.
	Gate {
		conditions: Vec<JsonExpression>,
	},
	/// Map node with its expressions.
	Map {
		expressions: Vec<JsonExpression>,
	},
	/// Extend node with its expressions.
	Extend {
		expressions: Vec<JsonExpression>,
	},
	/// Join node: join type, the left and right expression lists, and an
	/// optional alias.
	Join {
		join_type: JoinType,
		left: Vec<JsonExpression>,
		right: Vec<JsonExpression>,
		alias: Option<String>,
	},
	/// Aggregate node with its `by` and `map` expression lists.
	Aggregate {
		by: Vec<JsonExpression>,
		map: Vec<JsonExpression>,
	},
	/// Append node; a unit variant with no payload.
	Append,
	/// Sort node with its sort keys (copied unchanged from `FlowNodeType`).
	Sort {
		by: Vec<SortKey>,
	},
	/// Take node with its row limit.
	Take {
		limit: usize,
	},
	/// Distinct node with its expressions.
	Distinct {
		expressions: Vec<JsonExpression>,
	},
	/// Apply node: operator name plus its expressions.
	Apply {
		operator: String,
		expressions: Vec<JsonExpression>,
	},
	/// View sink. The table-view, ring-buffer-view and series-view sinks of
	/// `FlowNodeType` are all mapped onto this single variant, keeping only
	/// the raw view id.
	SinkView {
		view: u64,
	},
	/// Subscription sink; the subscription id is stored in its string form.
	SinkSubscription {
		subscription: String,
	},
	/// Window node: window kind, grouping and aggregation expressions, and an
	/// optional `ts` value copied from `FlowNodeType::Window`.
	Window {
		kind: WindowKind,
		group_by: Vec<JsonExpression>,
		aggregations: Vec<JsonExpression>,
		ts: Option<String>,
	},
}
88
89impl From<&FlowNodeType> for JsonFlowNodeType {
90	fn from(node_type: &FlowNodeType) -> Self {
91		match node_type {
92			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
93			FlowNodeType::SourceTable {
94				table,
95			} => JsonFlowNodeType::SourceTable {
96				table: table.0,
97			},
98			FlowNodeType::SourceView {
99				view,
100			} => JsonFlowNodeType::SourceView {
101				view: view.0,
102			},
103			FlowNodeType::SourceFlow {
104				flow,
105			} => JsonFlowNodeType::SourceFlow {
106				flow: flow.0,
107			},
108			FlowNodeType::SourceRingBuffer {
109				ringbuffer,
110			} => JsonFlowNodeType::SourceRingBuffer {
111				ringbuffer: ringbuffer.0,
112			},
113			FlowNodeType::SourceSeries {
114				series,
115			} => JsonFlowNodeType::SourceSeries {
116				series: series.0,
117			},
118			FlowNodeType::Filter {
119				conditions,
120			} => JsonFlowNodeType::Filter {
121				conditions: conditions.iter().map(|e| e.into()).collect(),
122			},
123			FlowNodeType::Gate {
124				conditions,
125			} => JsonFlowNodeType::Gate {
126				conditions: conditions.iter().map(|e| e.into()).collect(),
127			},
128			FlowNodeType::Map {
129				expressions,
130			} => JsonFlowNodeType::Map {
131				expressions: expressions.iter().map(|e| e.into()).collect(),
132			},
133			FlowNodeType::Extend {
134				expressions,
135			} => JsonFlowNodeType::Extend {
136				expressions: expressions.iter().map(|e| e.into()).collect(),
137			},
138			FlowNodeType::Join {
139				join_type,
140				left,
141				right,
142				alias,
143			} => JsonFlowNodeType::Join {
144				join_type: *join_type,
145				left: left.iter().map(|e| e.into()).collect(),
146				right: right.iter().map(|e| e.into()).collect(),
147				alias: alias.clone(),
148			},
149			FlowNodeType::Aggregate {
150				by,
151				map,
152			} => JsonFlowNodeType::Aggregate {
153				by: by.iter().map(|e| e.into()).collect(),
154				map: map.iter().map(|e| e.into()).collect(),
155			},
156			FlowNodeType::Append => JsonFlowNodeType::Append,
157			FlowNodeType::Sort {
158				by,
159			} => JsonFlowNodeType::Sort {
160				by: by.clone(),
161			},
162			FlowNodeType::Take {
163				limit,
164			} => JsonFlowNodeType::Take {
165				limit: *limit,
166			},
167			FlowNodeType::Distinct {
168				expressions,
169			} => JsonFlowNodeType::Distinct {
170				expressions: expressions.iter().map(|e| e.into()).collect(),
171			},
172			FlowNodeType::Apply {
173				operator,
174				expressions,
175			} => JsonFlowNodeType::Apply {
176				operator: operator.clone(),
177				expressions: expressions.iter().map(|e| e.into()).collect(),
178			},
179			FlowNodeType::SinkTableView {
180				view,
181				..
182			}
183			| FlowNodeType::SinkRingBufferView {
184				view,
185				..
186			}
187			| FlowNodeType::SinkSeriesView {
188				view,
189				..
190			} => JsonFlowNodeType::SinkView {
191				view: view.0,
192			},
193			FlowNodeType::SinkSubscription {
194				subscription,
195			} => JsonFlowNodeType::SinkSubscription {
196				subscription: subscription.0.to_string(),
197			},
198			FlowNodeType::Window {
199				kind,
200				group_by,
201				aggregations,
202				ts,
203			} => JsonFlowNodeType::Window {
204				kind: kind.clone(),
205				group_by: group_by.iter().map(|e| e.into()).collect(),
206				aggregations: aggregations.iter().map(|e| e.into()).collect(),
207				ts: ts.clone(),
208			},
209		}
210	}
211}
212
/// Routine named `flow_node::to_json` (see `FlowNodeToJson::new`) that turns a
/// Blob column of postcard-encoded `FlowNodeType` values into a Utf8 column of
/// JSON strings.
pub struct FlowNodeToJson {
	/// Routine metadata handed out by `Routine::info`.
	info: RoutineInfo,
}
216
217impl Default for FlowNodeToJson {
218	fn default() -> Self {
219		Self::new()
220	}
221}
222
223impl FlowNodeToJson {
224	pub fn new() -> Self {
225		Self {
226			info: RoutineInfo::new("flow_node::to_json"),
227		}
228	}
229}
230
231impl<'a> Routine<FunctionContext<'a>> for FlowNodeToJson {
232	fn info(&self) -> &RoutineInfo {
233		&self.info
234	}
235
236	fn return_type(&self, _input_types: &[Type]) -> Type {
237		Type::Utf8
238	}
239
240	fn execute(&self, ctx: &mut FunctionContext<'a>, args: &Columns) -> Result<Columns, RoutineError> {
241		if args.is_empty() {
242			return Ok(Columns::new(vec![ColumnWithName::new(
243				ctx.fragment.clone(),
244				ColumnBuffer::utf8(Vec::<String>::new()),
245			)]));
246		}
247
248		if args.len() != 1 {
249			return Err(RoutineError::FunctionArityMismatch {
250				function: ctx.fragment.clone(),
251				expected: 1,
252				actual: args.len(),
253			});
254		}
255
256		let column = &args[0];
257		let (data, bitvec) = column.unwrap_option();
258		let row_count = data.len();
259
260		match data {
261			ColumnBuffer::Blob {
262				container,
263				..
264			} => {
265				let mut result_data = Vec::with_capacity(row_count);
266
267				for i in 0..row_count {
268					if container.is_defined(i) {
269						let bytes = match container.get(i) {
270							Some(b) => b,
271							None => continue,
272						};
273
274						// Deserialize from postcard
275						let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
276							Error(Box::new(internal!(
277								"Failed to deserialize FlowNodeType: {}",
278								e
279							)))
280						})?;
281
282						// Convert to JsonFlowNodeType for clean serialization
283						let json_node_type: JsonFlowNodeType = (&node_type).into();
284
285						// Serialize to JSON (untagged - extract inner value only)
286						let json_value = to_value(&json_node_type).map_err(|e| {
287							Error(Box::new(internal!(
288								"Failed to serialize FlowNodeType to JSON: {}",
289								e
290							)))
291						})?;
292
293						// Extract the inner object from the tagged enum {"variant_name": {...}}
294						let inner_value = match json_value {
295							JsonValue::Object(map) if map.len() == 1 => map
296								.into_iter()
297								.next()
298								.map(|(_, v)| v)
299								.unwrap_or(JsonValue::Null),
300							JsonValue::String(_) => {
301								// Unit variants serialize as strings, return null for
302								// untagged
303								JsonValue::Null
304							}
305							other => other,
306						};
307
308						let json = to_string(&inner_value).map_err(|e| {
309							Error(Box::new(internal!(
310								"Failed to serialize FlowNodeType to JSON: {}",
311								e
312							)))
313						})?;
314
315						result_data.push(json);
316					} else {
317						result_data.push(String::new());
318					}
319				}
320
321				let result_col_data = ColumnBuffer::utf8(result_data);
322				let final_data = match bitvec {
323					Some(bv) => ColumnBuffer::Option {
324						inner: Box::new(result_col_data),
325						bitvec: bv.clone(),
326					},
327					None => result_col_data,
328				};
329				Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), final_data)]))
330			}
331			_ => Err(RoutineError::FunctionExecutionFailed {
332				function: ctx.fragment.clone(),
333				reason: "flow_node::to_json only supports Blob input".to_string(),
334			}),
335		}
336	}
337}
338
339impl Function for FlowNodeToJson {
340	fn kinds(&self) -> &[FunctionKind] {
341		&[FunctionKind::Scalar]
342	}
343}