use crate::automation::callable::{Callable, ExecutionContext, Signature, Value};
use crate::error::DbxResult;
type TableFn = Box<dyn Fn(&ExecutionContext, &[Value]) -> DbxResult<Vec<Vec<Value>>> + Send + Sync>;
pub struct TableUDF {
name: String,
signature: Signature,
func: TableFn,
}
impl TableUDF {
pub fn new<F>(name: impl Into<String>, signature: Signature, func: F) -> Self
where
F: Fn(&ExecutionContext, &[Value]) -> DbxResult<Vec<Vec<Value>>> + Send + Sync + 'static,
{
Self {
name: name.into(),
signature,
func: Box::new(func),
}
}
}
impl Callable for TableUDF {
fn call(&self, ctx: &ExecutionContext, args: &[Value]) -> DbxResult<Value> {
let rows = (self.func)(ctx, args)?;
Ok(Value::Table(rows))
}
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> &Signature {
&self.signature
}
}
impl TableUDF {
pub fn execute(&self, ctx: &ExecutionContext, args: &[Value]) -> DbxResult<Vec<Vec<Value>>> {
(self.func)(ctx, args)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::automation::callable::DataType;
use crate::engine::Database;
use std::sync::Arc;
#[test]
fn test_table_udf_basic() {
let table_udf = TableUDF::new(
"generate_series",
Signature {
params: vec![DataType::Int, DataType::Int],
return_type: DataType::Int,
is_variadic: false,
},
|_ctx, args| {
let start = args[0].as_i64()?;
let end = args[1].as_i64()?;
let mut rows = Vec::new();
for i in start..=end {
rows.push(vec![Value::Int(i)]);
}
Ok(rows)
},
);
let db = Database::open_in_memory().unwrap();
let ctx = ExecutionContext::new(Arc::new(db));
let rows = table_udf
.execute(&ctx, &[Value::Int(1), Value::Int(5)])
.unwrap();
assert_eq!(rows.len(), 5);
assert_eq!(rows[0][0].as_i64().unwrap(), 1);
assert_eq!(rows[4][0].as_i64().unwrap(), 5);
}
#[test]
fn test_table_udf_multi_column() {
let table_udf = TableUDF::new(
"user_data",
Signature {
params: vec![],
return_type: DataType::String,
is_variadic: false,
},
|_ctx, _args| {
Ok(vec![
vec![Value::Int(1), Value::String("Alice".to_string())],
vec![Value::Int(2), Value::String("Bob".to_string())],
vec![Value::Int(3), Value::String("Charlie".to_string())],
])
},
);
let db = Database::open_in_memory().unwrap();
let ctx = ExecutionContext::new(Arc::new(db));
let rows = table_udf.execute(&ctx, &[]).unwrap();
assert_eq!(rows.len(), 3);
assert_eq!(rows[0].len(), 2);
assert_eq!(rows[1][1].as_str().unwrap(), "Bob");
}
#[test]
fn test_table_udf_with_filter() {
let table_udf = TableUDF::new(
"filtered_range",
Signature {
params: vec![DataType::Int, DataType::Int, DataType::Int],
return_type: DataType::Int,
is_variadic: false,
},
|_ctx, args| {
let start = args[0].as_i64()?;
let end = args[1].as_i64()?;
let step = args[2].as_i64()?;
let mut rows = Vec::new();
let mut current = start;
while current <= end {
rows.push(vec![Value::Int(current)]);
current += step;
}
Ok(rows)
},
);
let db = Database::open_in_memory().unwrap();
let ctx = ExecutionContext::new(Arc::new(db));
let rows = table_udf
.execute(&ctx, &[Value::Int(0), Value::Int(10), Value::Int(2)])
.unwrap();
assert_eq!(rows.len(), 6); assert_eq!(rows[3][0].as_i64().unwrap(), 6);
}
#[test]
fn test_table_udf_with_engine() {
use crate::automation::ExecutionEngine;
let engine = ExecutionEngine::new();
let table_udf = Arc::new(TableUDF::new(
"range",
Signature {
params: vec![DataType::Int],
return_type: DataType::Int,
is_variadic: false,
},
|_ctx, args| {
let n = args[0].as_i64()?;
let mut rows = Vec::new();
for i in 0..n {
rows.push(vec![Value::Int(i)]);
}
Ok(rows)
},
));
engine.register(table_udf).unwrap();
let db = Database::open_in_memory().unwrap();
let ctx = ExecutionContext::new(Arc::new(db));
let result = engine.execute("range", &ctx, &[Value::Int(3)]).unwrap();
let table = result.as_table().unwrap();
assert_eq!(table.len(), 3);
assert_eq!(table[0][0].as_i64().unwrap(), 0);
}
}