Skip to main content

reifydb_routine/function/flow/
to_json.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::from_bytes;
5use reifydb_core::{
6	common::{JoinType, WindowKind},
7	internal,
8	sort::SortKey,
9	value::column::{Column, columns::Columns, data::ColumnData},
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::function::{Function, FunctionCapability, FunctionContext, FunctionInfo, error::FunctionError};
17
18/// JSON-serializable version of FlowNodeType that uses JsonExpression
19/// for clean expression serialization without Fragment metadata.
20#[derive(Debug, Clone, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum JsonFlowNodeType {
23	SourceInlineData {},
24	SourceTable {
25		table: u64,
26	},
27	SourceView {
28		view: u64,
29	},
30	SourceFlow {
31		flow: u64,
32	},
33	SourceRingBuffer {
34		ringbuffer: u64,
35	},
36	SourceSeries {
37		series: u64,
38	},
39	Filter {
40		conditions: Vec<JsonExpression>,
41	},
42	Gate {
43		conditions: Vec<JsonExpression>,
44	},
45	Map {
46		expressions: Vec<JsonExpression>,
47	},
48	Extend {
49		expressions: Vec<JsonExpression>,
50	},
51	Join {
52		join_type: JoinType,
53		left: Vec<JsonExpression>,
54		right: Vec<JsonExpression>,
55		alias: Option<String>,
56	},
57	Aggregate {
58		by: Vec<JsonExpression>,
59		map: Vec<JsonExpression>,
60	},
61	Append,
62	Sort {
63		by: Vec<SortKey>,
64	},
65	Take {
66		limit: usize,
67	},
68	Distinct {
69		expressions: Vec<JsonExpression>,
70	},
71	Apply {
72		operator: String,
73		expressions: Vec<JsonExpression>,
74	},
75	SinkView {
76		view: u64,
77	},
78	SinkSubscription {
79		subscription: String,
80	},
81	Window {
82		kind: WindowKind,
83		group_by: Vec<JsonExpression>,
84		aggregations: Vec<JsonExpression>,
85		ts: Option<String>,
86	},
87}
88
89impl From<&FlowNodeType> for JsonFlowNodeType {
90	fn from(node_type: &FlowNodeType) -> Self {
91		match node_type {
92			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
93			FlowNodeType::SourceTable {
94				table,
95			} => JsonFlowNodeType::SourceTable {
96				table: table.0,
97			},
98			FlowNodeType::SourceView {
99				view,
100			} => JsonFlowNodeType::SourceView {
101				view: view.0,
102			},
103			FlowNodeType::SourceFlow {
104				flow,
105			} => JsonFlowNodeType::SourceFlow {
106				flow: flow.0,
107			},
108			FlowNodeType::SourceRingBuffer {
109				ringbuffer,
110			} => JsonFlowNodeType::SourceRingBuffer {
111				ringbuffer: ringbuffer.0,
112			},
113			FlowNodeType::SourceSeries {
114				series,
115			} => JsonFlowNodeType::SourceSeries {
116				series: series.0,
117			},
118			FlowNodeType::Filter {
119				conditions,
120			} => JsonFlowNodeType::Filter {
121				conditions: conditions.iter().map(|e| e.into()).collect(),
122			},
123			FlowNodeType::Gate {
124				conditions,
125			} => JsonFlowNodeType::Gate {
126				conditions: conditions.iter().map(|e| e.into()).collect(),
127			},
128			FlowNodeType::Map {
129				expressions,
130			} => JsonFlowNodeType::Map {
131				expressions: expressions.iter().map(|e| e.into()).collect(),
132			},
133			FlowNodeType::Extend {
134				expressions,
135			} => JsonFlowNodeType::Extend {
136				expressions: expressions.iter().map(|e| e.into()).collect(),
137			},
138			FlowNodeType::Join {
139				join_type,
140				left,
141				right,
142				alias,
143			} => JsonFlowNodeType::Join {
144				join_type: *join_type,
145				left: left.iter().map(|e| e.into()).collect(),
146				right: right.iter().map(|e| e.into()).collect(),
147				alias: alias.clone(),
148			},
149			FlowNodeType::Aggregate {
150				by,
151				map,
152			} => JsonFlowNodeType::Aggregate {
153				by: by.iter().map(|e| e.into()).collect(),
154				map: map.iter().map(|e| e.into()).collect(),
155			},
156			FlowNodeType::Append => JsonFlowNodeType::Append,
157			FlowNodeType::Sort {
158				by,
159			} => JsonFlowNodeType::Sort {
160				by: by.clone(),
161			},
162			FlowNodeType::Take {
163				limit,
164			} => JsonFlowNodeType::Take {
165				limit: *limit,
166			},
167			FlowNodeType::Distinct {
168				expressions,
169			} => JsonFlowNodeType::Distinct {
170				expressions: expressions.iter().map(|e| e.into()).collect(),
171			},
172			FlowNodeType::Apply {
173				operator,
174				expressions,
175			} => JsonFlowNodeType::Apply {
176				operator: operator.clone(),
177				expressions: expressions.iter().map(|e| e.into()).collect(),
178			},
179			FlowNodeType::SinkTableView {
180				view,
181				..
182			}
183			| FlowNodeType::SinkRingBufferView {
184				view,
185				..
186			}
187			| FlowNodeType::SinkSeriesView {
188				view,
189				..
190			} => JsonFlowNodeType::SinkView {
191				view: view.0,
192			},
193			FlowNodeType::SinkSubscription {
194				subscription,
195			} => JsonFlowNodeType::SinkSubscription {
196				subscription: subscription.0.to_string(),
197			},
198			FlowNodeType::Window {
199				kind,
200				group_by,
201				aggregations,
202				ts,
203			} => JsonFlowNodeType::Window {
204				kind: kind.clone(),
205				group_by: group_by.iter().map(|e| e.into()).collect(),
206				aggregations: aggregations.iter().map(|e| e.into()).collect(),
207				ts: ts.clone(),
208			},
209		}
210	}
211}
212
213pub struct FlowNodeToJson {
214	info: FunctionInfo,
215}
216
217impl Default for FlowNodeToJson {
218	fn default() -> Self {
219		Self::new()
220	}
221}
222
223impl FlowNodeToJson {
224	pub fn new() -> Self {
225		Self {
226			info: FunctionInfo::new("flow_node::to_json"),
227		}
228	}
229}
230
231impl Function for FlowNodeToJson {
232	fn info(&self) -> &FunctionInfo {
233		&self.info
234	}
235
236	fn capabilities(&self) -> &[FunctionCapability] {
237		&[FunctionCapability::Scalar]
238	}
239
240	fn return_type(&self, _input_types: &[Type]) -> Type {
241		Type::Utf8
242	}
243
244	fn execute(&self, ctx: &FunctionContext, args: &Columns) -> Result<Columns, FunctionError> {
245		if args.is_empty() {
246			return Ok(Columns::new(vec![Column::new(
247				ctx.fragment.clone(),
248				ColumnData::utf8(Vec::<String>::new()),
249			)]));
250		}
251
252		if args.len() != 1 {
253			return Err(FunctionError::ArityMismatch {
254				function: ctx.fragment.clone(),
255				expected: 1,
256				actual: args.len(),
257			});
258		}
259
260		let column = &args[0];
261		let (data, bitvec) = column.data().unwrap_option();
262		let row_count = data.len();
263
264		match data {
265			ColumnData::Blob {
266				container,
267				..
268			} => {
269				let mut result_data = Vec::with_capacity(row_count);
270
271				for i in 0..row_count {
272					if container.is_defined(i) {
273						let blob = &container[i];
274						let bytes = blob.as_bytes();
275
276						// Deserialize from postcard
277						let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
278							Error(Box::new(internal!(
279								"Failed to deserialize FlowNodeType: {}",
280								e
281							)))
282						})?;
283
284						// Convert to JsonFlowNodeType for clean serialization
285						let json_node_type: JsonFlowNodeType = (&node_type).into();
286
287						// Serialize to JSON (untagged - extract inner value only)
288						let json_value = to_value(&json_node_type).map_err(|e| {
289							Error(Box::new(internal!(
290								"Failed to serialize FlowNodeType to JSON: {}",
291								e
292							)))
293						})?;
294
295						// Extract the inner object from the tagged enum {"variant_name": {...}}
296						let inner_value = match json_value {
297							JsonValue::Object(map) if map.len() == 1 => map
298								.into_iter()
299								.next()
300								.map(|(_, v)| v)
301								.unwrap_or(JsonValue::Null),
302							JsonValue::String(_) => {
303								// Unit variants serialize as strings, return null for
304								// untagged
305								JsonValue::Null
306							}
307							other => other,
308						};
309
310						let json = to_string(&inner_value).map_err(|e| {
311							Error(Box::new(internal!(
312								"Failed to serialize FlowNodeType to JSON: {}",
313								e
314							)))
315						})?;
316
317						result_data.push(json);
318					} else {
319						result_data.push(String::new());
320					}
321				}
322
323				let result_col_data = ColumnData::utf8(result_data);
324				let final_data = match bitvec {
325					Some(bv) => ColumnData::Option {
326						inner: Box::new(result_col_data),
327						bitvec: bv.clone(),
328					},
329					None => result_col_data,
330				};
331				Ok(Columns::new(vec![Column::new(ctx.fragment.clone(), final_data)]))
332			}
333			_ => Err(FunctionError::ExecutionFailed {
334				function: ctx.fragment.clone(),
335				reason: "flow_node::to_json only supports Blob input".to_string(),
336			}),
337		}
338	}
339}