Skip to main content

reifydb_routine/function/flow/
to_json.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::from_bytes;
5use reifydb_core::{
6	common::{JoinType, WindowKind},
7	internal,
8	sort::SortKey,
9	value::column::data::ColumnData,
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::function::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionResult, propagate_options};
17
18/// JSON-serializable version of FlowNodeType that uses JsonExpression
19/// for clean expression serialization without Fragment metadata.
20#[derive(Debug, Clone, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum JsonFlowNodeType {
23	SourceInlineData {},
24	SourceTable {
25		table: u64,
26	},
27	SourceView {
28		view: u64,
29	},
30	SourceFlow {
31		flow: u64,
32	},
33	SourceRingBuffer {
34		ringbuffer: u64,
35	},
36	SourceSeries {
37		series: u64,
38	},
39	Filter {
40		conditions: Vec<JsonExpression>,
41	},
42	Gate {
43		conditions: Vec<JsonExpression>,
44	},
45	Map {
46		expressions: Vec<JsonExpression>,
47	},
48	Extend {
49		expressions: Vec<JsonExpression>,
50	},
51	Join {
52		join_type: JoinType,
53		left: Vec<JsonExpression>,
54		right: Vec<JsonExpression>,
55		alias: Option<String>,
56	},
57	Aggregate {
58		by: Vec<JsonExpression>,
59		map: Vec<JsonExpression>,
60	},
61	Append,
62	Sort {
63		by: Vec<SortKey>,
64	},
65	Take {
66		limit: usize,
67	},
68	Distinct {
69		expressions: Vec<JsonExpression>,
70	},
71	Apply {
72		operator: String,
73		expressions: Vec<JsonExpression>,
74	},
75	SinkView {
76		view: u64,
77	},
78	SinkSubscription {
79		subscription: String,
80	},
81	Window {
82		kind: WindowKind,
83		group_by: Vec<JsonExpression>,
84		aggregations: Vec<JsonExpression>,
85		ts: Option<String>,
86	},
87}
88
89impl From<&FlowNodeType> for JsonFlowNodeType {
90	fn from(node_type: &FlowNodeType) -> Self {
91		match node_type {
92			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
93			FlowNodeType::SourceTable {
94				table,
95			} => JsonFlowNodeType::SourceTable {
96				table: table.0,
97			},
98			FlowNodeType::SourceView {
99				view,
100			} => JsonFlowNodeType::SourceView {
101				view: view.0,
102			},
103			FlowNodeType::SourceFlow {
104				flow,
105			} => JsonFlowNodeType::SourceFlow {
106				flow: flow.0,
107			},
108			FlowNodeType::SourceRingBuffer {
109				ringbuffer,
110			} => JsonFlowNodeType::SourceRingBuffer {
111				ringbuffer: ringbuffer.0,
112			},
113			FlowNodeType::SourceSeries {
114				series,
115			} => JsonFlowNodeType::SourceSeries {
116				series: series.0,
117			},
118			FlowNodeType::Filter {
119				conditions,
120			} => JsonFlowNodeType::Filter {
121				conditions: conditions.iter().map(|e| e.into()).collect(),
122			},
123			FlowNodeType::Gate {
124				conditions,
125			} => JsonFlowNodeType::Gate {
126				conditions: conditions.iter().map(|e| e.into()).collect(),
127			},
128			FlowNodeType::Map {
129				expressions,
130			} => JsonFlowNodeType::Map {
131				expressions: expressions.iter().map(|e| e.into()).collect(),
132			},
133			FlowNodeType::Extend {
134				expressions,
135			} => JsonFlowNodeType::Extend {
136				expressions: expressions.iter().map(|e| e.into()).collect(),
137			},
138			FlowNodeType::Join {
139				join_type,
140				left,
141				right,
142				alias,
143			} => JsonFlowNodeType::Join {
144				join_type: *join_type,
145				left: left.iter().map(|e| e.into()).collect(),
146				right: right.iter().map(|e| e.into()).collect(),
147				alias: alias.clone(),
148			},
149			FlowNodeType::Aggregate {
150				by,
151				map,
152			} => JsonFlowNodeType::Aggregate {
153				by: by.iter().map(|e| e.into()).collect(),
154				map: map.iter().map(|e| e.into()).collect(),
155			},
156			FlowNodeType::Append => JsonFlowNodeType::Append,
157			FlowNodeType::Sort {
158				by,
159			} => JsonFlowNodeType::Sort {
160				by: by.clone(),
161			},
162			FlowNodeType::Take {
163				limit,
164			} => JsonFlowNodeType::Take {
165				limit: *limit,
166			},
167			FlowNodeType::Distinct {
168				expressions,
169			} => JsonFlowNodeType::Distinct {
170				expressions: expressions.iter().map(|e| e.into()).collect(),
171			},
172			FlowNodeType::Apply {
173				operator,
174				expressions,
175			} => JsonFlowNodeType::Apply {
176				operator: operator.clone(),
177				expressions: expressions.iter().map(|e| e.into()).collect(),
178			},
179			FlowNodeType::SinkTableView {
180				view,
181				..
182			}
183			| FlowNodeType::SinkRingBufferView {
184				view,
185				..
186			}
187			| FlowNodeType::SinkSeriesView {
188				view,
189				..
190			} => JsonFlowNodeType::SinkView {
191				view: view.0,
192			},
193			FlowNodeType::SinkSubscription {
194				subscription,
195			} => JsonFlowNodeType::SinkSubscription {
196				subscription: subscription.0.to_string(),
197			},
198			FlowNodeType::Window {
199				kind,
200				group_by,
201				aggregations,
202				ts,
203			} => JsonFlowNodeType::Window {
204				kind: kind.clone(),
205				group_by: group_by.iter().map(|e| e.into()).collect(),
206				aggregations: aggregations.iter().map(|e| e.into()).collect(),
207				ts: ts.clone(),
208			},
209		}
210	}
211}
212
213pub struct FlowNodeToJson;
214
215impl FlowNodeToJson {
216	pub fn new() -> Self {
217		Self
218	}
219}
220
221impl ScalarFunction for FlowNodeToJson {
222	fn scalar(&self, ctx: ScalarFunctionContext) -> ScalarFunctionResult<ColumnData> {
223		if let Some(result) = propagate_options(self, &ctx) {
224			return result;
225		}
226
227		let columns = ctx.columns;
228		let row_count = ctx.row_count;
229
230		if columns.is_empty() {
231			return Ok(ColumnData::utf8(Vec::<String>::new()));
232		}
233
234		let column = columns.get(0).unwrap();
235
236		match &column.data() {
237			ColumnData::Blob {
238				container,
239				..
240			} => {
241				let mut result_data = Vec::with_capacity(row_count);
242
243				for i in 0..row_count {
244					if container.is_defined(i) {
245						let blob = &container[i];
246						let bytes = blob.as_bytes();
247
248						// Deserialize from postcard
249						let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
250							Error(internal!("Failed to deserialize FlowNodeType: {}", e))
251						})?;
252
253						// Convert to JsonFlowNodeType for clean serialization
254						let json_node_type: JsonFlowNodeType = (&node_type).into();
255
256						// Serialize to JSON (untagged - extract inner value only)
257						let json_value = to_value(&json_node_type).map_err(|e| {
258							Error(internal!(
259								"Failed to serialize FlowNodeType to JSON: {}",
260								e
261							))
262						})?;
263
264						// Extract the inner object from the tagged enum {"variant_name": {...}}
265						let inner_value = match json_value {
266							JsonValue::Object(map) if map.len() == 1 => map
267								.into_iter()
268								.next()
269								.map(|(_, v)| v)
270								.unwrap_or(JsonValue::Null),
271							JsonValue::String(_) => {
272								// Unit variants serialize as strings, return null for
273								// untagged
274								JsonValue::Null
275							}
276							other => other,
277						};
278
279						let json = to_string(&inner_value).map_err(|e| {
280							Error(internal!(
281								"Failed to serialize FlowNodeType to JSON: {}",
282								e
283							))
284						})?;
285
286						result_data.push(json);
287					} else {
288						result_data.push(String::new());
289					}
290				}
291
292				Ok(ColumnData::utf8(result_data))
293			}
294			_ => Err(Error(internal!("flow_node::to_json only supports Blob input")).into()),
295		}
296	}
297
298	fn return_type(&self, _input_types: &[Type]) -> Type {
299		Type::Utf8
300	}
301}