use std::any::Any;
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DfResult;
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
/// Scalar UDF taking two `Utf8` arguments (a field name and a query string).
///
/// The type only carries the function's [`Signature`]; the evaluation logic
/// lives in its `ScalarUDFImpl` implementation.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Bm25Score {
    /// Accepts exactly `(Utf8, Utf8)` and is declared `Volatility::Stable`.
    signature: Signature,
}

impl Bm25Score {
    /// Builds the UDF with its fixed two-string, `Stable` signature.
    pub fn new() -> Self {
        let arg_types = vec![DataType::Utf8; 2];
        let signature = Signature::exact(arg_types, Volatility::Stable);
        Self { signature }
    }
}

impl Default for Bm25Score {
    /// Same as [`Bm25Score::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ScalarUDFImpl for Bm25Score {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name under which the function is exposed: `bm25_score`.
    fn name(&self) -> &str {
        "bm25_score"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always `Float64`; the argument types are already fixed by the signature.
    fn return_type(&self, _arg_types: &[DataType]) -> DfResult<DataType> {
        Ok(DataType::Float64)
    }

    /// Placeholder evaluation: yields a score of `0.0` for every row.
    ///
    /// The output length comes from `args.number_rows`, the row count of the
    /// batch being evaluated, rather than from the first argument. When the
    /// first argument is a scalar literal but the second is a column (e.g.
    /// `bm25_score('body', query_col)`), the batch still has `number_rows`
    /// rows, and an array result must have that length. Deriving the length
    /// from `args.args[0]` would produce a 1-element array in that case and
    /// would panic on an empty argument list.
    fn invoke_with_args(
        &self,
        args: datafusion::logical_expr::ScalarFunctionArgs,
    ) -> DfResult<ColumnarValue> {
        let zeros = Float64Array::from(vec![0.0; args.number_rows]);
        Ok(ColumnarValue::Array(Arc::new(zeros) as ArrayRef))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::array::StringArray;
    use datafusion::arrow::datatypes::Field;
    use datafusion::common::ScalarValue;
    use datafusion::logical_expr::ScalarFunctionArgs;

    /// Builds call arguments for a batch of `number_rows` rows.
    fn make_args(args: Vec<ColumnarValue>, number_rows: usize) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args,
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("", DataType::Float64, false)),
            config_options: Arc::new(datafusion::config::ConfigOptions::new()),
        }
    }

    /// Extracts a `Float64Array` from an array result, panicking otherwise.
    fn expect_f64_array(result: ColumnarValue) -> Float64Array {
        match result {
            ColumnarValue::Array(arr) => arr
                .as_any()
                .downcast_ref::<Float64Array>()
                .expect("bm25_score must return Float64")
                .clone(),
            ColumnarValue::Scalar(_) => panic!("expected array"),
        }
    }

    #[test]
    fn returns_float64() {
        let udf = Bm25Score::new();
        assert_eq!(
            udf.return_type(&[DataType::Utf8, DataType::Utf8]).unwrap(),
            DataType::Float64
        );
    }

    #[test]
    fn invoke_returns_zeros() {
        let udf = Bm25Score::new();
        let field =
            ColumnarValue::Array(Arc::new(StringArray::from(vec!["body", "body"])) as ArrayRef);
        let query =
            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test", "test"])) as ArrayRef);
        let result = udf.invoke_with_args(make_args(vec![field, query], 2)).unwrap();
        let f64_arr = expect_f64_array(result);
        assert_eq!(f64_arr.len(), 2);
        assert_eq!(f64_arr.value(0), 0.0);
    }

    /// Regression: a scalar first argument must not shrink the output to one row.
    #[test]
    fn invoke_scalar_field_with_array_query_matches_row_count() {
        let udf = Bm25Score::new();
        let field = ColumnarValue::Scalar(ScalarValue::Utf8(Some("body".to_string())));
        let query =
            ColumnarValue::Array(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef);
        let result = udf.invoke_with_args(make_args(vec![field, query], 3)).unwrap();
        let f64_arr = expect_f64_array(result);
        assert_eq!(f64_arr.len(), 3);
        assert!(f64_arr.iter().all(|v| v == Some(0.0)));
    }
}