Skip to main content

reifydb_function/flow/
to_json.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::time::Duration;
5
6use postcard::from_bytes;
7use reifydb_core::{
8	common::{JoinType, WindowSize, WindowSlide, WindowType},
9	internal,
10	sort::SortKey,
11	value::column::data::ColumnData,
12};
13use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
14use reifydb_type::{error::Error, value::r#type::Type};
15use serde::{Serialize, Serializer};
16use serde_json::{Value as JsonValue, to_string, to_value};
17
18use crate::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionResult, propagate_options};
19
20/// JSON-serializable version of FlowNodeType that uses JsonExpression
21/// for clean expression serialization without Fragment metadata.
22#[derive(Debug, Clone, Serialize)]
23#[serde(rename_all = "snake_case")]
24pub enum JsonFlowNodeType {
25	SourceInlineData {},
26	SourceTable {
27		table: u64,
28	},
29	SourceView {
30		view: u64,
31	},
32	SourceFlow {
33		flow: u64,
34	},
35	SourceRingBuffer {
36		ringbuffer: u64,
37	},
38	SourceSeries {
39		series: u64,
40	},
41	Filter {
42		conditions: Vec<JsonExpression>,
43	},
44	Gate {
45		conditions: Vec<JsonExpression>,
46	},
47	Map {
48		expressions: Vec<JsonExpression>,
49	},
50	Extend {
51		expressions: Vec<JsonExpression>,
52	},
53	Join {
54		join_type: JoinType,
55		left: Vec<JsonExpression>,
56		right: Vec<JsonExpression>,
57		alias: Option<String>,
58	},
59	Aggregate {
60		by: Vec<JsonExpression>,
61		map: Vec<JsonExpression>,
62	},
63	Append,
64	Sort {
65		by: Vec<SortKey>,
66	},
67	Take {
68		limit: usize,
69	},
70	Distinct {
71		expressions: Vec<JsonExpression>,
72	},
73	Apply {
74		operator: String,
75		expressions: Vec<JsonExpression>,
76	},
77	SinkView {
78		view: u64,
79	},
80	SinkSubscription {
81		subscription: String,
82	},
83	Window {
84		window_type: WindowType,
85		size: WindowSize,
86		slide: Option<WindowSlide>,
87		group_by: Vec<JsonExpression>,
88		aggregations: Vec<JsonExpression>,
89		min_events: usize,
90		max_window_count: Option<usize>,
91		#[serde(serialize_with = "serialize_duration_opt")]
92		max_window_age: Option<Duration>,
93	},
94}
95
96fn serialize_duration_opt<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
97where
98	S: Serializer,
99{
100	match duration {
101		Some(d) => serializer.serialize_some(&d.as_secs()),
102		None => serializer.serialize_none(),
103	}
104}
105
106impl From<&FlowNodeType> for JsonFlowNodeType {
107	fn from(node_type: &FlowNodeType) -> Self {
108		match node_type {
109			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
110			FlowNodeType::SourceTable {
111				table,
112			} => JsonFlowNodeType::SourceTable {
113				table: table.0,
114			},
115			FlowNodeType::SourceView {
116				view,
117			} => JsonFlowNodeType::SourceView {
118				view: view.0,
119			},
120			FlowNodeType::SourceFlow {
121				flow,
122			} => JsonFlowNodeType::SourceFlow {
123				flow: flow.0,
124			},
125			FlowNodeType::SourceRingBuffer {
126				ringbuffer,
127			} => JsonFlowNodeType::SourceRingBuffer {
128				ringbuffer: ringbuffer.0,
129			},
130			FlowNodeType::SourceSeries {
131				series,
132			} => JsonFlowNodeType::SourceSeries {
133				series: series.0,
134			},
135			FlowNodeType::Filter {
136				conditions,
137			} => JsonFlowNodeType::Filter {
138				conditions: conditions.iter().map(|e| e.into()).collect(),
139			},
140			FlowNodeType::Gate {
141				conditions,
142			} => JsonFlowNodeType::Gate {
143				conditions: conditions.iter().map(|e| e.into()).collect(),
144			},
145			FlowNodeType::Map {
146				expressions,
147			} => JsonFlowNodeType::Map {
148				expressions: expressions.iter().map(|e| e.into()).collect(),
149			},
150			FlowNodeType::Extend {
151				expressions,
152			} => JsonFlowNodeType::Extend {
153				expressions: expressions.iter().map(|e| e.into()).collect(),
154			},
155			FlowNodeType::Join {
156				join_type,
157				left,
158				right,
159				alias,
160			} => JsonFlowNodeType::Join {
161				join_type: *join_type,
162				left: left.iter().map(|e| e.into()).collect(),
163				right: right.iter().map(|e| e.into()).collect(),
164				alias: alias.clone(),
165			},
166			FlowNodeType::Aggregate {
167				by,
168				map,
169			} => JsonFlowNodeType::Aggregate {
170				by: by.iter().map(|e| e.into()).collect(),
171				map: map.iter().map(|e| e.into()).collect(),
172			},
173			FlowNodeType::Append => JsonFlowNodeType::Append,
174			FlowNodeType::Sort {
175				by,
176			} => JsonFlowNodeType::Sort {
177				by: by.clone(),
178			},
179			FlowNodeType::Take {
180				limit,
181			} => JsonFlowNodeType::Take {
182				limit: *limit,
183			},
184			FlowNodeType::Distinct {
185				expressions,
186			} => JsonFlowNodeType::Distinct {
187				expressions: expressions.iter().map(|e| e.into()).collect(),
188			},
189			FlowNodeType::Apply {
190				operator,
191				expressions,
192			} => JsonFlowNodeType::Apply {
193				operator: operator.clone(),
194				expressions: expressions.iter().map(|e| e.into()).collect(),
195			},
196			FlowNodeType::SinkView {
197				view,
198			} => JsonFlowNodeType::SinkView {
199				view: view.0,
200			},
201			FlowNodeType::SinkSubscription {
202				subscription,
203			} => JsonFlowNodeType::SinkSubscription {
204				subscription: subscription.0.to_string(),
205			},
206			FlowNodeType::Window {
207				window_type,
208				size,
209				slide,
210				group_by,
211				aggregations,
212				min_events,
213				max_window_count,
214				max_window_age,
215			} => JsonFlowNodeType::Window {
216				window_type: window_type.clone(),
217				size: size.clone(),
218				slide: slide.clone(),
219				group_by: group_by.iter().map(|e| e.into()).collect(),
220				aggregations: aggregations.iter().map(|e| e.into()).collect(),
221				min_events: *min_events,
222				max_window_count: *max_window_count,
223				max_window_age: *max_window_age,
224			},
225		}
226	}
227}
228
/// Scalar function that renders postcard-encoded `FlowNodeType` blobs as JSON
/// text.
///
/// The type is a stateless unit struct; [`FlowNodeToJson::new`] and
/// [`Default::default`] produce the same value.
#[derive(Debug, Default)]
pub struct FlowNodeToJson;

impl FlowNodeToJson {
	/// Creates the function object. Equivalent to `FlowNodeToJson::default()`.
	pub fn new() -> Self {
		Self
	}
}
236
237impl ScalarFunction for FlowNodeToJson {
238	fn scalar(&self, ctx: ScalarFunctionContext) -> ScalarFunctionResult<ColumnData> {
239		if let Some(result) = propagate_options(self, &ctx) {
240			return result;
241		}
242
243		let columns = ctx.columns;
244		let row_count = ctx.row_count;
245
246		if columns.is_empty() {
247			return Ok(ColumnData::utf8(Vec::<String>::new()));
248		}
249
250		let column = columns.get(0).unwrap();
251
252		match &column.data() {
253			ColumnData::Blob {
254				container,
255				..
256			} => {
257				let mut result_data = Vec::with_capacity(row_count);
258
259				for i in 0..row_count {
260					if container.is_defined(i) {
261						let blob = &container[i];
262						let bytes = blob.as_bytes();
263
264						// Deserialize from postcard
265						let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
266							Error(internal!("Failed to deserialize FlowNodeType: {}", e))
267						})?;
268
269						// Convert to JsonFlowNodeType for clean serialization
270						let json_node_type: JsonFlowNodeType = (&node_type).into();
271
272						// Serialize to JSON (untagged - extract inner value only)
273						let json_value = to_value(&json_node_type).map_err(|e| {
274							Error(internal!(
275								"Failed to serialize FlowNodeType to JSON: {}",
276								e
277							))
278						})?;
279
280						// Extract the inner object from the tagged enum {"variant_name": {...}}
281						let inner_value = match json_value {
282							JsonValue::Object(map) if map.len() == 1 => map
283								.into_iter()
284								.next()
285								.map(|(_, v)| v)
286								.unwrap_or(JsonValue::Null),
287							JsonValue::String(_) => {
288								// Unit variants serialize as strings, return null for
289								// untagged
290								JsonValue::Null
291							}
292							other => other,
293						};
294
295						let json = to_string(&inner_value).map_err(|e| {
296							Error(internal!(
297								"Failed to serialize FlowNodeType to JSON: {}",
298								e
299							))
300						})?;
301
302						result_data.push(json);
303					} else {
304						result_data.push(String::new());
305					}
306				}
307
308				Ok(ColumnData::utf8(result_data))
309			}
310			_ => Err(Error(internal!("flow_node::to_json only supports Blob input")).into()),
311		}
312	}
313
314	fn return_type(&self, _input_types: &[Type]) -> Type {
315		Type::Utf8
316	}
317}