// reifydb_function/flow/to_json.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::time::Duration;
5
6use reifydb_core::{
7	common::{JoinType, WindowSize, WindowSlide, WindowType},
8	internal,
9	sort::SortKey,
10	value::column::data::ColumnData,
11};
12use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
13use reifydb_type::value::r#type::Type;
14use serde::Serialize;
15
16use crate::{ScalarFunction, ScalarFunctionContext, propagate_options};
17
18/// JSON-serializable version of FlowNodeType that uses JsonExpression
19/// for clean expression serialization without Fragment metadata.
20#[derive(Debug, Clone, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum JsonFlowNodeType {
23	SourceInlineData {},
24	SourceTable {
25		table: u64,
26	},
27	SourceView {
28		view: u64,
29	},
30	SourceFlow {
31		flow: u64,
32	},
33	Filter {
34		conditions: Vec<JsonExpression>,
35	},
36	Map {
37		expressions: Vec<JsonExpression>,
38	},
39	Extend {
40		expressions: Vec<JsonExpression>,
41	},
42	Join {
43		join_type: JoinType,
44		left: Vec<JsonExpression>,
45		right: Vec<JsonExpression>,
46		alias: Option<String>,
47	},
48	Aggregate {
49		by: Vec<JsonExpression>,
50		map: Vec<JsonExpression>,
51	},
52	Append,
53	Sort {
54		by: Vec<SortKey>,
55	},
56	Take {
57		limit: usize,
58	},
59	Distinct {
60		expressions: Vec<JsonExpression>,
61	},
62	Apply {
63		operator: String,
64		expressions: Vec<JsonExpression>,
65	},
66	SinkView {
67		view: u64,
68	},
69	SinkSubscription {
70		subscription: String,
71	},
72	Window {
73		window_type: WindowType,
74		size: WindowSize,
75		slide: Option<WindowSlide>,
76		group_by: Vec<JsonExpression>,
77		aggregations: Vec<JsonExpression>,
78		min_events: usize,
79		max_window_count: Option<usize>,
80		#[serde(serialize_with = "serialize_duration_opt")]
81		max_window_age: Option<Duration>,
82	},
83}
84
85fn serialize_duration_opt<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
86where
87	S: serde::Serializer,
88{
89	match duration {
90		Some(d) => serializer.serialize_some(&d.as_secs()),
91		None => serializer.serialize_none(),
92	}
93}
94
95impl From<&FlowNodeType> for JsonFlowNodeType {
96	fn from(node_type: &FlowNodeType) -> Self {
97		match node_type {
98			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
99			FlowNodeType::SourceTable {
100				table,
101			} => JsonFlowNodeType::SourceTable {
102				table: table.0,
103			},
104			FlowNodeType::SourceView {
105				view,
106			} => JsonFlowNodeType::SourceView {
107				view: view.0,
108			},
109			FlowNodeType::SourceFlow {
110				flow,
111			} => JsonFlowNodeType::SourceFlow {
112				flow: flow.0,
113			},
114			FlowNodeType::Filter {
115				conditions,
116			} => JsonFlowNodeType::Filter {
117				conditions: conditions.iter().map(|e| e.into()).collect(),
118			},
119			FlowNodeType::Map {
120				expressions,
121			} => JsonFlowNodeType::Map {
122				expressions: expressions.iter().map(|e| e.into()).collect(),
123			},
124			FlowNodeType::Extend {
125				expressions,
126			} => JsonFlowNodeType::Extend {
127				expressions: expressions.iter().map(|e| e.into()).collect(),
128			},
129			FlowNodeType::Join {
130				join_type,
131				left,
132				right,
133				alias,
134			} => JsonFlowNodeType::Join {
135				join_type: *join_type,
136				left: left.iter().map(|e| e.into()).collect(),
137				right: right.iter().map(|e| e.into()).collect(),
138				alias: alias.clone(),
139			},
140			FlowNodeType::Aggregate {
141				by,
142				map,
143			} => JsonFlowNodeType::Aggregate {
144				by: by.iter().map(|e| e.into()).collect(),
145				map: map.iter().map(|e| e.into()).collect(),
146			},
147			FlowNodeType::Append => JsonFlowNodeType::Append,
148			FlowNodeType::Sort {
149				by,
150			} => JsonFlowNodeType::Sort {
151				by: by.clone(),
152			},
153			FlowNodeType::Take {
154				limit,
155			} => JsonFlowNodeType::Take {
156				limit: *limit,
157			},
158			FlowNodeType::Distinct {
159				expressions,
160			} => JsonFlowNodeType::Distinct {
161				expressions: expressions.iter().map(|e| e.into()).collect(),
162			},
163			FlowNodeType::Apply {
164				operator,
165				expressions,
166			} => JsonFlowNodeType::Apply {
167				operator: operator.clone(),
168				expressions: expressions.iter().map(|e| e.into()).collect(),
169			},
170			FlowNodeType::SinkView {
171				view,
172			} => JsonFlowNodeType::SinkView {
173				view: view.0,
174			},
175			FlowNodeType::SinkSubscription {
176				subscription,
177			} => JsonFlowNodeType::SinkSubscription {
178				subscription: subscription.0.to_string(),
179			},
180			FlowNodeType::Window {
181				window_type,
182				size,
183				slide,
184				group_by,
185				aggregations,
186				min_events,
187				max_window_count,
188				max_window_age,
189			} => JsonFlowNodeType::Window {
190				window_type: window_type.clone(),
191				size: size.clone(),
192				slide: slide.clone(),
193				group_by: group_by.iter().map(|e| e.into()).collect(),
194				aggregations: aggregations.iter().map(|e| e.into()).collect(),
195				min_events: *min_events,
196				max_window_count: *max_window_count,
197				max_window_age: *max_window_age,
198			},
199		}
200	}
201}
202
/// Scalar function that renders serialized flow-node blobs as JSON text.
///
/// The type carries no state; the behaviour lives in its `ScalarFunction`
/// implementation.
#[derive(Debug, Clone, Default)]
pub struct FlowNodeToJson;

impl FlowNodeToJson {
	/// Creates the function object. Equivalent to `FlowNodeToJson::default()`.
	pub fn new() -> Self {
		Self
	}
}
210
211impl ScalarFunction for FlowNodeToJson {
212	fn scalar(&self, ctx: ScalarFunctionContext) -> crate::error::ScalarFunctionResult<ColumnData> {
213		if let Some(result) = propagate_options(self, &ctx) {
214			return result;
215		}
216
217		let columns = ctx.columns;
218		let row_count = ctx.row_count;
219
220		if columns.is_empty() {
221			return Ok(ColumnData::utf8(Vec::<String>::new()));
222		}
223
224		let column = columns.get(0).unwrap();
225
226		match &column.data() {
227			ColumnData::Blob {
228				container,
229				..
230			} => {
231				let mut result_data = Vec::with_capacity(row_count);
232
233				for i in 0..row_count {
234					if container.is_defined(i) {
235						let blob = &container[i];
236						let bytes = blob.as_bytes();
237
238						// Deserialize from postcard
239						let node_type: FlowNodeType =
240							postcard::from_bytes(bytes).map_err(|e| {
241								reifydb_type::error::Error(internal!(
242									"Failed to deserialize FlowNodeType: {}",
243									e
244								))
245							})?;
246
247						// Convert to JsonFlowNodeType for clean serialization
248						let json_node_type: JsonFlowNodeType = (&node_type).into();
249
250						// Serialize to JSON (untagged - extract inner value only)
251						let json_value =
252							serde_json::to_value(&json_node_type).map_err(|e| {
253								reifydb_type::error::Error(internal!(
254									"Failed to serialize FlowNodeType to JSON: {}",
255									e
256								))
257							})?;
258
259						// Extract the inner object from the tagged enum {"variant_name": {...}}
260						let inner_value = match json_value {
261							serde_json::Value::Object(map) if map.len() == 1 => map
262								.into_iter()
263								.next()
264								.map(|(_, v)| v)
265								.unwrap_or(serde_json::Value::Null),
266							serde_json::Value::String(_) => {
267								// Unit variants serialize as strings, return null for
268								// untagged
269								serde_json::Value::Null
270							}
271							other => other,
272						};
273
274						let json = serde_json::to_string(&inner_value).map_err(|e| {
275							reifydb_type::error::Error(internal!(
276								"Failed to serialize FlowNodeType to JSON: {}",
277								e
278							))
279						})?;
280
281						result_data.push(json);
282					} else {
283						result_data.push(String::new());
284					}
285				}
286
287				Ok(ColumnData::utf8(result_data))
288			}
289			_ => Err(reifydb_type::error::Error(internal!("flow_node::to_json only supports Blob input"))
290				.into()),
291		}
292	}
293
294	fn return_type(&self, _input_types: &[Type]) -> Type {
295		Type::Utf8
296	}
297}