use crate::arg::{Arg, ArgType, NamedArg};
use anyhow::Context;
use arg::Args;
use arrow::array::RecordBatch;
use arrow::ffi_stream::FFI_ArrowArrayStream;
use arrow_utils::{DynamicArrowArrayStreamReader, VecRecordBatchReader};
use derive_builder::Builder;
use serde::Serialize;
use std::borrow::Cow;
use std::ffi::c_char;
use std::ptr::null_mut;
use std::sync::Arc;
pub mod arg;
mod arrow_utils;
pub unsafe fn create_raw(
registry: &FunctionRegistry,
arguments: *const i8,
named_arguments: *const i8,
timezone: *const i8,
) -> anyhow::Result<Box<dyn TableFunction>> {
let arguments = if arguments.is_null() {
None
} else {
unsafe {
Some(std::str::from_utf8_unchecked(
std::ffi::CStr::from_ptr(arguments as *const c_char).to_bytes(),
))
}
};
let named_arguments = if named_arguments.is_null() {
None
} else {
unsafe {
Some(std::str::from_utf8_unchecked(
std::ffi::CStr::from_ptr(named_arguments as *const c_char).to_bytes(),
))
}
};
let timezone = unsafe {
std::str::from_utf8_unchecked(
std::ffi::CStr::from_ptr(timezone as *const c_char).to_bytes(),
)
};
create(registry, arguments, named_arguments, timezone)
}
pub fn create(
registry: &FunctionRegistry,
arguments: Option<&str>,
named_arguments: Option<&str>,
timezone: &str,
) -> anyhow::Result<Box<dyn TableFunction>> {
let create_closure = &(registry.init);
let arguments = if let Some(arg) = arguments {
serde_json::from_str(arg).context("Failed to parse arguments from JSON")?
} else {
None
};
let named_arguments: Vec<NamedArg> = if let Some(arg) = named_arguments {
serde_json::from_str(arg).context("Failed to parse named arguments from JSON")?
} else {
vec![]
};
let ctx = FunctionContext {
arguments,
named_arguments: named_arguments
.into_iter()
.map(|named| (named.name, named.arg))
.collect(),
timezone: String::from(timezone),
};
create_closure(ctx)
}
/// Shared factory closure that turns a [`FunctionContext`] into a boxed [`TableFunction`].
type TableFunctionInitialize =
Arc<dyn Fn(FunctionContext) -> anyhow::Result<Box<dyn TableFunction>>>;
/// Describes a table function: its name, its factory closure, and its optional signatures.
///
/// Instances are assembled through the derived `FunctionRegistryBuilder`.
#[derive(Builder)]
pub struct FunctionRegistry {
/// Name of the function, returned by [`FunctionRegistry::name`].
#[builder(setter(into))]
name: &'static str,
/// Factory closure invoked by [`create`] to instantiate the function.
init: TableFunctionInitialize,
/// Optional list of signatures; the builder's `signature` setter adds one entry per call.
#[builder(setter(strip_option, each(name = "signature", into)))]
signatures: Option<Vec<Signature>>,
}
/// Manual `Debug` implementation: the factory closure is shown by its pointer,
/// the remaining fields use their own `Debug` output.
impl std::fmt::Debug for FunctionRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Print the address of the shared closure in place of the closure itself.
        let init_ptr = Arc::as_ptr(&self.init);

        let mut out = f.debug_struct("FunctionRegistry");
        out.field("name", &self.name);
        out.field("init", &init_ptr);
        out.field("signatures", &self.signatures);
        out.finish()
    }
}
impl FunctionRegistry {
    /// Returns the function's name.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Serializes the optional signature list to a JSON string.
    ///
    /// # Errors
    ///
    /// Fails when JSON serialization of the signatures fails.
    pub fn signatures(&self) -> anyhow::Result<String> {
        let encoded = serde_json::to_string(&self.signatures);
        encoded.context("Failed to get signatures")
    }

    /// Returns a builder in its default state.
    pub fn builder() -> FunctionRegistryBuilder {
        FunctionRegistryBuilder::default()
    }
}
/// One call signature of a table function, expressed as a list of parameters.
#[derive(Clone, Debug, Serialize, Builder)]
pub struct Signature {
/// Parameters of this signature; the builder's `parameter` setter adds one entry per call.
#[builder(setter(each(name = "parameter", into)))]
pub(crate) parameters: Vec<Parameter>,
}
impl Signature {
    /// Returns a builder in its default state.
    pub fn builder() -> SignatureBuilder {
        SignatureBuilder::default()
    }

    /// Returns a signature with no parameters.
    pub fn empty() -> Signature {
        Self {
            parameters: Vec::new(),
        }
    }
}
/// A single parameter within a [`Signature`].
#[derive(Clone, Debug, Serialize)]
pub struct Parameter {
/// Optional parameter name; `None` when the parameter is built from an `ArgType` alone.
pub(crate) name: Option<String>,
/// Optional default value for the parameter.
pub(crate) default: Option<Arg>,
/// Type of the argument this parameter takes.
pub(crate) arg_type: ArgType,
}
impl From<ArgType> for Parameter {
fn from(value: ArgType) -> Self {
Parameter {
name: None,
default: None,
arg_type: value,
}
}
}
impl<NAME, ARG> From<(Option<NAME>, ArgType, Option<ARG>)> for Parameter
where
NAME: Into<Cow<'static, str>>,
ARG: Into<Arg>,
{
fn from((name, arg_type, default): (Option<NAME>, ArgType, Option<ARG>)) -> Self {
Parameter {
name: name.map(|x| x.into().into_owned()),
default: default.map(|x| x.into()),
arg_type,
}
}
}
impl<P> From<Vec<P>> for Signature
where
P: Into<Parameter>,
{
fn from(value: Vec<P>) -> Self {
Signature {
parameters: value.into_iter().map(|x| x.into()).collect(),
}
}
}
/// Inputs handed to a table function's factory closure by [`create`].
pub struct FunctionContext {
/// Arguments parsed from JSON, or `None` when none were supplied.
pub arguments: Option<Args>,
/// Named arguments as `(name, value)` pairs; empty when none were supplied.
pub named_arguments: Vec<(String, Arg)>,
/// Timezone string copied from the caller.
// NOTE(review): the expected format (e.g. IANA name vs. offset) is not visible here; confirm.
pub timezone: String,
}
/// Interface implemented by table functions that consume record batches.
pub trait TableFunction {
/// Handles one input batch and optionally returns an output batch.
fn process(&mut self, input: RecordBatch) -> anyhow::Result<Option<RecordBatch>>;
/// Optionally returns a final output batch; the default implementation returns `Ok(None)`.
fn finalize(&mut self) -> anyhow::Result<Option<RecordBatch>> {
Ok(None)
}
}
/// Feeds the first record batch of an FFI Arrow stream to `func`.
///
/// `input_stream` is the address of an `FFI_ArrowArrayStream`. Only the first
/// batch yielded by the stream is read and passed to
/// [`TableFunction::process`]. On success the return value is the address of
/// a newly boxed `FFI_ArrowArrayStream` holding the single output batch, or a
/// null address (`0`) when the stream yields no batch or the function
/// produces no output.
///
/// # Errors
///
/// Returns an error (instead of panicking) when the stream reader cannot be
/// constructed, when reading the first batch fails, or when
/// [`TableFunction::process`] fails.
///
/// # Safety
///
/// `input_stream` must satisfy the requirements of
/// `DynamicArrowArrayStreamReader::from_raw`.
// NOTE(review): presumably the caller takes ownership of the returned stream
// and must release it — confirm against the FFI consumer.
pub unsafe fn process_raw(
    func: &mut Box<dyn TableFunction>,
    input_stream: i64,
) -> anyhow::Result<i64> {
    let stream_ptr = input_stream as *mut arrow::ffi_stream::FFI_ArrowArrayStream;
    // SAFETY: validity of `stream_ptr` is the caller's responsibility per this
    // function's safety contract.
    let mut stream_reader: DynamicArrowArrayStreamReader =
        unsafe { DynamicArrowArrayStreamReader::from_raw(stream_ptr) }
            .context("Failed to construct DynamicArrowArrayStreamReader")?;

    // A stream with no batch yields a null result.
    let Some(record_batch) = stream_reader.next() else {
        return Ok(null_mut::<FFI_ArrowArrayStream>() as i64);
    };
    // Propagate read failures instead of panicking across the FFI boundary.
    let record_batch = record_batch.context("cannot iterate over record batch")?;

    let Some(output) = func.process(record_batch)? else {
        return Ok(null_mut::<FFI_ArrowArrayStream>() as i64);
    };

    let boxed = Box::new(FFI_ArrowArrayStream::new(VecRecordBatchReader::new(vec![
        output,
    ])));
    Ok(Box::into_raw(boxed) as i64)
}
/// Calls [`TableFunction::finalize`] and exports its output as an FFI Arrow stream.
///
/// Returns the address of a newly boxed `FFI_ArrowArrayStream` holding the
/// single output batch, or a null address (`0`) when the function produces no
/// final output.
///
/// # Errors
///
/// Propagates any error returned by [`TableFunction::finalize`].
///
/// # Safety
///
/// The body performs no unsafe operation itself.
// NOTE(review): presumably `unsafe` because the caller must manage the
// returned raw stream address — confirm against the FFI consumer.
pub unsafe fn finalize_raw(func: &mut Box<dyn TableFunction>) -> anyhow::Result<i64> {
    match func.finalize()? {
        None => Ok(null_mut::<FFI_ArrowArrayStream>() as i64),
        Some(batch) => {
            let reader = VecRecordBatchReader::new(vec![batch]);
            let stream = Box::new(FFI_ArrowArrayStream::new(reader));
            Ok(Box::into_raw(stream) as i64)
        }
    }
}
/// Renders an `anyhow::Error` using its `Debug` representation.
pub fn anyhow_error_to_string(error: &anyhow::Error) -> String {
    format!("{error:?}")
}