use std::sync::Arc;
use arrow_schema::{DataType, SchemaRef};
use vgi_rpc::Result;
pub use crate::protocol::dtos::FunctionExample;
use crate::protocol::enums;
/// A named predicate over Arrow data types.
///
/// Attached to an [`ArgSpec`] via [`ArgSpec::with_bound`] and checked by
/// [`validate_type_bounds`].
#[derive(Clone, Copy)]
pub struct TypeBound {
    /// Identifier of the bound; it appears in the error message produced by
    /// `validate_type_bounds` when the predicate rejects a type.
    pub name: &'static str,
    /// Returns `true` when the given data type satisfies the bound.
    pub pred: fn(&DataType) -> bool,
}
/// Debug output is `TypeBound(<name>)`; the predicate function pointer is not printed.
impl std::fmt::Debug for TypeBound {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.name;
        write!(f, "TypeBound({name})")
    }
}
/// Bound satisfied by integer, floating-point, decimal and temporal types.
pub const ADDABLE: TypeBound = TypeBound {
    pred: is_addable,
    name: "_is_addable_type",
};
/// Bound satisfied by integer, floating-point and decimal types (no temporal types).
pub const MULTIPLIABLE: TypeBound = TypeBound {
    pred: is_multipliable,
    name: "_is_multipliable_type",
};
fn is_integer(t: &DataType) -> bool {
use DataType::*;
matches!(
t,
Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
)
}
fn is_floating(t: &DataType) -> bool {
matches!(t, DataType::Float16 | DataType::Float32 | DataType::Float64)
}
/// `true` for 128- and 256-bit decimal types, regardless of precision and scale.
fn is_decimal(t: &DataType) -> bool {
    matches!(t, DataType::Decimal128(..) | DataType::Decimal256(..))
}
fn is_temporal(t: &DataType) -> bool {
use DataType::*;
matches!(
t,
Date32 | Date64 | Time32(_) | Time64(_) | Timestamp(_, _) | Duration(_) | Interval(_)
)
}
fn is_addable(t: &DataType) -> bool {
is_integer(t) || is_floating(t) || is_decimal(t) || is_temporal(t)
}
fn is_multipliable(t: &DataType) -> bool {
is_integer(t) || is_floating(t) || is_decimal(t)
}
/// Declaration of a single function argument.
#[derive(Debug, Clone)]
pub struct ArgSpec {
    /// Argument name; included in type-bound validation errors.
    pub name: String,
    /// Argument position. `validate_type_bounds` uses it as an index into the input
    /// schema's fields and skips negative values.
    pub position: i32,
    /// Arrow type as a string. `any_column` sets `"any"`; the `*_typed` constructors
    /// leave it empty and fill `arrow_data_type` instead.
    pub arrow_type: String,
    /// Human-readable description of the argument.
    pub doc: String,
    /// Set by `const_arg`, `const_typed` and `as_const`.
    /// NOTE(review): presumably marks a constant (non-column) argument — confirm.
    pub is_const: bool,
    /// Set by `varargs`.
    /// NOTE(review): presumably marks a variadic trailing argument — confirm.
    pub is_varargs: bool,
    /// Concrete Arrow type, set by `column_typed` / `const_typed`.
    pub arrow_data_type: Option<DataType>,
    /// Optional constraint on the argument's type, checked by `validate_type_bounds`.
    pub type_bound: Option<TypeBound>,
}
impl ArgSpec {
fn base(name: &str, position: i32, arrow_type: &str, doc: &str) -> Self {
ArgSpec {
name: name.to_string(),
position,
arrow_type: arrow_type.to_string(),
doc: doc.to_string(),
is_const: false,
is_varargs: false,
arrow_data_type: None,
type_bound: None,
}
}
pub fn any_column(name: &str, position: i32, doc: &str) -> Self {
Self::base(name, position, "any", doc)
}
pub fn column(name: &str, position: i32, arrow_type: &str, doc: &str) -> Self {
Self::base(name, position, arrow_type, doc)
}
pub fn column_typed(name: &str, position: i32, ty: DataType, doc: &str) -> Self {
let mut s = Self::base(name, position, "", doc);
s.arrow_data_type = Some(ty);
s
}
pub fn const_arg(name: &str, position: i32, arrow_type: &str, doc: &str) -> Self {
let mut s = Self::base(name, position, arrow_type, doc);
s.is_const = true;
s
}
pub fn const_typed(name: &str, position: i32, ty: DataType, doc: &str) -> Self {
let mut s = Self::base(name, position, "", doc);
s.is_const = true;
s.arrow_data_type = Some(ty);
s
}
pub fn varargs(mut self) -> Self {
self.is_varargs = true;
self
}
pub fn as_const(mut self) -> Self {
self.is_const = true;
self
}
pub fn with_bound(mut self, bound: TypeBound) -> Self {
self.type_bound = Some(bound);
self
}
}
/// Checks every argument that carries a [`TypeBound`] against the input schema.
///
/// A spec at position `p` is matched with field `p` of `input_schema`. A varargs spec
/// is matched with every field from `p` to the end of the schema, so each variadic
/// argument is checked, not just the first. The following are skipped: the whole
/// check when `input_schema` is `None`, specs without a bound, specs with a negative
/// position, and specs whose position lies past the last field.
///
/// # Errors
/// Returns a value error naming the bound, the argument and the offending type for
/// the first field whose type does not satisfy its bound.
pub fn validate_type_bounds(specs: &[ArgSpec], input_schema: Option<&SchemaRef>) -> Result<()> {
    let Some(schema) = input_schema else {
        return Ok(());
    };
    let fields = schema.fields();
    for spec in specs {
        let Some(bound) = spec.type_bound else {
            continue;
        };
        // A negative position does not refer to a schema field.
        if spec.position < 0 {
            continue;
        }
        // Non-negative, so the cast cannot wrap.
        let start = spec.position as usize;
        // A varargs spec owns every remaining field; a regular spec owns exactly one.
        let count = if spec.is_varargs { fields.len() } else { 1 };
        let offending = fields
            .iter()
            .skip(start)
            .take(count)
            .find(|field| !(bound.pred)(field.data_type()));
        if let Some(field) = offending {
            return Err(vgi_rpc::RpcError::value_error(format!(
                "{}: argument {} of type {} does not satisfy {}",
                bound.name,
                spec.name,
                field.data_type(),
                bound.name
            )));
        }
    }
    Ok(())
}
/// Descriptive and capability metadata for a function.
///
/// The `Default` impl uses `CONSISTENT` stability, leaves every optional hint unset
/// and every flag `false`.
#[derive(Debug, Clone)]
pub struct FunctionMetadata {
    /// Free-text description.
    pub description: String,
    /// Stability classification (values from `enums::stability`).
    pub stability: Option<String>,
    /// NOTE(review): null-handling mode string — allowed values not defined here.
    pub null_handling: Option<String>,
    /// Category labels.
    pub categories: Vec<String>,
    /// Usage examples.
    pub examples: Vec<FunctionExample>,
    /// When set, `ScalarFunction::on_bind`'s default uses it as the result type.
    pub return_type: Option<DataType>,
    // NOTE(review): the flags and hint strings below appear to be capabilities
    // advertised to the host; their exact semantics are not defined in this file.
    pub projection_pushdown: bool,
    pub filter_pushdown: bool,
    pub sampling_pushdown: bool,
    pub auto_apply_filters: bool,
    pub supports_batch_index: bool,
    pub partition_kind: Option<String>,
    pub order_preservation: Option<String>,
    pub sink_order_dependent: bool,
    pub source_order_dependent: bool,
    pub requires_input_batch_index: bool,
    pub supports_window: bool,
    pub streaming_partitioned: bool,
    pub late_materialization: bool,
    /// Names of settings the function requires (presumably; confirm with caller).
    pub required_settings: Vec<String>,
}
impl Default for FunctionMetadata {
    /// Empty description and collections, `CONSISTENT` stability, every optional hint
    /// unset, and every boolean flag `false`.
    fn default() -> Self {
        let stability = Some(enums::stability::CONSISTENT.to_string());
        Self {
            description: String::default(),
            stability,
            null_handling: None,
            categories: vec![],
            examples: vec![],
            return_type: None,
            // Boolean capability flags: all off by default.
            projection_pushdown: false,
            filter_pushdown: false,
            sampling_pushdown: false,
            auto_apply_filters: false,
            supports_batch_index: false,
            partition_kind: None,
            order_preservation: None,
            sink_order_dependent: false,
            source_order_dependent: false,
            requires_input_batch_index: false,
            supports_window: false,
            streaming_partitioned: false,
            late_materialization: false,
            required_settings: vec![],
        }
    }
}
/// Inputs available to a function at bind time.
#[derive(Clone, Default)]
pub struct BindParams {
    /// Schema of the input columns, if any; `ScalarFunction::on_bind`'s default falls
    /// back to its first field's type when no return type is declared.
    pub input_schema: Option<SchemaRef>,
    /// Bound argument values.
    pub arguments: crate::arguments::Arguments,
    /// Session/function settings.
    pub settings: crate::settings::Settings,
    /// Secrets supplied for this bind.
    pub secrets: crate::secrets::Secrets,
    /// NOTE(review): presumably set once the secrets requested via `secret_lookups`
    /// have been resolved — confirm with the caller.
    pub resolved_secrets_provided: bool,
    /// Authenticated principal, if known.
    pub auth_principal: Option<String>,
    /// NOTE(review): opaque attach-/transaction-level payloads; format not defined here.
    pub attach_opaque_data: Option<Vec<u8>>,
    pub transaction_opaque_data: Option<Vec<u8>>,
    /// Optional shared storage handle.
    pub storage: Option<crate::storage::SharedStorage>,
}
/// Result of binding a function.
#[derive(Clone)]
pub struct BindResponse {
    /// Schema of the function's output.
    pub output_schema: SchemaRef,
    /// Opaque bytes produced at bind time.
    /// NOTE(review): presumably handed back to later phases — confirm.
    pub opaque_data: Vec<u8>,
}
impl BindResponse {
    /// Builds a response whose output schema is a single nullable column named
    /// `result` of type `ty`, with no opaque data.
    pub fn result(ty: DataType) -> Self {
        let column = arrow_schema::Field::new("result", ty, true);
        let schema = arrow_schema::Schema::new(vec![column]);
        Self {
            output_schema: Arc::new(schema),
            opaque_data: Vec::new(),
        }
    }
}
/// Inputs available to a function when processing batches.
#[derive(Clone)]
pub struct ProcessParams {
    /// Output schema (presumably the one returned from bind — confirm).
    pub output_schema: SchemaRef,
    /// Schema of the input columns, if any.
    pub input_schema: Option<SchemaRef>,
    /// NOTE(review): opaque execution identifier; format not defined here.
    pub execution_id: Vec<u8>,
    /// NOTE(review): opaque initialization payload; format not defined here.
    pub init_opaque_data: Vec<u8>,
    /// Bound argument values.
    pub arguments: crate::arguments::Arguments,
    /// Session/function settings.
    pub settings: crate::settings::Settings,
    /// Secrets available during processing.
    pub secrets: crate::secrets::Secrets,
    /// Authenticated principal, if known.
    pub auth_principal: Option<String>,
    // NOTE(review): the pushdown, ordering, sampling and time-travel (`at_*`) fields
    // below appear to carry planner hints; their encodings are not defined here.
    pub projection_ids: Option<Vec<i64>>,
    pub pushdown_filters: Option<Vec<u8>>,
    pub join_keys: Vec<Vec<u8>>,
    /// Optional shared storage handle.
    pub storage: Option<crate::storage::SharedStorage>,
    pub order_by_column: Option<String>,
    pub order_by_direction: Option<String>,
    pub order_by_null_order: Option<String>,
    pub order_by_limit: Option<i64>,
    pub tablesample_percentage: Option<f64>,
    pub tablesample_seed: Option<i64>,
    pub attach_opaque_data: Option<Vec<u8>>,
    pub at_unit: Option<String>,
    pub at_value: Option<String>,
}
/// A scalar function implementation.
///
/// An implementation declares its name, metadata and argument specs, resolves an
/// output schema at bind time, and transforms record batches.
pub trait ScalarFunction: Send + Sync {
    /// Function name.
    fn name(&self) -> &str;

    /// Descriptive and capability metadata; see [`FunctionMetadata`].
    fn metadata(&self) -> FunctionMetadata;

    /// Argument declarations, which may carry [`TypeBound`]s
    /// (see [`validate_type_bounds`]).
    fn argument_specs(&self) -> Vec<ArgSpec>;

    /// Secrets to look up for the given bind request. The default requests none.
    fn secret_lookups(&self, _params: &BindParams) -> Vec<crate::secrets::SecretLookup> {
        vec![]
    }

    /// Resolves the output schema as a single nullable `result` column.
    ///
    /// The column type is `metadata().return_type` when set. Otherwise it is the type
    /// of the first input field, and `Int64` if there is no input schema or it has no
    /// fields.
    fn on_bind(&self, params: &BindParams) -> Result<BindResponse> {
        let declared = self.metadata().return_type;
        let ty = declared.unwrap_or_else(|| {
            params
                .input_schema
                .as_ref()
                .and_then(|schema| schema.fields().first())
                .map(|field| field.data_type().clone())
                .unwrap_or(DataType::Int64)
        });
        Ok(BindResponse::result(ty))
    }

    /// Processes one input batch and returns the corresponding output batch.
    fn process(
        &self,
        params: &ProcessParams,
        batch: &arrow_array::RecordBatch,
    ) -> Result<arrow_array::RecordBatch>;
}