reifydb_routine/function/flow/
to_json.rs1use postcard::from_bytes;
5use reifydb_core::{
6 common::{JoinType, WindowKind},
7 internal,
8 sort::SortKey,
9 value::column::data::ColumnData,
10};
11use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
12use reifydb_type::{error::Error, value::r#type::Type};
13use serde::Serialize;
14use serde_json::{Value as JsonValue, to_string, to_value};
15
16use crate::function::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionResult, propagate_options};
17
18#[derive(Debug, Clone, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum JsonFlowNodeType {
23 SourceInlineData {},
24 SourceTable {
25 table: u64,
26 },
27 SourceView {
28 view: u64,
29 },
30 SourceFlow {
31 flow: u64,
32 },
33 SourceRingBuffer {
34 ringbuffer: u64,
35 },
36 SourceSeries {
37 series: u64,
38 },
39 Filter {
40 conditions: Vec<JsonExpression>,
41 },
42 Gate {
43 conditions: Vec<JsonExpression>,
44 },
45 Map {
46 expressions: Vec<JsonExpression>,
47 },
48 Extend {
49 expressions: Vec<JsonExpression>,
50 },
51 Join {
52 join_type: JoinType,
53 left: Vec<JsonExpression>,
54 right: Vec<JsonExpression>,
55 alias: Option<String>,
56 },
57 Aggregate {
58 by: Vec<JsonExpression>,
59 map: Vec<JsonExpression>,
60 },
61 Append,
62 Sort {
63 by: Vec<SortKey>,
64 },
65 Take {
66 limit: usize,
67 },
68 Distinct {
69 expressions: Vec<JsonExpression>,
70 },
71 Apply {
72 operator: String,
73 expressions: Vec<JsonExpression>,
74 },
75 SinkView {
76 view: u64,
77 },
78 SinkSubscription {
79 subscription: String,
80 },
81 Window {
82 kind: WindowKind,
83 group_by: Vec<JsonExpression>,
84 aggregations: Vec<JsonExpression>,
85 ts: Option<String>,
86 },
87}
88
89impl From<&FlowNodeType> for JsonFlowNodeType {
90 fn from(node_type: &FlowNodeType) -> Self {
91 match node_type {
92 FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
93 FlowNodeType::SourceTable {
94 table,
95 } => JsonFlowNodeType::SourceTable {
96 table: table.0,
97 },
98 FlowNodeType::SourceView {
99 view,
100 } => JsonFlowNodeType::SourceView {
101 view: view.0,
102 },
103 FlowNodeType::SourceFlow {
104 flow,
105 } => JsonFlowNodeType::SourceFlow {
106 flow: flow.0,
107 },
108 FlowNodeType::SourceRingBuffer {
109 ringbuffer,
110 } => JsonFlowNodeType::SourceRingBuffer {
111 ringbuffer: ringbuffer.0,
112 },
113 FlowNodeType::SourceSeries {
114 series,
115 } => JsonFlowNodeType::SourceSeries {
116 series: series.0,
117 },
118 FlowNodeType::Filter {
119 conditions,
120 } => JsonFlowNodeType::Filter {
121 conditions: conditions.iter().map(|e| e.into()).collect(),
122 },
123 FlowNodeType::Gate {
124 conditions,
125 } => JsonFlowNodeType::Gate {
126 conditions: conditions.iter().map(|e| e.into()).collect(),
127 },
128 FlowNodeType::Map {
129 expressions,
130 } => JsonFlowNodeType::Map {
131 expressions: expressions.iter().map(|e| e.into()).collect(),
132 },
133 FlowNodeType::Extend {
134 expressions,
135 } => JsonFlowNodeType::Extend {
136 expressions: expressions.iter().map(|e| e.into()).collect(),
137 },
138 FlowNodeType::Join {
139 join_type,
140 left,
141 right,
142 alias,
143 } => JsonFlowNodeType::Join {
144 join_type: *join_type,
145 left: left.iter().map(|e| e.into()).collect(),
146 right: right.iter().map(|e| e.into()).collect(),
147 alias: alias.clone(),
148 },
149 FlowNodeType::Aggregate {
150 by,
151 map,
152 } => JsonFlowNodeType::Aggregate {
153 by: by.iter().map(|e| e.into()).collect(),
154 map: map.iter().map(|e| e.into()).collect(),
155 },
156 FlowNodeType::Append => JsonFlowNodeType::Append,
157 FlowNodeType::Sort {
158 by,
159 } => JsonFlowNodeType::Sort {
160 by: by.clone(),
161 },
162 FlowNodeType::Take {
163 limit,
164 } => JsonFlowNodeType::Take {
165 limit: *limit,
166 },
167 FlowNodeType::Distinct {
168 expressions,
169 } => JsonFlowNodeType::Distinct {
170 expressions: expressions.iter().map(|e| e.into()).collect(),
171 },
172 FlowNodeType::Apply {
173 operator,
174 expressions,
175 } => JsonFlowNodeType::Apply {
176 operator: operator.clone(),
177 expressions: expressions.iter().map(|e| e.into()).collect(),
178 },
179 FlowNodeType::SinkTableView {
180 view,
181 ..
182 }
183 | FlowNodeType::SinkRingBufferView {
184 view,
185 ..
186 }
187 | FlowNodeType::SinkSeriesView {
188 view,
189 ..
190 } => JsonFlowNodeType::SinkView {
191 view: view.0,
192 },
193 FlowNodeType::SinkSubscription {
194 subscription,
195 } => JsonFlowNodeType::SinkSubscription {
196 subscription: subscription.0.to_string(),
197 },
198 FlowNodeType::Window {
199 kind,
200 group_by,
201 aggregations,
202 ts,
203 } => JsonFlowNodeType::Window {
204 kind: kind.clone(),
205 group_by: group_by.iter().map(|e| e.into()).collect(),
206 aggregations: aggregations.iter().map(|e| e.into()).collect(),
207 ts: ts.clone(),
208 },
209 }
210 }
211}
212
/// Scalar function that renders serialized flow node types as JSON text.
///
/// The type is a stateless unit struct; [`FlowNodeToJson::new`] and
/// [`Default::default`] both produce the same value.
#[derive(Default)]
pub struct FlowNodeToJson;

impl FlowNodeToJson {
    /// Creates a new `FlowNodeToJson` function instance.
    pub fn new() -> Self {
        Self
    }
}
220
221impl ScalarFunction for FlowNodeToJson {
222 fn scalar(&self, ctx: ScalarFunctionContext) -> ScalarFunctionResult<ColumnData> {
223 if let Some(result) = propagate_options(self, &ctx) {
224 return result;
225 }
226
227 let columns = ctx.columns;
228 let row_count = ctx.row_count;
229
230 if columns.is_empty() {
231 return Ok(ColumnData::utf8(Vec::<String>::new()));
232 }
233
234 let column = columns.get(0).unwrap();
235
236 match &column.data() {
237 ColumnData::Blob {
238 container,
239 ..
240 } => {
241 let mut result_data = Vec::with_capacity(row_count);
242
243 for i in 0..row_count {
244 if container.is_defined(i) {
245 let blob = &container[i];
246 let bytes = blob.as_bytes();
247
248 let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
250 Error(internal!("Failed to deserialize FlowNodeType: {}", e))
251 })?;
252
253 let json_node_type: JsonFlowNodeType = (&node_type).into();
255
256 let json_value = to_value(&json_node_type).map_err(|e| {
258 Error(internal!(
259 "Failed to serialize FlowNodeType to JSON: {}",
260 e
261 ))
262 })?;
263
264 let inner_value = match json_value {
266 JsonValue::Object(map) if map.len() == 1 => map
267 .into_iter()
268 .next()
269 .map(|(_, v)| v)
270 .unwrap_or(JsonValue::Null),
271 JsonValue::String(_) => {
272 JsonValue::Null
275 }
276 other => other,
277 };
278
279 let json = to_string(&inner_value).map_err(|e| {
280 Error(internal!(
281 "Failed to serialize FlowNodeType to JSON: {}",
282 e
283 ))
284 })?;
285
286 result_data.push(json);
287 } else {
288 result_data.push(String::new());
289 }
290 }
291
292 Ok(ColumnData::utf8(result_data))
293 }
294 _ => Err(Error(internal!("flow_node::to_json only supports Blob input")).into()),
295 }
296 }
297
298 fn return_type(&self, _input_types: &[Type]) -> Type {
299 Type::Utf8
300 }
301}