Skip to main content

reifydb_routine/function/flow/
to_json.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::from_bytes;
5use reifydb_core::{
6	common::{JoinType, WindowKind},
7	internal,
8	sort::SortKey,
9	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::routine::{Function, FunctionKind, Routine, RoutineInfo, context::FunctionContext, error::RoutineError};
17
18#[derive(Debug, Clone, Serialize)]
19#[serde(rename_all = "snake_case")]
20pub enum JsonFlowNodeType {
21	SourceInlineData {},
22	SourceTable {
23		table: u64,
24	},
25	SourceView {
26		view: u64,
27	},
28	SourceFlow {
29		flow: u64,
30	},
31	SourceRingBuffer {
32		ringbuffer: u64,
33	},
34	SourceSeries {
35		series: u64,
36	},
37	Filter {
38		conditions: Vec<JsonExpression>,
39	},
40	Gate {
41		conditions: Vec<JsonExpression>,
42	},
43	Map {
44		expressions: Vec<JsonExpression>,
45	},
46	Extend {
47		expressions: Vec<JsonExpression>,
48	},
49	Join {
50		join_type: JoinType,
51		left: Vec<JsonExpression>,
52		right: Vec<JsonExpression>,
53		alias: Option<String>,
54	},
55	Aggregate {
56		by: Vec<JsonExpression>,
57		map: Vec<JsonExpression>,
58	},
59	Append,
60	Sort {
61		by: Vec<SortKey>,
62	},
63	Take {
64		limit: usize,
65	},
66	Distinct {
67		expressions: Vec<JsonExpression>,
68	},
69	Apply {
70		operator: String,
71		expressions: Vec<JsonExpression>,
72	},
73	SinkView {
74		view: u64,
75	},
76	SinkSubscription {
77		subscription: String,
78	},
79	Window {
80		kind: WindowKind,
81		group_by: Vec<JsonExpression>,
82		aggregations: Vec<JsonExpression>,
83		ts: Option<String>,
84	},
85}
86
87impl From<&FlowNodeType> for JsonFlowNodeType {
88	fn from(node_type: &FlowNodeType) -> Self {
89		match node_type {
90			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
91			FlowNodeType::SourceTable {
92				table,
93			} => JsonFlowNodeType::SourceTable {
94				table: table.0,
95			},
96			FlowNodeType::SourceView {
97				view,
98			} => JsonFlowNodeType::SourceView {
99				view: view.0,
100			},
101			FlowNodeType::SourceFlow {
102				flow,
103			} => JsonFlowNodeType::SourceFlow {
104				flow: flow.0,
105			},
106			FlowNodeType::SourceRingBuffer {
107				ringbuffer,
108			} => JsonFlowNodeType::SourceRingBuffer {
109				ringbuffer: ringbuffer.0,
110			},
111			FlowNodeType::SourceSeries {
112				series,
113			} => JsonFlowNodeType::SourceSeries {
114				series: series.0,
115			},
116			FlowNodeType::Filter {
117				conditions,
118			} => JsonFlowNodeType::Filter {
119				conditions: conditions.iter().map(|e| e.into()).collect(),
120			},
121			FlowNodeType::Gate {
122				conditions,
123			} => JsonFlowNodeType::Gate {
124				conditions: conditions.iter().map(|e| e.into()).collect(),
125			},
126			FlowNodeType::Map {
127				expressions,
128			} => JsonFlowNodeType::Map {
129				expressions: expressions.iter().map(|e| e.into()).collect(),
130			},
131			FlowNodeType::Extend {
132				expressions,
133			} => JsonFlowNodeType::Extend {
134				expressions: expressions.iter().map(|e| e.into()).collect(),
135			},
136			FlowNodeType::Join {
137				join_type,
138				left,
139				right,
140				alias,
141				ttl: _,
142			} => JsonFlowNodeType::Join {
143				join_type: *join_type,
144				left: left.iter().map(|e| e.into()).collect(),
145				right: right.iter().map(|e| e.into()).collect(),
146				alias: alias.clone(),
147			},
148			FlowNodeType::Aggregate {
149				by,
150				map,
151			} => JsonFlowNodeType::Aggregate {
152				by: by.iter().map(|e| e.into()).collect(),
153				map: map.iter().map(|e| e.into()).collect(),
154			},
155			FlowNodeType::Append => JsonFlowNodeType::Append,
156			FlowNodeType::Sort {
157				by,
158			} => JsonFlowNodeType::Sort {
159				by: by.clone(),
160			},
161			FlowNodeType::Take {
162				limit,
163			} => JsonFlowNodeType::Take {
164				limit: *limit,
165			},
166			FlowNodeType::Distinct {
167				expressions,
168				ttl: _,
169			} => JsonFlowNodeType::Distinct {
170				expressions: expressions.iter().map(|e| e.into()).collect(),
171			},
172			FlowNodeType::Apply {
173				operator,
174				expressions,
175				ttl: _,
176			} => JsonFlowNodeType::Apply {
177				operator: operator.clone(),
178				expressions: expressions.iter().map(|e| e.into()).collect(),
179			},
180			FlowNodeType::SinkTableView {
181				view,
182				..
183			}
184			| FlowNodeType::SinkRingBufferView {
185				view,
186				..
187			}
188			| FlowNodeType::SinkSeriesView {
189				view,
190				..
191			} => JsonFlowNodeType::SinkView {
192				view: view.0,
193			},
194			FlowNodeType::SinkSubscription {
195				subscription,
196			} => JsonFlowNodeType::SinkSubscription {
197				subscription: subscription.0.to_string(),
198			},
199			FlowNodeType::Window {
200				kind,
201				group_by,
202				aggregations,
203				ts,
204			} => JsonFlowNodeType::Window {
205				kind: kind.clone(),
206				group_by: group_by.iter().map(|e| e.into()).collect(),
207				aggregations: aggregations.iter().map(|e| e.into()).collect(),
208				ts: ts.clone(),
209			},
210		}
211	}
212}
213
214pub struct FlowNodeToJson {
215	info: RoutineInfo,
216}
217
218impl Default for FlowNodeToJson {
219	fn default() -> Self {
220		Self::new()
221	}
222}
223
224impl FlowNodeToJson {
225	pub fn new() -> Self {
226		Self {
227			info: RoutineInfo::new("flow_node::to_json"),
228		}
229	}
230}
231
232impl<'a> Routine<FunctionContext<'a>> for FlowNodeToJson {
233	fn info(&self) -> &RoutineInfo {
234		&self.info
235	}
236
237	fn return_type(&self, _input_types: &[Type]) -> Type {
238		Type::Utf8
239	}
240
241	fn execute(&self, ctx: &mut FunctionContext<'a>, args: &Columns) -> Result<Columns, RoutineError> {
242		if args.is_empty() {
243			return Ok(Columns::new(vec![ColumnWithName::new(
244				ctx.fragment.clone(),
245				ColumnBuffer::utf8(Vec::<String>::new()),
246			)]));
247		}
248
249		if args.len() != 1 {
250			return Err(RoutineError::FunctionArityMismatch {
251				function: ctx.fragment.clone(),
252				expected: 1,
253				actual: args.len(),
254			});
255		}
256
257		let column = &args[0];
258		let (data, bitvec) = column.unwrap_option();
259		let row_count = data.len();
260
261		match data {
262			ColumnBuffer::Blob {
263				container,
264				..
265			} => {
266				let mut result_data = Vec::with_capacity(row_count);
267
268				for i in 0..row_count {
269					if container.is_defined(i) {
270						let bytes = match container.get(i) {
271							Some(b) => b,
272							None => continue,
273						};
274
275						let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
276							Error(Box::new(internal!(
277								"Failed to deserialize FlowNodeType: {}",
278								e
279							)))
280						})?;
281
282						let json_node_type: JsonFlowNodeType = (&node_type).into();
283
284						let json_value = to_value(&json_node_type).map_err(|e| {
285							Error(Box::new(internal!(
286								"Failed to serialize FlowNodeType to JSON: {}",
287								e
288							)))
289						})?;
290
291						let inner_value = match json_value {
292							JsonValue::Object(map) if map.len() == 1 => map
293								.into_iter()
294								.next()
295								.map(|(_, v)| v)
296								.unwrap_or(JsonValue::Null),
297							JsonValue::String(_) => JsonValue::Null,
298							other => other,
299						};
300
301						let json = to_string(&inner_value).map_err(|e| {
302							Error(Box::new(internal!(
303								"Failed to serialize FlowNodeType to JSON: {}",
304								e
305							)))
306						})?;
307
308						result_data.push(json);
309					} else {
310						result_data.push(String::new());
311					}
312				}
313
314				let result_col_data = ColumnBuffer::utf8(result_data);
315				let final_data = match bitvec {
316					Some(bv) => ColumnBuffer::Option {
317						inner: Box::new(result_col_data),
318						bitvec: bv.clone(),
319					},
320					None => result_col_data,
321				};
322				Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), final_data)]))
323			}
324			_ => Err(RoutineError::FunctionExecutionFailed {
325				function: ctx.fragment.clone(),
326				reason: "flow_node::to_json only supports Blob input".to_string(),
327			}),
328		}
329	}
330}
331
332impl Function for FlowNodeToJson {
333	fn kinds(&self) -> &[FunctionKind] {
334		&[FunctionKind::Scalar]
335	}
336}