1use std::time::Duration;
5
6use postcard::from_bytes;
7use reifydb_core::{
8 common::{JoinType, WindowSize, WindowSlide, WindowType},
9 internal,
10 sort::SortKey,
11 value::column::data::ColumnData,
12};
13use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
14use reifydb_type::{error::Error, value::r#type::Type};
15use serde::{Serialize, Serializer};
16use serde_json::{Value as JsonValue, to_string, to_value};
17
18use crate::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionResult, propagate_options};
19
20#[derive(Debug, Clone, Serialize)]
23#[serde(rename_all = "snake_case")]
24pub enum JsonFlowNodeType {
25 SourceInlineData {},
26 SourceTable {
27 table: u64,
28 },
29 SourceView {
30 view: u64,
31 },
32 SourceFlow {
33 flow: u64,
34 },
35 SourceRingBuffer {
36 ringbuffer: u64,
37 },
38 SourceSeries {
39 series: u64,
40 },
41 Filter {
42 conditions: Vec<JsonExpression>,
43 },
44 Gate {
45 conditions: Vec<JsonExpression>,
46 },
47 Map {
48 expressions: Vec<JsonExpression>,
49 },
50 Extend {
51 expressions: Vec<JsonExpression>,
52 },
53 Join {
54 join_type: JoinType,
55 left: Vec<JsonExpression>,
56 right: Vec<JsonExpression>,
57 alias: Option<String>,
58 },
59 Aggregate {
60 by: Vec<JsonExpression>,
61 map: Vec<JsonExpression>,
62 },
63 Append,
64 Sort {
65 by: Vec<SortKey>,
66 },
67 Take {
68 limit: usize,
69 },
70 Distinct {
71 expressions: Vec<JsonExpression>,
72 },
73 Apply {
74 operator: String,
75 expressions: Vec<JsonExpression>,
76 },
77 SinkView {
78 view: u64,
79 },
80 SinkSubscription {
81 subscription: String,
82 },
83 Window {
84 window_type: WindowType,
85 size: WindowSize,
86 slide: Option<WindowSlide>,
87 group_by: Vec<JsonExpression>,
88 aggregations: Vec<JsonExpression>,
89 min_events: usize,
90 max_window_count: Option<usize>,
91 #[serde(serialize_with = "serialize_duration_opt")]
92 max_window_age: Option<Duration>,
93 },
94}
95
96fn serialize_duration_opt<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
97where
98 S: Serializer,
99{
100 match duration {
101 Some(d) => serializer.serialize_some(&d.as_secs()),
102 None => serializer.serialize_none(),
103 }
104}
105
106impl From<&FlowNodeType> for JsonFlowNodeType {
107 fn from(node_type: &FlowNodeType) -> Self {
108 match node_type {
109 FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
110 FlowNodeType::SourceTable {
111 table,
112 } => JsonFlowNodeType::SourceTable {
113 table: table.0,
114 },
115 FlowNodeType::SourceView {
116 view,
117 } => JsonFlowNodeType::SourceView {
118 view: view.0,
119 },
120 FlowNodeType::SourceFlow {
121 flow,
122 } => JsonFlowNodeType::SourceFlow {
123 flow: flow.0,
124 },
125 FlowNodeType::SourceRingBuffer {
126 ringbuffer,
127 } => JsonFlowNodeType::SourceRingBuffer {
128 ringbuffer: ringbuffer.0,
129 },
130 FlowNodeType::SourceSeries {
131 series,
132 } => JsonFlowNodeType::SourceSeries {
133 series: series.0,
134 },
135 FlowNodeType::Filter {
136 conditions,
137 } => JsonFlowNodeType::Filter {
138 conditions: conditions.iter().map(|e| e.into()).collect(),
139 },
140 FlowNodeType::Gate {
141 conditions,
142 } => JsonFlowNodeType::Gate {
143 conditions: conditions.iter().map(|e| e.into()).collect(),
144 },
145 FlowNodeType::Map {
146 expressions,
147 } => JsonFlowNodeType::Map {
148 expressions: expressions.iter().map(|e| e.into()).collect(),
149 },
150 FlowNodeType::Extend {
151 expressions,
152 } => JsonFlowNodeType::Extend {
153 expressions: expressions.iter().map(|e| e.into()).collect(),
154 },
155 FlowNodeType::Join {
156 join_type,
157 left,
158 right,
159 alias,
160 } => JsonFlowNodeType::Join {
161 join_type: *join_type,
162 left: left.iter().map(|e| e.into()).collect(),
163 right: right.iter().map(|e| e.into()).collect(),
164 alias: alias.clone(),
165 },
166 FlowNodeType::Aggregate {
167 by,
168 map,
169 } => JsonFlowNodeType::Aggregate {
170 by: by.iter().map(|e| e.into()).collect(),
171 map: map.iter().map(|e| e.into()).collect(),
172 },
173 FlowNodeType::Append => JsonFlowNodeType::Append,
174 FlowNodeType::Sort {
175 by,
176 } => JsonFlowNodeType::Sort {
177 by: by.clone(),
178 },
179 FlowNodeType::Take {
180 limit,
181 } => JsonFlowNodeType::Take {
182 limit: *limit,
183 },
184 FlowNodeType::Distinct {
185 expressions,
186 } => JsonFlowNodeType::Distinct {
187 expressions: expressions.iter().map(|e| e.into()).collect(),
188 },
189 FlowNodeType::Apply {
190 operator,
191 expressions,
192 } => JsonFlowNodeType::Apply {
193 operator: operator.clone(),
194 expressions: expressions.iter().map(|e| e.into()).collect(),
195 },
196 FlowNodeType::SinkView {
197 view,
198 } => JsonFlowNodeType::SinkView {
199 view: view.0,
200 },
201 FlowNodeType::SinkSubscription {
202 subscription,
203 } => JsonFlowNodeType::SinkSubscription {
204 subscription: subscription.0.to_string(),
205 },
206 FlowNodeType::Window {
207 window_type,
208 size,
209 slide,
210 group_by,
211 aggregations,
212 min_events,
213 max_window_count,
214 max_window_age,
215 } => JsonFlowNodeType::Window {
216 window_type: window_type.clone(),
217 size: size.clone(),
218 slide: slide.clone(),
219 group_by: group_by.iter().map(|e| e.into()).collect(),
220 aggregations: aggregations.iter().map(|e| e.into()).collect(),
221 min_events: *min_events,
222 max_window_count: *max_window_count,
223 max_window_age: *max_window_age,
224 },
225 }
226 }
227}
228
229pub struct FlowNodeToJson;
230
231impl FlowNodeToJson {
232 pub fn new() -> Self {
233 Self
234 }
235}
236
237impl ScalarFunction for FlowNodeToJson {
238 fn scalar(&self, ctx: ScalarFunctionContext) -> ScalarFunctionResult<ColumnData> {
239 if let Some(result) = propagate_options(self, &ctx) {
240 return result;
241 }
242
243 let columns = ctx.columns;
244 let row_count = ctx.row_count;
245
246 if columns.is_empty() {
247 return Ok(ColumnData::utf8(Vec::<String>::new()));
248 }
249
250 let column = columns.get(0).unwrap();
251
252 match &column.data() {
253 ColumnData::Blob {
254 container,
255 ..
256 } => {
257 let mut result_data = Vec::with_capacity(row_count);
258
259 for i in 0..row_count {
260 if container.is_defined(i) {
261 let blob = &container[i];
262 let bytes = blob.as_bytes();
263
264 let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
266 Error(internal!("Failed to deserialize FlowNodeType: {}", e))
267 })?;
268
269 let json_node_type: JsonFlowNodeType = (&node_type).into();
271
272 let json_value = to_value(&json_node_type).map_err(|e| {
274 Error(internal!(
275 "Failed to serialize FlowNodeType to JSON: {}",
276 e
277 ))
278 })?;
279
280 let inner_value = match json_value {
282 JsonValue::Object(map) if map.len() == 1 => map
283 .into_iter()
284 .next()
285 .map(|(_, v)| v)
286 .unwrap_or(JsonValue::Null),
287 JsonValue::String(_) => {
288 JsonValue::Null
291 }
292 other => other,
293 };
294
295 let json = to_string(&inner_value).map_err(|e| {
296 Error(internal!(
297 "Failed to serialize FlowNodeType to JSON: {}",
298 e
299 ))
300 })?;
301
302 result_data.push(json);
303 } else {
304 result_data.push(String::new());
305 }
306 }
307
308 Ok(ColumnData::utf8(result_data))
309 }
310 _ => Err(Error(internal!("flow_node::to_json only supports Blob input")).into()),
311 }
312 }
313
314 fn return_type(&self, _input_types: &[Type]) -> Type {
315 Type::Utf8
316 }
317}