reifydb-routine 0.4.5

Built-in routines (functions and procedures) for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use postcard::from_bytes;
use reifydb_core::{
	common::{JoinType, WindowKind},
	internal,
	sort::SortKey,
	value::column::data::ColumnData,
};
use reifydb_rql::{expression::json::JsonExpression, flow::node::FlowNodeType};
use reifydb_type::{error::Error, value::r#type::Type};
use serde::Serialize;
use serde_json::{Value as JsonValue, to_string, to_value};

use crate::function::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionResult, propagate_options};

/// JSON-serializable version of FlowNodeType that uses JsonExpression
/// for clean expression serialization without Fragment metadata.
///
/// Values are built from a `&FlowNodeType` through the `From` implementation
/// in this file. Variant names are emitted in `snake_case` (for example
/// `SourceTable` becomes `source_table`) because of the `rename_all`
/// attribute below; field names are already written in snake case.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum JsonFlowNodeType {
	/// Inline-data source; carries no fields.
	SourceInlineData {},
	/// Table source.
	SourceTable {
		/// Inner `u64` of the source table id (`table.0` in the conversion).
		table: u64,
	},
	/// View source.
	SourceView {
		/// Inner `u64` of the source view id.
		view: u64,
	},
	/// Flow source.
	SourceFlow {
		/// Inner `u64` of the source flow id.
		flow: u64,
	},
	/// Ring-buffer source.
	SourceRingBuffer {
		/// Inner `u64` of the source ring buffer id.
		ringbuffer: u64,
	},
	/// Series source.
	SourceSeries {
		/// Inner `u64` of the source series id.
		series: u64,
	},
	/// Filter node.
	Filter {
		/// Filter conditions, each converted to a `JsonExpression`.
		conditions: Vec<JsonExpression>,
	},
	/// Gate node.
	Gate {
		/// Gate conditions, each converted to a `JsonExpression`.
		conditions: Vec<JsonExpression>,
	},
	/// Map node.
	Map {
		/// Map expressions, each converted to a `JsonExpression`.
		expressions: Vec<JsonExpression>,
	},
	/// Extend node.
	Extend {
		/// Extend expressions, each converted to a `JsonExpression`.
		expressions: Vec<JsonExpression>,
	},
	/// Join node.
	Join {
		/// Join type, copied unchanged from the source node.
		join_type: JoinType,
		/// Expressions of the `left` field of the source node.
		left: Vec<JsonExpression>,
		/// Expressions of the `right` field of the source node.
		right: Vec<JsonExpression>,
		/// Optional alias, cloned from the source node.
		alias: Option<String>,
	},
	/// Aggregate node.
	Aggregate {
		/// Expressions of the `by` field of the source node.
		by: Vec<JsonExpression>,
		/// Expressions of the `map` field of the source node.
		map: Vec<JsonExpression>,
	},
	/// Append node. As a unit variant it serializes to a bare string rather
	/// than an object; `FlowNodeToJson` turns that string into JSON `null`.
	Append,
	/// Sort node.
	Sort {
		/// Sort keys, cloned unchanged from the source node.
		by: Vec<SortKey>,
	},
	/// Take node.
	Take {
		/// Limit value, copied unchanged from the source node.
		limit: usize,
	},
	/// Distinct node.
	Distinct {
		/// Distinct expressions, each converted to a `JsonExpression`.
		expressions: Vec<JsonExpression>,
	},
	/// Apply node.
	Apply {
		/// Operator string, cloned from the source node.
		operator: String,
		/// Apply expressions, each converted to a `JsonExpression`.
		expressions: Vec<JsonExpression>,
	},
	/// View sink. The conversion maps `FlowNodeType::SinkTableView`,
	/// `SinkRingBufferView` and `SinkSeriesView` all onto this variant and
	/// keeps only the view id.
	SinkView {
		/// Inner `u64` of the target view id.
		view: u64,
	},
	/// Subscription sink.
	SinkSubscription {
		/// Inner value of the subscription id, rendered with `to_string()`.
		subscription: String,
	},
	/// Window node.
	Window {
		/// Window kind, cloned unchanged from the source node.
		kind: WindowKind,
		/// Expressions of the `group_by` field of the source node.
		group_by: Vec<JsonExpression>,
		/// Expressions of the `aggregations` field of the source node.
		aggregations: Vec<JsonExpression>,
		/// Optional `ts` string, cloned from the source node.
		ts: Option<String>,
	},
}

impl From<&FlowNodeType> for JsonFlowNodeType {
	fn from(node_type: &FlowNodeType) -> Self {
		match node_type {
			FlowNodeType::SourceInlineData {} => JsonFlowNodeType::SourceInlineData {},
			FlowNodeType::SourceTable {
				table,
			} => JsonFlowNodeType::SourceTable {
				table: table.0,
			},
			FlowNodeType::SourceView {
				view,
			} => JsonFlowNodeType::SourceView {
				view: view.0,
			},
			FlowNodeType::SourceFlow {
				flow,
			} => JsonFlowNodeType::SourceFlow {
				flow: flow.0,
			},
			FlowNodeType::SourceRingBuffer {
				ringbuffer,
			} => JsonFlowNodeType::SourceRingBuffer {
				ringbuffer: ringbuffer.0,
			},
			FlowNodeType::SourceSeries {
				series,
			} => JsonFlowNodeType::SourceSeries {
				series: series.0,
			},
			FlowNodeType::Filter {
				conditions,
			} => JsonFlowNodeType::Filter {
				conditions: conditions.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::Gate {
				conditions,
			} => JsonFlowNodeType::Gate {
				conditions: conditions.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::Map {
				expressions,
			} => JsonFlowNodeType::Map {
				expressions: expressions.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::Extend {
				expressions,
			} => JsonFlowNodeType::Extend {
				expressions: expressions.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::Join {
				join_type,
				left,
				right,
				alias,
			} => JsonFlowNodeType::Join {
				join_type: *join_type,
				left: left.iter().map(|e| e.into()).collect(),
				right: right.iter().map(|e| e.into()).collect(),
				alias: alias.clone(),
			},
			FlowNodeType::Aggregate {
				by,
				map,
			} => JsonFlowNodeType::Aggregate {
				by: by.iter().map(|e| e.into()).collect(),
				map: map.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::Append => JsonFlowNodeType::Append,
			FlowNodeType::Sort {
				by,
			} => JsonFlowNodeType::Sort {
				by: by.clone(),
			},
			FlowNodeType::Take {
				limit,
			} => JsonFlowNodeType::Take {
				limit: *limit,
			},
			FlowNodeType::Distinct {
				expressions,
			} => JsonFlowNodeType::Distinct {
				expressions: expressions.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::Apply {
				operator,
				expressions,
			} => JsonFlowNodeType::Apply {
				operator: operator.clone(),
				expressions: expressions.iter().map(|e| e.into()).collect(),
			},
			FlowNodeType::SinkTableView {
				view,
				..
			}
			| FlowNodeType::SinkRingBufferView {
				view,
				..
			}
			| FlowNodeType::SinkSeriesView {
				view,
				..
			} => JsonFlowNodeType::SinkView {
				view: view.0,
			},
			FlowNodeType::SinkSubscription {
				subscription,
			} => JsonFlowNodeType::SinkSubscription {
				subscription: subscription.0.to_string(),
			},
			FlowNodeType::Window {
				kind,
				group_by,
				aggregations,
				ts,
			} => JsonFlowNodeType::Window {
				kind: kind.clone(),
				group_by: group_by.iter().map(|e| e.into()).collect(),
				aggregations: aggregations.iter().map(|e| e.into()).collect(),
				ts: ts.clone(),
			},
		}
	}
}

/// Scalar function object that renders serialized `FlowNodeType` blobs as JSON text.
///
/// The type is a stateless unit struct, so [`FlowNodeToJson::new`] and
/// [`Default::default`] produce the same value.
#[derive(Debug, Default, Clone, Copy)]
pub struct FlowNodeToJson;

impl FlowNodeToJson {
	/// Creates the function object; equivalent to `FlowNodeToJson::default()`.
	pub fn new() -> Self {
		Self
	}
}

impl ScalarFunction for FlowNodeToJson {
	fn scalar(&self, ctx: ScalarFunctionContext) -> ScalarFunctionResult<ColumnData> {
		if let Some(result) = propagate_options(self, &ctx) {
			return result;
		}

		let columns = ctx.columns;
		let row_count = ctx.row_count;

		if columns.is_empty() {
			return Ok(ColumnData::utf8(Vec::<String>::new()));
		}

		let column = columns.get(0).unwrap();

		match &column.data() {
			ColumnData::Blob {
				container,
				..
			} => {
				let mut result_data = Vec::with_capacity(row_count);

				for i in 0..row_count {
					if container.is_defined(i) {
						let blob = &container[i];
						let bytes = blob.as_bytes();

						// Deserialize from postcard
						let node_type: FlowNodeType = from_bytes(bytes).map_err(|e| {
							Error(internal!("Failed to deserialize FlowNodeType: {}", e))
						})?;

						// Convert to JsonFlowNodeType for clean serialization
						let json_node_type: JsonFlowNodeType = (&node_type).into();

						// Serialize to JSON (untagged - extract inner value only)
						let json_value = to_value(&json_node_type).map_err(|e| {
							Error(internal!(
								"Failed to serialize FlowNodeType to JSON: {}",
								e
							))
						})?;

						// Extract the inner object from the tagged enum {"variant_name": {...}}
						let inner_value = match json_value {
							JsonValue::Object(map) if map.len() == 1 => map
								.into_iter()
								.next()
								.map(|(_, v)| v)
								.unwrap_or(JsonValue::Null),
							JsonValue::String(_) => {
								// Unit variants serialize as strings, return null for
								// untagged
								JsonValue::Null
							}
							other => other,
						};

						let json = to_string(&inner_value).map_err(|e| {
							Error(internal!(
								"Failed to serialize FlowNodeType to JSON: {}",
								e
							))
						})?;

						result_data.push(json);
					} else {
						result_data.push(String::new());
					}
				}

				Ok(ColumnData::utf8(result_data))
			}
			_ => Err(Error(internal!("flow_node::to_json only supports Blob input")).into()),
		}
	}

	fn return_type(&self, _input_types: &[Type]) -> Type {
		Type::Utf8
	}
}