use arrow_array::builder::{BooleanBuilder, Float64Builder, Int64Builder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::common::Result as DFResult;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::Stream;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use uni_common::Value;
use uni_cypher::ast::Expr;
use crate::query::df_graph::GraphExecutionContext;
use crate::query::df_graph::common::{
arrow_err, compute_plan_properties, evaluate_simple_expr, labels_data_type,
};
use crate::query::df_graph::scan::resolve_property_type;
/// Normalises a YIELD column name to its canonical search-output name.
///
/// Matching is case-insensitive and accepts a few aliases (`_vid`, `dist`,
/// `_distance`, `_score`, `_rerank_score`). Any name not in the table is treated
/// as the node column and mapped to `"node"`.
pub(crate) fn map_yield_to_canonical(yield_name: &str) -> &'static str {
    // (accepted lowercase spelling, canonical name)
    const ALIASES: &[(&str, &str)] = &[
        ("vid", "vid"),
        ("_vid", "vid"),
        ("distance", "distance"),
        ("dist", "distance"),
        ("_distance", "distance"),
        ("score", "score"),
        ("_score", "score"),
        ("vector_score", "vector_score"),
        ("fts_score", "fts_score"),
        ("raw_score", "raw_score"),
        ("rerank_score", "rerank_score"),
        ("_rerank_score", "rerank_score"),
    ];
    let lowered = yield_name.to_lowercase();
    ALIASES
        .iter()
        .find(|(spelling, _)| *spelling == lowered)
        .map_or("node", |&(_, canonical)| canonical)
}
/// Procedure names that `is_node_yield_procedure_static` reports as node-yielding.
/// Names are compared exactly (case-sensitive).
pub(crate) const NODE_YIELD_PROCEDURE_NAMES: &[&str] = &[
    "uni.vector.query",
    "uni.fts.query",
    "uni.search",
    "uni.create.vNode",
];

/// Returns `true` when `name` is exactly one of [`NODE_YIELD_PROCEDURE_NAMES`].
pub(crate) fn is_node_yield_procedure_static(name: &str) -> bool {
    NODE_YIELD_PROCEDURE_NAMES
        .iter()
        .any(|&candidate| candidate == name)
}
/// Fallback Arrow type for a canonical search column whose type is not declared
/// by the procedure signature.
///
/// `distance` is `Float64`, the score family is `Float32`, `vid` is `Int64`, and
/// anything else (including `node`) is `Utf8`.
pub(crate) fn canonical_search_type(canonical: &str) -> DataType {
    const SCORE_COLUMNS: [&str; 5] = [
        "score",
        "vector_score",
        "fts_score",
        "raw_score",
        "rerank_score",
    ];
    if canonical == "distance" {
        DataType::Float64
    } else if canonical == "vid" {
        DataType::Int64
    } else if SCORE_COLUMNS.contains(&canonical) {
        DataType::Float32
    } else {
        DataType::Utf8
    }
}
/// Appends the columns that represent a node-valued YIELD column named `output_name`.
///
/// Always pushes:
/// - `{output_name}._vid` (`UInt64`, non-null)
/// - `{output_name}` (`Utf8`, non-null)
/// - `{output_name}._labels` (`labels_data_type()`, nullable)
///
/// If `target_properties` has an entry for `output_name`, one nullable column
/// `{output_name}.{prop}` is added per listed property. Its type comes from the
/// first label in `uni_schema.properties` (in that map's iteration order) that
/// declares the property; when no label declares it, the type is
/// `resolve_property_type(prop, None)`.
/// NOTE(review): if that map is a `HashMap` and labels disagree on a property's
/// type, the chosen type is not deterministic — confirm.
fn expand_node_yield_fields(
    output_name: &str,
    target_properties: &HashMap<String, Vec<String>>,
    graph_ctx: &GraphExecutionContext,
    fields: &mut Vec<Field>,
) {
    fields.push(Field::new(
        format!("{output_name}._vid"),
        DataType::UInt64,
        false,
    ));
    fields.push(Field::new(output_name, DataType::Utf8, false));
    fields.push(Field::new(
        format!("{output_name}._labels"),
        labels_data_type(),
        true,
    ));

    let Some(props) = target_properties.get(output_name) else {
        return;
    };
    let uni_schema = graph_ctx.storage().schema_manager().schema();
    for prop_name in props {
        let resolved_type = uni_schema
            .properties
            .values()
            .find_map(|label_props| {
                label_props
                    .get(prop_name.as_str())
                    .map(|_| resolve_property_type(prop_name, Some(label_props)))
            })
            // Only compute the label-less fallback when no label declares the property.
            .unwrap_or_else(|| resolve_property_type(prop_name, None));
        fields.push(Field::new(
            format!("{output_name}.{prop_name}"),
            resolved_type,
            true,
        ));
    }
}
/// Re-creates a procedure-signature field under the output column name `col_name`.
///
/// The data type and nullability come from `sig_field`; its metadata is copied
/// only when it is non-empty.
fn field_from_signature(col_name: &str, sig_field: &Field) -> Field {
    let base = Field::new(
        col_name,
        sig_field.data_type().clone(),
        sig_field.is_nullable(),
    );
    let metadata = sig_field.metadata();
    if metadata.is_empty() {
        base
    } else {
        base.with_metadata(metadata.clone())
    }
}
/// Leaf physical-plan node (it has no children) that runs one procedure call and
/// emits the YIELDed columns.
///
/// Procedures found via the registry's `resolve_user_procedure` run as plugins;
/// otherwise the registry's `get` entry is used (see `build_schema` and
/// `execute_procedure`).
pub struct GraphProcedureCallExec {
    graph_ctx: Arc<GraphExecutionContext>,
    /// Procedure name as given in the query.
    procedure_name: String,
    /// Unevaluated argument expressions; evaluated in `execute` against `params`
    /// and `outer_values`.
    arguments: Vec<Expr>,
    /// `(yield_name, alias)` pairs; the alias, when present, names the output column.
    yield_items: Vec<(String, Option<String>)>,
    /// Query parameters available to argument evaluation.
    params: HashMap<String, Value>,
    /// Additional values available to argument evaluation — presumably bindings
    /// from an enclosing scope; confirm against the planner.
    outer_values: HashMap<String, Value>,
    /// Properties to materialise for node-valued output columns, keyed by output
    /// column name.
    target_properties: HashMap<String, Vec<String>>,
    /// Output schema, computed once by `build_schema`.
    schema: SchemaRef,
    properties: Arc<PlanProperties>,
    metrics: ExecutionPlanMetricsSet,
}
impl fmt::Debug for GraphProcedureCallExec {
    /// Shows only the procedure name and the YIELD items; the graph context,
    /// arguments, parameter maps, schema and metrics are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("GraphProcedureCallExec");
        out.field("procedure_name", &self.procedure_name);
        out.field("yield_items", &self.yield_items);
        out.finish()
    }
}
impl GraphProcedureCallExec {
    /// Creates the plan node, computing its output schema and plan properties up front.
    pub fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        procedure_name: String,
        arguments: Vec<Expr>,
        yield_items: Vec<(String, Option<String>)>,
        params: HashMap<String, Value>,
        outer_values: HashMap<String, Value>,
        target_properties: HashMap<String, Vec<String>>,
    ) -> Self {
        let schema = Self::build_schema(
            &procedure_name,
            &yield_items,
            &target_properties,
            &graph_ctx,
        );
        let properties = compute_plan_properties(schema.clone());
        Self {
            graph_ctx,
            procedure_name,
            arguments,
            yield_items,
            params,
            outer_values,
            target_properties,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Derives the output schema from the YIELD list.
    ///
    /// The resolution order matches `execute_procedure`:
    /// 1. **Plugin procedure** (`resolve_user_procedure`). If any signature yield field
    ///    carries metadata `_yield_kind = node_vid_source`, each YIELD name is mapped
    ///    with `map_yield_to_canonical`:
    ///    - `node` expands into the node columns (`expand_node_yield_fields`);
    ///    - any other canonical name uses the matching signature field, or
    ///      `canonical_search_type` when there is none.
    ///
    ///    Without that metadata, each YIELD name is matched against the signature by
    ///    name, falling back to nullable `Utf8`.
    /// 2. **Registered procedure** (`registry.get`): one nullable column per YIELD item,
    ///    typed from the declared output, `Utf8` when undeclared.
    /// 3. **Otherwise**: one nullable `Utf8` column per YIELD item (none for an empty
    ///    YIELD list).
    ///
    /// Output columns are named by the alias when present, else by the YIELD name.
    fn build_schema(
        procedure_name: &str,
        yield_items: &[(String, Option<String>)],
        target_properties: &HashMap<String, Vec<String>>,
        graph_ctx: &GraphExecutionContext,
    ) -> SchemaRef {
        let mut fields = Vec::new();
        if let Some(registry) = graph_ctx.procedure_registry()
            && let Some(entry) = registry.resolve_user_procedure(procedure_name)
        {
            let supports_node_yield = entry.signature.yields.iter().any(|f| {
                f.metadata()
                    .get("_yield_kind")
                    .is_some_and(|v| v == "node_vid_source")
            });
            for (yield_name, alias) in yield_items {
                let col_name = alias.as_ref().unwrap_or(yield_name);
                if supports_node_yield {
                    let canonical = map_yield_to_canonical(yield_name);
                    if canonical == "node" {
                        expand_node_yield_fields(
                            col_name,
                            target_properties,
                            graph_ctx,
                            &mut fields,
                        );
                        continue;
                    }
                    if let Some(sig_field) = entry
                        .signature
                        .yields
                        .iter()
                        .find(|f| f.name() == canonical)
                    {
                        fields.push(field_from_signature(col_name, sig_field));
                    } else {
                        fields.push(Field::new(col_name, canonical_search_type(canonical), true));
                    }
                    continue;
                }
                let field = entry
                    .signature
                    .yields
                    .iter()
                    .find(|f| f.name() == yield_name.as_str())
                    .map(|f| field_from_signature(col_name, f))
                    .unwrap_or_else(|| Field::new(col_name, DataType::Utf8, true));
                fields.push(field);
            }
        } else if let Some(registry) = graph_ctx.procedure_registry()
            && let Some(proc_def) = registry.get(procedure_name)
        {
            for (name, alias) in yield_items {
                let col_name = alias.as_ref().unwrap_or(name);
                let data_type = proc_def
                    .outputs
                    .iter()
                    .find(|o| o.name == *name)
                    .map(|o| procedure_value_type_to_arrow(&o.output_type))
                    .unwrap_or(DataType::Utf8);
                fields.push(Field::new(col_name, data_type, true));
            }
        } else {
            // Unknown procedure: untyped string columns. An empty YIELD list simply
            // contributes nothing, so no separate empty-list branch is needed.
            fields.extend(yield_items.iter().map(|(name, alias)| {
                Field::new(alias.as_ref().unwrap_or(name), DataType::Utf8, true)
            }));
        }
        Arc::new(Schema::new(fields))
    }
}
/// Maps an algorithm-procedure `ValueType` to the Arrow type of its output column.
///
/// Only `Bool`, `Int` and `Float` get dedicated Arrow types; `String`, `Any` and the
/// structured kinds (lists, maps, nodes, relationships, paths) all map to `Utf8`.
pub(crate) fn value_type_to_arrow(vt: &uni_algo::algo::procedures::ValueType) -> DataType {
    use uni_algo::algo::procedures::ValueType as VT;
    match vt {
        VT::Bool => DataType::Boolean,
        VT::Int => DataType::Int64,
        VT::Float => DataType::Float64,
        VT::String
        | VT::Any
        | VT::List
        | VT::Map
        | VT::Node
        | VT::Relationship
        | VT::Path => DataType::Utf8,
    }
}
/// Returns `true` for the structured `ValueType`s (`List`, `Map`, `Node`,
/// `Relationship`, `Path`) and `false` for `Int`, `Float`, `String`, `Bool` and `Any`.
pub(crate) fn is_complex_value_type(vt: &uni_algo::algo::procedures::ValueType) -> bool {
    use uni_algo::algo::procedures::ValueType as VT;
    // Every variant is listed so that adding one forces a decision here.
    match vt {
        VT::List | VT::Map | VT::Node | VT::Relationship | VT::Path => true,
        VT::Int | VT::Float | VT::String | VT::Bool | VT::Any => false,
    }
}
/// Maps a registered procedure's declared output type to an Arrow type.
///
/// `Float` and `Number` both become `Float64`; `Any` shares the `Utf8` mapping
/// with `String`.
fn procedure_value_type_to_arrow(
    vt: &crate::query::executor::procedure::ProcedureValueType,
) -> DataType {
    use crate::query::executor::procedure::ProcedureValueType as PVT;
    match vt {
        PVT::String | PVT::Any => DataType::Utf8,
        PVT::Boolean => DataType::Boolean,
        PVT::Float | PVT::Number => DataType::Float64,
        PVT::Integer => DataType::Int64,
    }
}
impl DisplayAs for GraphProcedureCallExec {
    /// One-line plan description naming the procedure; every
    /// `DisplayFormatType` renders the same text.
    fn fmt_as(&self, _format: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "GraphProcedureCallExec: procedure={}",
            self.procedure_name.as_str()
        )
    }
}
impl ExecutionPlan for GraphProcedureCallExec {
    fn name(&self) -> &str {
        "GraphProcedureCallExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.properties
    }

    /// Leaf node: there are no inputs.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Accepts only an empty child list and returns `self` unchanged.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Internal(
                "GraphProcedureCallExec has no children".to_string(),
            ));
        }
        Ok(self)
    }

    /// Evaluates every argument expression up front (stopping at the first error)
    /// and returns a stream that invokes the procedure when first polled.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let metrics = BaselineMetrics::new(&self.metrics, partition);
        let evaluated_args = self
            .arguments
            .iter()
            .map(|arg| evaluate_simple_expr(arg, &self.params, &self.outer_values))
            .collect::<Result<Vec<_>, _>>()?;
        let stream = ProcedureCallStream::new(
            Arc::clone(&self.graph_ctx),
            self.procedure_name.clone(),
            evaluated_args,
            self.yield_items.clone(),
            self.target_properties.clone(),
            Arc::clone(&self.schema),
            metrics,
        );
        Ok(Box::pin(stream))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
/// Lifecycle of a `ProcedureCallStream`.
enum ProcedureCallState {
    /// The procedure future has not been created yet; it is built on the first poll.
    Init,
    /// The boxed future running the procedure; it resolves to at most one batch.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// Terminal state: every further poll yields `None`.
    Done,
}
/// Single-shot stream that runs one procedure call and then yields either its
/// result batch, an error, or nothing.
struct ProcedureCallStream {
    graph_ctx: Arc<GraphExecutionContext>,
    procedure_name: String,
    /// Arguments already evaluated by `GraphProcedureCallExec::execute`.
    evaluated_args: Vec<Value>,
    /// `(yield_name, alias)` pairs forwarded to `execute_procedure`.
    yield_items: Vec<(String, Option<String>)>,
    target_properties: HashMap<String, Vec<String>>,
    /// Declared output schema; also returned by `RecordBatchStream::schema`.
    schema: SchemaRef,
    state: ProcedureCallState,
    metrics: BaselineMetrics,
}
impl ProcedureCallStream {
    /// Builds a stream in the `Init` state; no work happens until it is first polled.
    fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        procedure_name: String,
        evaluated_args: Vec<Value>,
        yield_items: Vec<(String, Option<String>)>,
        target_properties: HashMap<String, Vec<String>>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        let state = ProcedureCallState::Init;
        Self {
            state,
            metrics,
            schema,
            target_properties,
            yield_items,
            evaluated_args,
            procedure_name,
            graph_ctx,
        }
    }
}
impl Stream for ProcedureCallStream {
    type Item = DFResult<RecordBatch>;
    /// Drives the `Init -> Executing -> Done` state machine.
    ///
    /// The first poll builds the procedure future and immediately polls it. Later
    /// polls keep driving it. Once it resolves, its batch (if any) or error is
    /// returned and the stream is finished.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the metrics so the timer guard borrows the clone rather than `self`,
        // leaving `self` free for the mutable state updates below. The timer covers
        // this whole poll.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Move the state out (leaving `Done`) so it can be matched by value;
            // every non-terminal arm writes the next state back.
            let state = std::mem::replace(&mut self.state, ProcedureCallState::Done);
            match state {
                ProcedureCallState::Init => {
                    // Clone the inputs so the `async move` future owns everything it
                    // uses (it is boxed as a `'static + Send` future).
                    let graph_ctx = self.graph_ctx.clone();
                    let procedure_name = self.procedure_name.clone();
                    let evaluated_args = self.evaluated_args.clone();
                    let yield_items = self.yield_items.clone();
                    let target_properties = self.target_properties.clone();
                    let schema = self.schema.clone();
                    let fut = async move {
                        // Fail before running the procedure if `check_timeout` reports an error.
                        graph_ctx.check_timeout().map_err(|e| {
                            datafusion::error::DataFusionError::Execution(e.to_string())
                        })?;
                        execute_procedure(
                            &graph_ctx,
                            &procedure_name,
                            &evaluated_args,
                            &yield_items,
                            &target_properties,
                            &schema,
                        )
                        .await
                    };
                    // No return: the loop polls the new future right away.
                    self.state = ProcedureCallState::Executing(Box::pin(fut));
                }
                ProcedureCallState::Executing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = ProcedureCallState::Done;
                        // A `None` result counts as zero output rows and ends the stream.
                        self.metrics
                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
                        return Poll::Ready(batch.map(Ok));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = ProcedureCallState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the unfinished future back for the next wake-up.
                        self.state = ProcedureCallState::Executing(fut);
                        return Poll::Pending;
                    }
                },
                ProcedureCallState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
impl RecordBatchStream for ProcedureCallStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Runs the procedure, dispatching to the plugin path when the registry resolves
/// it via `resolve_user_procedure`, and to the registered (declarative) path
/// otherwise.
async fn execute_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    if let Some(registry) = graph_ctx.procedure_registry()
        && let Some(entry) = registry.resolve_user_procedure(procedure_name)
    {
        execute_plugin_procedure(
            graph_ctx,
            procedure_name,
            &entry,
            args,
            yield_items,
            target_properties,
            schema,
        )
        .await
    } else {
        execute_registered_procedure(graph_ctx, procedure_name, args, yield_items, schema).await
    }
}
/// Invokes a plugin procedure and shapes its output to the YIELD list.
///
/// Steps:
/// 1. Converts arguments with `value_to_columnar`.
/// 2. Builds a `QueryProcedureHost` from `graph_ctx` with the requested
///    properties, YIELD items and output schema, adding the writer when
///    `graph_ctx` has one.
/// 3. Invokes the plugin and concatenates every streamed batch.
///
/// No batches yields an empty batch of `schema`. The combined batch is returned
/// unchanged when its fields equal the declared schema's or when there is no YIELD
/// list. Otherwise the YIELDed columns are selected and renamed by
/// `project_plugin_output`.
///
/// # Errors
/// An `Execution` error when an argument cannot be converted, the plugin cannot be
/// invoked, its stream yields an error, or a YIELD column is missing from the
/// plugin output. Arrow errors from concatenation or batch assembly are also
/// propagated.
async fn execute_plugin_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    entry: &uni_plugin::registry::ProcedureEntry,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    target_properties: &HashMap<String, Vec<String>>,
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    use datafusion::logical_expr::ColumnarValue;
    use futures::StreamExt;

    let columnar_args = args
        .iter()
        .map(|v| {
            value_to_columnar(v).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Procedure '{procedure_name}': argument conversion failed: {e}"
                ))
            })
        })
        .collect::<DFResult<Vec<ColumnarValue>>>()?;

    let mut host =
        crate::query::executor::procedure_host::QueryProcedureHost::from_graph_ctx_with_request(
            graph_ctx,
            target_properties.clone(),
            yield_items.to_vec(),
            Some(schema.clone()),
        );
    if let Some(writer) = graph_ctx.writer() {
        host = host.with_writer(std::sync::Arc::clone(writer));
    }
    let principal = crate::current_principal();
    let ctx = uni_plugin::host::build_procedure_context(&host, principal.as_deref());
    let mut stream = entry.procedure.invoke(ctx, &columnar_args).map_err(|e| {
        datafusion::error::DataFusionError::Execution(format!("Procedure '{procedure_name}': {e}"))
    })?;

    let mut batches: Vec<RecordBatch> = Vec::new();
    while let Some(item) = stream.next().await {
        let batch = item.map_err(|e| {
            datafusion::error::DataFusionError::Execution(format!(
                "Procedure '{procedure_name}' stream error: {e}"
            ))
        })?;
        batches.push(batch);
    }

    let combined = match batches.len() {
        // The plugin produced nothing: answer with an empty batch of the declared schema.
        0 => return Ok(Some(create_empty_batch(schema.clone())?)),
        1 => batches.pop().expect("exactly one batch is present"),
        _ => {
            let plugin_schema = batches[0].schema();
            arrow::compute::concat_batches(&plugin_schema, &batches).map_err(arrow_err)?
        }
    };

    // The host already produced the declared layout, or there is nothing to project.
    if combined.schema().fields() == schema.fields() || yield_items.is_empty() {
        return Ok(Some(combined));
    }
    project_plugin_output(procedure_name, &combined, yield_items).map(Some)
}

/// Selects the YIELDed columns from a plugin batch in YIELD order, naming each
/// output column after its alias (or its YIELD name when it has no alias), so the
/// output carries the same column names the plan's schema declares.
///
/// Each column is looked up by YIELD name first, then by alias, so plugins that
/// already emit aliased columns are accepted. A batch that already consists of
/// exactly the output columns, in order, is returned as is.
fn project_plugin_output(
    procedure_name: &str,
    combined: &RecordBatch,
    yield_items: &[(String, Option<String>)],
) -> DFResult<RecordBatch> {
    let plugin_schema = combined.schema();
    let already_aligned = yield_items.len() == combined.num_columns()
        && yield_items
            .iter()
            .zip(plugin_schema.fields().iter())
            .all(|((name, alias), field)| alias.as_ref().unwrap_or(name) == field.name());
    if already_aligned {
        return Ok(combined.clone());
    }

    let mut projected_cols: Vec<ArrayRef> = Vec::with_capacity(yield_items.len());
    let mut projected_fields: Vec<Field> = Vec::with_capacity(yield_items.len());
    for (name, alias) in yield_items {
        let out_name = alias.as_ref().unwrap_or(name);
        let idx = plugin_schema
            .index_of(name)
            .or_else(|_| plugin_schema.index_of(out_name))
            .map_err(|_| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Procedure '{procedure_name}': YIELD column `{name}` not in plugin output schema"
                ))
            })?;
        projected_cols.push(Arc::clone(combined.column(idx)));
        projected_fields.push(plugin_schema.field(idx).clone().with_name(out_name.as_str()));
    }
    let projected_schema = Arc::new(Schema::new(projected_fields));
    RecordBatch::try_new(projected_schema, projected_cols).map_err(arrow_err)
}
/// Converts an evaluated argument into a scalar `ColumnarValue` for a plugin call.
///
/// `Null`, `Bool`, `Int`, `Float`, `String` and `Bytes` map to the matching
/// `ScalarValue`. Every other value is JSON-serialised (`serde_json::to_vec`) into
/// a `LargeBinary` scalar.
///
/// # Errors
/// Returns a message when JSON serialisation fails.
pub(crate) fn value_to_columnar(
    v: &Value,
) -> Result<datafusion::logical_expr::ColumnarValue, String> {
    use datafusion::logical_expr::ColumnarValue;
    use datafusion::scalar::ScalarValue as SV;
    let scalar = match v {
        Value::Null => SV::Null,
        Value::Bool(flag) => SV::Boolean(Some(*flag)),
        Value::Int(n) => SV::Int64(Some(*n)),
        Value::Float(x) => SV::Float64(Some(*x)),
        Value::String(text) => SV::Utf8(Some(text.clone())),
        Value::Bytes(raw) => SV::Binary(Some(raw.clone())),
        other => match serde_json::to_vec(other) {
            Ok(json) => SV::LargeBinary(Some(json)),
            Err(e) => return Err(format!("plugin arg encoding failed for {other:?}: {e}")),
        },
    };
    Ok(ColumnarValue::Scalar(scalar))
}
/// Builds an Arrow array of `data_type` from a sequence of optional `Value`s.
///
/// Native support:
/// - `UInt64`, `Int64`, `Float64`, `Float32` and `Boolean`;
/// - the edge struct layout accepted by `is_edge_struct_shape`.
///
/// A missing value becomes null, as does one the typed accessor
/// (`as_u64` / `as_i64` / `as_f64` / `as_bool`) returns `None` for.
///
/// Any other `data_type` falls back to a `Utf8` array (strings verbatim, other
/// values via `Display`). NOTE(review): that array does not match a non-string
/// declared type, so `RecordBatch::try_new` would reject it.
///
/// `num_rows` is used only as a capacity hint.
pub(crate) fn build_typed_column<'a>(
    values: impl Iterator<Item = Option<&'a Value>>,
    num_rows: usize,
    data_type: &DataType,
) -> ArrayRef {
    match data_type {
        DataType::UInt64 => {
            let mut builder = arrow_array::builder::UInt64Builder::with_capacity(num_rows);
            for val in values {
                match val.and_then(uni_common::Value::as_u64) {
                    Some(u) => builder.append_value(u),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        DataType::Struct(fields) if is_edge_struct_shape(fields) => {
            build_edge_struct_column(values, num_rows, fields)
        }
        DataType::Int64 => {
            let mut builder = Int64Builder::with_capacity(num_rows);
            for val in values {
                match val.and_then(|v| v.as_i64()) {
                    Some(i) => builder.append_value(i),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        DataType::Float64 => {
            let mut builder = Float64Builder::with_capacity(num_rows);
            for val in values {
                match val.and_then(|v| v.as_f64()) {
                    Some(f) => builder.append_value(f),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        // Score-style columns may be declared `Float32` (see `canonical_search_type`);
        // without this arm they would get a mismatching Utf8 array.
        DataType::Float32 => {
            let mut builder = arrow_array::builder::Float32Builder::with_capacity(num_rows);
            for val in values {
                match val.and_then(|v| v.as_f64()) {
                    // Narrowing is intentional: the column is declared as Float32.
                    Some(f) => builder.append_value(f as f32),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        DataType::Boolean => {
            let mut builder = BooleanBuilder::with_capacity(num_rows);
            for val in values {
                match val.and_then(|v| v.as_bool()) {
                    Some(b) => builder.append_value(b),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
            for val in values {
                match val {
                    Some(Value::String(s)) => builder.append_value(s),
                    Some(v) => builder.append_value(format!("{v}")),
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
    }
}
fn is_edge_struct_shape(fields: &arrow_schema::Fields) -> bool {
let names: std::collections::HashSet<&str> = fields.iter().map(|f| f.name().as_str()).collect();
names.contains("_eid")
&& names.contains("_type_name")
&& names.contains("_src")
&& names.contains("_dst")
&& names.contains("properties")
}
/// Builds an edge `StructArray` whose children follow the order of `fields`.
///
/// Each `Value::Edge` becomes a valid struct row:
/// - `_eid`, `_src`, `_dst` as `u64`;
/// - `_type_name` from the edge type;
/// - `properties` as a `Value::Map` of the edge properties, encoded with
///   `cypher_value_codec::encode`.
///
/// Any other value, or a missing one, becomes a null struct row. `num_rows` is a
/// capacity hint.
///
/// # Panics
/// Panics if `fields` contains a name other than `_eid`, `_type_name`, `_src`,
/// `_dst`, `properties`. Also panics if `StructArray::try_new` rejects the children,
/// e.g. when a declared child type differs from the array built here.
fn build_edge_struct_column<'a>(
    values: impl Iterator<Item = Option<&'a Value>>,
    num_rows: usize,
    fields: &arrow_schema::Fields,
) -> ArrayRef {
    use arrow_array::builder::{LargeBinaryBuilder, StringBuilder, UInt64Builder};
    use uni_common::Value as V;
    // Preallocate per-row slots; variable-length payload buffers grow as needed.
    let mut eid_b = UInt64Builder::with_capacity(num_rows);
    let mut type_b = StringBuilder::with_capacity(num_rows, 0);
    let mut src_b = UInt64Builder::with_capacity(num_rows);
    let mut dst_b = UInt64Builder::with_capacity(num_rows);
    let mut props_b = LargeBinaryBuilder::with_capacity(num_rows, 0);
    let mut validity: Vec<bool> = Vec::with_capacity(num_rows);
    for val in values {
        match val {
            Some(V::Edge(e)) => {
                eid_b.append_value(e.eid.as_u64());
                type_b.append_value(&e.edge_type);
                src_b.append_value(e.src.as_u64());
                dst_b.append_value(e.dst.as_u64());
                let props_value = V::Map(e.properties.clone());
                let bytes = uni_common::cypher_value_codec::encode(&props_value);
                props_b.append_value(&bytes);
                validity.push(true);
            }
            _ => {
                // Children are null exactly where the struct row is null.
                eid_b.append_null();
                type_b.append_null();
                src_b.append_null();
                dst_b.append_null();
                props_b.append_null();
                validity.push(false);
            }
        }
    }
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(eid_b.finish()),
        Arc::new(type_b.finish()),
        Arc::new(src_b.finish()),
        Arc::new(dst_b.finish()),
        Arc::new(props_b.finish()),
    ];
    // Index of each array in `arrays`, used to reorder into the caller's field order.
    let canonical: [&str; 5] = ["_eid", "_type_name", "_src", "_dst", "properties"];
    let mut ordered: Vec<ArrayRef> = Vec::with_capacity(fields.len());
    for f in fields.iter() {
        let idx = canonical
            .iter()
            .position(|n| *n == f.name().as_str())
            .expect("is_edge_struct_shape vetted these field names");
        ordered.push(arrays[idx].clone());
    }
    let nulls = arrow::buffer::NullBuffer::from(validity);
    Arc::new(
        arrow_array::StructArray::try_new(fields.clone(), ordered, Some(nulls))
            .expect("StructArray construction with vetted shape"),
    )
}
/// Builds a zero-row batch for `schema`.
///
/// A schema with columns uses `RecordBatch::new_empty`. A column-less schema is
/// built through `try_new_with_options` with an explicit row count of 0.
pub(crate) fn create_empty_batch(schema: SchemaRef) -> DFResult<RecordBatch> {
    if !schema.fields().is_empty() {
        return Ok(RecordBatch::new_empty(schema));
    }
    let zero_rows = arrow_array::RecordBatchOptions::new().with_row_count(Some(0));
    RecordBatch::try_new_with_options(schema, Vec::new(), &zero_rows).map_err(arrow_err)
}
/// Runs a registered (declarative) procedure by filtering its static `data` rows
/// with the call arguments.
///
/// Steps:
/// 1. Validates the argument count, and each non-null argument's type against the
///    declared parameter type (`check_proc_type_compatible`).
/// 2. Keeps a row when, for every parameter the row has a value for, that value
///    matches the corresponding argument (`proc_values_match`).
/// 3. Builds the YIELDed columns positionally against `schema`.
///
/// An empty YIELD list, or no matching rows, yields an empty batch of `schema`.
///
/// # Errors
/// An `Execution` error when there is no registry, the procedure is unknown, the
/// argument count or an argument type is wrong. An arrow error when the batch
/// cannot be assembled.
async fn execute_registered_procedure(
    graph_ctx: &GraphExecutionContext,
    procedure_name: &str,
    args: &[Value],
    yield_items: &[(String, Option<String>)],
    schema: &SchemaRef,
) -> DFResult<Option<RecordBatch>> {
    let registry = graph_ctx.procedure_registry().ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "Procedure '{}' not supported in DataFusion engine (no procedure registry)",
            procedure_name
        ))
    })?;
    let proc_def = registry.get(procedure_name).ok_or_else(|| {
        datafusion::error::DataFusionError::Execution(format!(
            "ProcedureNotFound: Unknown procedure '{}'",
            procedure_name
        ))
    })?;
    if args.len() != proc_def.params.len() {
        return Err(datafusion::error::DataFusionError::Execution(format!(
            "InvalidNumberOfArguments: Procedure '{}' expects {} argument(s), got {}",
            proc_def.name,
            proc_def.params.len(),
            args.len()
        )));
    }
    for (i, (arg_val, param)) in args.iter().zip(&proc_def.params).enumerate() {
        if !arg_val.is_null() && !check_proc_type_compatible(arg_val, &param.param_type) {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "InvalidArgumentType: Argument {} ('{}') of procedure '{}' has incompatible type",
                i, param.name, proc_def.name
            )));
        }
    }
    // Nothing is projected without a YIELD list, so skip filtering entirely.
    if yield_items.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }
    let filtered: Vec<&HashMap<String, Value>> = proc_def
        .data
        .iter()
        .filter(|row| {
            proc_def.params.iter().zip(args).all(|(param, arg_val)| {
                row.get(&param.name)
                    .is_none_or(|row_val| proc_values_match(row_val, arg_val))
            })
        })
        .collect();
    if filtered.is_empty() {
        return Ok(Some(create_empty_batch(schema.clone())?));
    }
    let num_rows = filtered.len();
    // Zip with the schema fields instead of indexing, so a length mismatch surfaces
    // as a `RecordBatch::try_new` error rather than a panic.
    let columns: Vec<ArrayRef> = yield_items
        .iter()
        .zip(schema.fields().iter())
        .map(|((name, _alias), field)| {
            let values = filtered.iter().map(|row| row.get(name.as_str()));
            build_typed_column(values, num_rows, field.data_type())
        })
        .collect();
    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    Ok(Some(batch))
}
/// Checks whether an argument value is acceptable for a declared parameter type.
///
/// `Float` parameters also accept integers, `Number` accepts any numeric value,
/// and `Any` accepts everything.
fn check_proc_type_compatible(
    val: &Value,
    expected: &crate::query::executor::procedure::ProcedureValueType,
) -> bool {
    use crate::query::executor::procedure::ProcedureValueType as PVT;
    match expected {
        PVT::Integer => val.is_i64(),
        PVT::Float => val.is_i64() || val.is_f64(),
        PVT::Number => val.is_number(),
        PVT::Boolean => val.is_bool(),
        PVT::String => val.is_string(),
        PVT::Any => true,
    }
}
/// Decides whether a stored row value matches a call argument.
///
/// Nulls match only nulls. When both sides convert to `f64`, they match on exact
/// equality or an absolute difference below `f64::EPSILON`. Otherwise `Value`
/// equality is used.
fn proc_values_match(row_val: &Value, arg_val: &Value) -> bool {
    if arg_val.is_null() || row_val.is_null() {
        return arg_val.is_null() && row_val.is_null();
    }
    if let (Some(a), Some(b)) = (row_val.as_f64(), arg_val.as_f64()) {
        // Check exact equality first: `inf - inf` is NaN, so the tolerance test
        // alone would reject two equal infinities.
        return a == b || (a - b).abs() < f64::EPSILON;
    }
    row_val == arg_val
}
pub(crate) fn json_to_value(jv: &serde_json::Value) -> Value {
match jv {
serde_json::Value::Null => Value::Null,
serde_json::Value::Bool(b) => Value::Bool(*b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::Int(i)
} else if let Some(f) = n.as_f64() {
Value::Float(f)
} else {
Value::Null
}
}
serde_json::Value::String(s) => Value::String(s.clone()),
other => Value::String(other.to_string()),
}
}
pub(crate) fn require_string_arg(
args: &[Value],
index: usize,
description: &str,
) -> DFResult<String> {
args.get(index)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| {
datafusion::error::DataFusionError::Execution(format!("{description} must be a string"))
})
}
/// Returns argument `index` as an owned string, or `None` when the argument is
/// absent, null, or not a string.
pub(crate) fn extract_optional_filter(args: &[Value], index: usize) -> Option<String> {
    let arg = args.get(index)?;
    if arg.is_null() {
        return None;
    }
    arg.as_str().map(|s| s.to_string())
}