1use std::time::Duration;
5
6use reifydb_core::{
7 common::{JoinType, WindowSize, WindowSlide, WindowType},
8 internal,
9 sort::SortKey,
10 value::column::data::ColumnData,
11};
12use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
13use reifydb_type::value::r#type::Type;
14use serde::Serialize;
15
16use crate::{ScalarFunction, ScalarFunctionContext, propagate_options};
17
18#[derive(Debug, Clone, Serialize)]
21#[serde(rename_all = "snake_case")]
22pub enum JsonFlowNodeType {
23 SourceInlineData {},
24 SourceTable {
25 table: u64,
26 },
27 SourceView {
28 view: u64,
29 },
30 SourceFlow {
31 flow: u64,
32 },
33 Filter {
34 conditions: Vec<JsonExpression>,
35 },
36 Map {
37 expressions: Vec<JsonExpression>,
38 },
39 Extend {
40 expressions: Vec<JsonExpression>,
41 },
42 Join {
43 join_type: JoinType,
44 left: Vec<JsonExpression>,
45 right: Vec<JsonExpression>,
46 alias: Option<String>,
47 },
48 Aggregate {
49 by: Vec<JsonExpression>,
50 map: Vec<JsonExpression>,
51 },
52 Append,
53 Sort {
54 by: Vec<SortKey>,
55 },
56 Take {
57 limit: usize,
58 },
59 Distinct {
60 expressions: Vec<JsonExpression>,
61 },
62 Apply {
63 operator: String,
64 expressions: Vec<JsonExpression>,
65 },
66 SinkView {
67 view: u64,
68 },
69 SinkSubscription {
70 subscription: String,
71 },
72 Window {
73 window_type: WindowType,
74 size: WindowSize,
75 slide: Option<WindowSlide>,
76 group_by: Vec<JsonExpression>,
77 aggregations: Vec<JsonExpression>,
78 min_events: usize,
79 max_window_count: Option<usize>,
80 #[serde(serialize_with = "serialize_duration_opt")]
81 max_window_age: Option<Duration>,
82 },
83}
84
85fn serialize_duration_opt<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
86where
87 S: serde::Serializer,
88{
89 match duration {
90 Some(d) => serializer.serialize_some(&d.as_secs()),
91 None => serializer.serialize_none(),
92 }
93}
94
95impl From<&FlowNodeType> for JsonFlowNodeType {
96 fn from(node_type: &FlowNodeType) -> Self {
97 match node_type {
98 FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
99 FlowNodeType::SourceTable {
100 table,
101 } => JsonFlowNodeType::SourceTable {
102 table: table.0,
103 },
104 FlowNodeType::SourceView {
105 view,
106 } => JsonFlowNodeType::SourceView {
107 view: view.0,
108 },
109 FlowNodeType::SourceFlow {
110 flow,
111 } => JsonFlowNodeType::SourceFlow {
112 flow: flow.0,
113 },
114 FlowNodeType::Filter {
115 conditions,
116 } => JsonFlowNodeType::Filter {
117 conditions: conditions.iter().map(|e| e.into()).collect(),
118 },
119 FlowNodeType::Map {
120 expressions,
121 } => JsonFlowNodeType::Map {
122 expressions: expressions.iter().map(|e| e.into()).collect(),
123 },
124 FlowNodeType::Extend {
125 expressions,
126 } => JsonFlowNodeType::Extend {
127 expressions: expressions.iter().map(|e| e.into()).collect(),
128 },
129 FlowNodeType::Join {
130 join_type,
131 left,
132 right,
133 alias,
134 } => JsonFlowNodeType::Join {
135 join_type: *join_type,
136 left: left.iter().map(|e| e.into()).collect(),
137 right: right.iter().map(|e| e.into()).collect(),
138 alias: alias.clone(),
139 },
140 FlowNodeType::Aggregate {
141 by,
142 map,
143 } => JsonFlowNodeType::Aggregate {
144 by: by.iter().map(|e| e.into()).collect(),
145 map: map.iter().map(|e| e.into()).collect(),
146 },
147 FlowNodeType::Append => JsonFlowNodeType::Append,
148 FlowNodeType::Sort {
149 by,
150 } => JsonFlowNodeType::Sort {
151 by: by.clone(),
152 },
153 FlowNodeType::Take {
154 limit,
155 } => JsonFlowNodeType::Take {
156 limit: *limit,
157 },
158 FlowNodeType::Distinct {
159 expressions,
160 } => JsonFlowNodeType::Distinct {
161 expressions: expressions.iter().map(|e| e.into()).collect(),
162 },
163 FlowNodeType::Apply {
164 operator,
165 expressions,
166 } => JsonFlowNodeType::Apply {
167 operator: operator.clone(),
168 expressions: expressions.iter().map(|e| e.into()).collect(),
169 },
170 FlowNodeType::SinkView {
171 view,
172 } => JsonFlowNodeType::SinkView {
173 view: view.0,
174 },
175 FlowNodeType::SinkSubscription {
176 subscription,
177 } => JsonFlowNodeType::SinkSubscription {
178 subscription: subscription.0.to_string(),
179 },
180 FlowNodeType::Window {
181 window_type,
182 size,
183 slide,
184 group_by,
185 aggregations,
186 min_events,
187 max_window_count,
188 max_window_age,
189 } => JsonFlowNodeType::Window {
190 window_type: window_type.clone(),
191 size: size.clone(),
192 slide: slide.clone(),
193 group_by: group_by.iter().map(|e| e.into()).collect(),
194 aggregations: aggregations.iter().map(|e| e.into()).collect(),
195 min_events: *min_events,
196 max_window_count: *max_window_count,
197 max_window_age: *max_window_age,
198 },
199 }
200 }
201}
202
203pub struct FlowNodeToJson;
204
205impl FlowNodeToJson {
206 pub fn new() -> Self {
207 Self
208 }
209}
210
211impl ScalarFunction for FlowNodeToJson {
212 fn scalar(&self, ctx: ScalarFunctionContext) -> crate::error::ScalarFunctionResult<ColumnData> {
213 if let Some(result) = propagate_options(self, &ctx) {
214 return result;
215 }
216
217 let columns = ctx.columns;
218 let row_count = ctx.row_count;
219
220 if columns.is_empty() {
221 return Ok(ColumnData::utf8(Vec::<String>::new()));
222 }
223
224 let column = columns.get(0).unwrap();
225
226 match &column.data() {
227 ColumnData::Blob {
228 container,
229 ..
230 } => {
231 let mut result_data = Vec::with_capacity(row_count);
232
233 for i in 0..row_count {
234 if container.is_defined(i) {
235 let blob = &container[i];
236 let bytes = blob.as_bytes();
237
238 let node_type: FlowNodeType =
240 postcard::from_bytes(bytes).map_err(|e| {
241 reifydb_type::error::Error(internal!(
242 "Failed to deserialize FlowNodeType: {}",
243 e
244 ))
245 })?;
246
247 let json_node_type: JsonFlowNodeType = (&node_type).into();
249
250 let json_value =
252 serde_json::to_value(&json_node_type).map_err(|e| {
253 reifydb_type::error::Error(internal!(
254 "Failed to serialize FlowNodeType to JSON: {}",
255 e
256 ))
257 })?;
258
259 let inner_value = match json_value {
261 serde_json::Value::Object(map) if map.len() == 1 => map
262 .into_iter()
263 .next()
264 .map(|(_, v)| v)
265 .unwrap_or(serde_json::Value::Null),
266 serde_json::Value::String(_) => {
267 serde_json::Value::Null
270 }
271 other => other,
272 };
273
274 let json = serde_json::to_string(&inner_value).map_err(|e| {
275 reifydb_type::error::Error(internal!(
276 "Failed to serialize FlowNodeType to JSON: {}",
277 e
278 ))
279 })?;
280
281 result_data.push(json);
282 } else {
283 result_data.push(String::new());
284 }
285 }
286
287 Ok(ColumnData::utf8(result_data))
288 }
289 _ => Err(reifydb_type::error::Error(internal!("flow_node::to_json only supports Blob input"))
290 .into()),
291 }
292 }
293
294 fn return_type(&self, _input_types: &[Type]) -> Type {
295 Type::Utf8
296 }
297}