reifydb_routine/function/flow/
to_json.rs1use postcard::from_bytes;
5use reifydb_core::{
6 common::{JoinType, WindowKind},
7 internal,
8 sort::SortKey,
9 value::column::{Column, columns::Columns, data::ColumnData},
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::function::{Function, FunctionCapability, FunctionContext, FunctionInfo, error::FunctionError};
17
18#[derive(Debug, Clone, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum JsonFlowNodeType {
23 SourceInlineData {},
24 SourceTable {
25 table: u64,
26 },
27 SourceView {
28 view: u64,
29 },
30 SourceFlow {
31 flow: u64,
32 },
33 SourceRingBuffer {
34 ringbuffer: u64,
35 },
36 SourceSeries {
37 series: u64,
38 },
39 Filter {
40 conditions: Vec<JsonExpression>,
41 },
42 Gate {
43 conditions: Vec<JsonExpression>,
44 },
45 Map {
46 expressions: Vec<JsonExpression>,
47 },
48 Extend {
49 expressions: Vec<JsonExpression>,
50 },
51 Join {
52 join_type: JoinType,
53 left: Vec<JsonExpression>,
54 right: Vec<JsonExpression>,
55 alias: Option<String>,
56 },
57 Aggregate {
58 by: Vec<JsonExpression>,
59 map: Vec<JsonExpression>,
60 },
61 Append,
62 Sort {
63 by: Vec<SortKey>,
64 },
65 Take {
66 limit: usize,
67 },
68 Distinct {
69 expressions: Vec<JsonExpression>,
70 },
71 Apply {
72 operator: String,
73 expressions: Vec<JsonExpression>,
74 },
75 SinkView {
76 view: u64,
77 },
78 SinkSubscription {
79 subscription: String,
80 },
81 Window {
82 kind: WindowKind,
83 group_by: Vec<JsonExpression>,
84 aggregations: Vec<JsonExpression>,
85 ts: Option<String>,
86 },
87}
88
89impl From<&FlowNodeType> for JsonFlowNodeType {
90 fn from(node_type: &FlowNodeType) -> Self {
91 match node_type {
92 FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
93 FlowNodeType::SourceTable {
94 table,
95 } => JsonFlowNodeType::SourceTable {
96 table: table.0,
97 },
98 FlowNodeType::SourceView {
99 view,
100 } => JsonFlowNodeType::SourceView {
101 view: view.0,
102 },
103 FlowNodeType::SourceFlow {
104 flow,
105 } => JsonFlowNodeType::SourceFlow {
106 flow: flow.0,
107 },
108 FlowNodeType::SourceRingBuffer {
109 ringbuffer,
110 } => JsonFlowNodeType::SourceRingBuffer {
111 ringbuffer: ringbuffer.0,
112 },
113 FlowNodeType::SourceSeries {
114 series,
115 } => JsonFlowNodeType::SourceSeries {
116 series: series.0,
117 },
118 FlowNodeType::Filter {
119 conditions,
120 } => JsonFlowNodeType::Filter {
121 conditions: conditions.iter().map(|e| e.into()).collect(),
122 },
123 FlowNodeType::Gate {
124 conditions,
125 } => JsonFlowNodeType::Gate {
126 conditions: conditions.iter().map(|e| e.into()).collect(),
127 },
128 FlowNodeType::Map {
129 expressions,
130 } => JsonFlowNodeType::Map {
131 expressions: expressions.iter().map(|e| e.into()).collect(),
132 },
133 FlowNodeType::Extend {
134 expressions,
135 } => JsonFlowNodeType::Extend {
136 expressions: expressions.iter().map(|e| e.into()).collect(),
137 },
138 FlowNodeType::Join {
139 join_type,
140 left,
141 right,
142 alias,
143 } => JsonFlowNodeType::Join {
144 join_type: *join_type,
145 left: left.iter().map(|e| e.into()).collect(),
146 right: right.iter().map(|e| e.into()).collect(),
147 alias: alias.clone(),
148 },
149 FlowNodeType::Aggregate {
150 by,
151 map,
152 } => JsonFlowNodeType::Aggregate {
153 by: by.iter().map(|e| e.into()).collect(),
154 map: map.iter().map(|e| e.into()).collect(),
155 },
156 FlowNodeType::Append => JsonFlowNodeType::Append,
157 FlowNodeType::Sort {
158 by,
159 } => JsonFlowNodeType::Sort {
160 by: by.clone(),
161 },
162 FlowNodeType::Take {
163 limit,
164 } => JsonFlowNodeType::Take {
165 limit: *limit,
166 },
167 FlowNodeType::Distinct {
168 expressions,
169 } => JsonFlowNodeType::Distinct {
170 expressions: expressions.iter().map(|e| e.into()).collect(),
171 },
172 FlowNodeType::Apply {
173 operator,
174 expressions,
175 } => JsonFlowNodeType::Apply {
176 operator: operator.clone(),
177 expressions: expressions.iter().map(|e| e.into()).collect(),
178 },
179 FlowNodeType::SinkTableView {
180 view,
181 ..
182 }
183 | FlowNodeType::SinkRingBufferView {
184 view,
185 ..
186 }
187 | FlowNodeType::SinkSeriesView {
188 view,
189 ..
190 } => JsonFlowNodeType::SinkView {
191 view: view.0,
192 },
193 FlowNodeType::SinkSubscription {
194 subscription,
195 } => JsonFlowNodeType::SinkSubscription {
196 subscription: subscription.0.to_string(),
197 },
198 FlowNodeType::Window {
199 kind,
200 group_by,
201 aggregations,
202 ts,
203 } => JsonFlowNodeType::Window {
204 kind: kind.clone(),
205 group_by: group_by.iter().map(|e| e.into()).collect(),
206 aggregations: aggregations.iter().map(|e| e.into()).collect(),
207 ts: ts.clone(),
208 },
209 }
210 }
211}
212
213pub struct FlowNodeToJson {
214 info: FunctionInfo,
215}
216
217impl Default for FlowNodeToJson {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223impl FlowNodeToJson {
224 pub fn new() -> Self {
225 Self {
226 info: FunctionInfo::new("flow_node::to_json"),
227 }
228 }
229}
230
231impl Function for FlowNodeToJson {
232 fn info(&self) -> &FunctionInfo {
233 &self.info
234 }
235
236 fn capabilities(&self) -> &[FunctionCapability] {
237 &[FunctionCapability::Scalar]
238 }
239
240 fn return_type(&self, _input_types: &[Type]) -> Type {
241 Type::Utf8
242 }
243
244 fn execute(&self, ctx: &FunctionContext, args: &Columns) -> Result<Columns, FunctionError> {
245 if args.is_empty() {
246 return Ok(Columns::new(vec![Column::new(
247 ctx.fragment.clone(),
248 ColumnData::utf8(Vec::<String>::new()),
249 )]));
250 }
251
252 if args.len() != 1 {
253 return Err(FunctionError::ArityMismatch {
254 function: ctx.fragment.clone(),
255 expected: 1,
256 actual: args.len(),
257 });
258 }
259
260 let column = &args[0];
261 let (data, bitvec) = column.data().unwrap_option();
262 let row_count = data.len();
263
264 match data {
265 ColumnData::Blob {
266 container,
267 ..
268 } => {
269 let mut result_data = Vec::with_capacity(row_count);
270
271 for i in 0..row_count {
272 if container.is_defined(i) {
273 let blob = &container[i];
274 let bytes = blob.as_bytes();
275
276 let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
278 Error(Box::new(internal!(
279 "Failed to deserialize FlowNodeType: {}",
280 e
281 )))
282 })?;
283
284 let json_node_type: JsonFlowNodeType = (&node_type).into();
286
287 let json_value = to_value(&json_node_type).map_err(|e| {
289 Error(Box::new(internal!(
290 "Failed to serialize FlowNodeType to JSON: {}",
291 e
292 )))
293 })?;
294
295 let inner_value = match json_value {
297 JsonValue::Object(map) if map.len() == 1 => map
298 .into_iter()
299 .next()
300 .map(|(_, v)| v)
301 .unwrap_or(JsonValue::Null),
302 JsonValue::String(_) => {
303 JsonValue::Null
306 }
307 other => other,
308 };
309
310 let json = to_string(&inner_value).map_err(|e| {
311 Error(Box::new(internal!(
312 "Failed to serialize FlowNodeType to JSON: {}",
313 e
314 )))
315 })?;
316
317 result_data.push(json);
318 } else {
319 result_data.push(String::new());
320 }
321 }
322
323 let result_col_data = ColumnData::utf8(result_data);
324 let final_data = match bitvec {
325 Some(bv) => ColumnData::Option {
326 inner: Box::new(result_col_data),
327 bitvec: bv.clone(),
328 },
329 None => result_col_data,
330 };
331 Ok(Columns::new(vec![Column::new(ctx.fragment.clone(), final_data)]))
332 }
333 _ => Err(FunctionError::ExecutionFailed {
334 function: ctx.fragment.clone(),
335 reason: "flow_node::to_json only supports Blob input".to_string(),
336 }),
337 }
338 }
339}