#![cfg(feature = "pyo3")]
use std::ffi::CString;
use std::sync::Arc;
use arrow_schema::DataType;
use pyo3::prelude::*;
use pyo3::types::{PyAnyMethods, PyDict, PyDictMethods, PyList, PyListMethods, PyTuple};
use smol_str::SmolStr;
use uni_plugin::traits::scalar::{ArgType, FnSignature, NullHandling};
use uni_plugin::{Capability, CapabilitySet, PluginId, PluginRegistrar, QName};
use crate::adapter_aggregate::{PyAggregateFn, build_py_agg_signature};
use crate::adapter_procedure::PyProcedure;
use crate::adapter_scalar::PyScalarFn;
use crate::adapter_scalar_helpers::{
determinism_to_volatility, type_name_to_datatype as type_name_to_datatype_shared,
};
use crate::error::PyPluginError;
use crate::manifest::{
ManifestBuilder, PyAggregateEntry, PyManifest, PyProcedureEntry, PyScalarEntry,
};
use crate::runtime::PyPluginRuntime;
#[derive(Debug)]
pub struct LoadOutcome {
pub plugin_id: PluginId,
pub version: String,
pub effective_capabilities: CapabilitySet,
pub denied_capabilities: Vec<Capability>,
pub scalars_registered: Vec<String>,
pub aggregates_registered: Vec<String>,
pub procedures_registered: Vec<String>,
pub runtime: Arc<PyPluginRuntime>,
}
#[derive(Default, Clone)]
pub struct PyPluginLoader {
default_plugin_id: Option<SmolStr>,
}
impl std::fmt::Debug for PyPluginLoader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PyPluginLoader")
.field("default_plugin_id", &self.default_plugin_id)
.finish()
}
}
impl PyPluginLoader {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_default_plugin_id(id: impl Into<SmolStr>) -> Self {
Self {
default_plugin_id: Some(id.into()),
}
}
pub fn load(
&self,
py: Python<'_>,
module_src: &str,
module_name: &str,
registrar: &mut PluginRegistrar<'_>,
registrar_caps: &CapabilitySet,
) -> Result<LoadOutcome, PyPluginError> {
let builder = ManifestBuilder::new();
let module = build_module_with_sink(py, module_name, &builder)?;
let module_src_c = CString::new(module_src).map_err(|e| {
PyPluginError::ManifestInvalid(format!("module source contains NUL: {e}"))
})?;
py.run(
module_src_c.as_c_str(),
Some(&module.dict()),
Some(&module.dict()),
)
.map_err(|err| {
PyPluginError::from(err).with_qname(format!("<module {module_name}>"))
})?;
let manifest = builder.into_manifest();
self.finalize(&manifest, module_name, registrar, registrar_caps)
}
pub fn load_from_builder(
&self,
builder: &ManifestBuilder,
registrar: &mut PluginRegistrar<'_>,
registrar_caps: &CapabilitySet,
) -> Result<LoadOutcome, PyPluginError> {
let manifest = builder.into_manifest();
self.finalize(&manifest, "py.live", registrar, registrar_caps)
}
fn finalize(
&self,
manifest: &PyManifest,
default_id: &str,
registrar: &mut PluginRegistrar<'_>,
registrar_caps: &CapabilitySet,
) -> Result<LoadOutcome, PyPluginError> {
manifest.validate_non_empty()?;
let resolved_id = self.resolve_plugin_id(manifest, default_id)?;
let runtime = PyPluginRuntime::new(resolved_id.clone());
let declared = derive_declared_capabilities(manifest);
let (effective, denied) = intersect_caps(&declared, registrar_caps);
registrar.set_plugin_id(resolved_id.clone());
let scalars_registered = if effective.contains(&Capability::ScalarFn) {
register_scalars(
registrar,
Arc::clone(&runtime),
&resolved_id,
&manifest.scalar_fns,
)?
} else {
Vec::new()
};
let aggregates_registered = if effective.contains(&Capability::AggregateFn) {
register_aggregates(
registrar,
Arc::clone(&runtime),
&resolved_id,
&manifest.aggregate_fns,
)?
} else {
Vec::new()
};
let procedures_registered = if effective.contains(&Capability::Procedure) {
register_procedures(
registrar,
Arc::clone(&runtime),
&resolved_id,
&manifest.procedures,
)?
} else {
Vec::new()
};
Ok(LoadOutcome {
plugin_id: resolved_id,
version: manifest.version.to_string(),
effective_capabilities: effective,
denied_capabilities: denied,
scalars_registered,
aggregates_registered,
procedures_registered,
runtime,
})
}
fn resolve_plugin_id(
&self,
manifest: &PyManifest,
module_name: &str,
) -> Result<PluginId, PyPluginError> {
let id_str: SmolStr = if manifest.id.as_str() != "py.live" {
manifest.id.clone()
} else if let Some(d) = &self.default_plugin_id {
d.clone()
} else if !module_name.is_empty() && module_name != "py.live" {
SmolStr::new(module_name)
} else {
return Err(PyPluginError::ManifestInvalid(
"plugin id was neither declared in the module nor supplied by the loader".into(),
));
};
Ok(PluginId::new(id_str))
}
}
fn register_scalars(
registrar: &mut PluginRegistrar<'_>,
runtime: Arc<PyPluginRuntime>,
plugin_id: &PluginId,
entries: &[PyScalarEntry],
) -> Result<Vec<String>, PyPluginError> {
let mut registered = Vec::with_capacity(entries.len());
for entry in entries {
let args_types: Vec<ArgType> = entry
.args
.iter()
.map(|t| type_name_to_argtype(t.as_str()))
.collect::<Result<_, PyPluginError>>()?;
let returns_type = type_name_to_argtype(entry.returns.as_str())?;
let sig = FnSignature {
args: args_types,
returns: returns_type,
volatility: determinism_to_volatility(entry.determinism.as_str()),
null_handling: NullHandling::PropagateNulls,
};
let local_name = entry.name.clone();
let qname = QName::new(plugin_id.as_str(), local_name.clone());
let callable = Python::attach(|py| entry.callable.clone_ref(py));
runtime.insert(local_name.clone(), callable);
let adapter = if entry.vectorized {
PyScalarFn::new_vectorized(Arc::clone(&runtime), local_name, sig.clone())
} else {
PyScalarFn::new(Arc::clone(&runtime), local_name, sig.clone())
};
registrar
.scalar_fn(qname.clone(), sig, Arc::new(adapter))
.map_err(PyPluginError::from)?;
registered.push(qname.to_string());
}
Ok(registered)
}
fn register_aggregates(
registrar: &mut PluginRegistrar<'_>,
runtime: Arc<PyPluginRuntime>,
plugin_id: &PluginId,
entries: &[PyAggregateEntry],
) -> Result<Vec<String>, PyPluginError> {
let mut registered = Vec::with_capacity(entries.len());
for entry in entries {
let sig = build_py_agg_signature(&entry.args, &entry.returns, entry.determinism.as_str())
.map_err(|e| PyPluginError::ManifestInvalid(e.message))?;
let local_name = entry.name.clone();
let qname = QName::new(plugin_id.as_str(), local_name.clone());
Python::attach(|py| {
runtime.insert(format!("{local_name}::init"), entry.init.clone_ref(py));
runtime.insert(
format!("{local_name}::accumulate"),
entry.accumulate.clone_ref(py),
);
runtime.insert(format!("{local_name}::merge"), entry.merge.clone_ref(py));
runtime.insert(
format!("{local_name}::finalize"),
entry.finalize.clone_ref(py),
);
});
let adapter = PyAggregateFn::new(Arc::clone(&runtime), local_name, sig.clone());
registrar
.aggregate_fn(qname.clone(), sig, Arc::new(adapter))
.map_err(PyPluginError::from)?;
registered.push(qname.to_string());
}
Ok(registered)
}
fn register_procedures(
registrar: &mut PluginRegistrar<'_>,
runtime: Arc<PyPluginRuntime>,
plugin_id: &PluginId,
entries: &[PyProcedureEntry],
) -> Result<Vec<String>, PyPluginError> {
use arrow_schema::Field;
use uni_plugin::capability::SideEffects;
use uni_plugin::traits::procedure::{NamedArgType, ProcedureMode, ProcedureSignature};
let mut registered = Vec::with_capacity(entries.len());
for entry in entries {
let args: Vec<NamedArgType> = entry
.args
.iter()
.enumerate()
.map(|(i, t)| {
let ty = type_name_to_argtype(t.as_str())?;
Ok(NamedArgType {
name: SmolStr::from(format!("arg{i}")),
ty,
default: None,
doc: String::new(),
})
})
.collect::<Result<_, PyPluginError>>()?;
let yields: Vec<Field> = entry
.yields
.iter()
.enumerate()
.map(|(i, t)| {
let dt = type_name_to_datatype(t.as_str())?;
Ok(Field::new(format!("col{i}"), dt, true))
})
.collect::<Result<_, PyPluginError>>()?;
let mode = match entry.mode.trim().to_ascii_lowercase().as_str() {
"write" => ProcedureMode::Write,
"schema" => ProcedureMode::Schema,
"dbms" => ProcedureMode::Dbms,
_ => ProcedureMode::Read,
};
let side_effects = match mode {
ProcedureMode::Read => SideEffects::ReadOnly,
_ => SideEffects::Writes,
};
let sig = ProcedureSignature {
args,
yields,
mode,
side_effects,
retry_contract: None,
batch_input: None,
docs: String::new(),
};
let local_name = entry.name.clone();
let qname = QName::new(plugin_id.as_str(), local_name.clone());
let callable = Python::attach(|py| entry.callable.clone_ref(py));
runtime.insert(local_name.clone(), callable);
let adapter = PyProcedure::new(Arc::clone(&runtime), local_name, sig.clone());
registrar
.procedure(qname.clone(), sig, Arc::new(adapter))
.map_err(PyPluginError::from)?;
registered.push(qname.to_string());
}
Ok(registered)
}
fn type_name_to_datatype(name: &str) -> Result<DataType, PyPluginError> {
type_name_to_datatype_shared(name).ok_or_else(|| {
let normalized = name.trim().to_ascii_lowercase();
PyPluginError::ManifestInvalid(format!("unknown yield/arg type `{normalized}`"))
})
}
fn type_name_to_argtype(name: &str) -> Result<ArgType, PyPluginError> {
let dt = type_name_to_datatype_shared(name).ok_or_else(|| {
let normalized = name.trim().to_ascii_lowercase();
PyPluginError::ManifestInvalid(format!(
"unknown argument/return type `{normalized}` — v1 covers float/int/string/bool"
))
})?;
Ok(ArgType::Primitive(dt))
}
fn derive_declared_capabilities(m: &PyManifest) -> CapabilitySet {
let mut set = CapabilitySet::new();
if !m.scalar_fns.is_empty() {
set.insert(Capability::ScalarFn);
}
if !m.aggregate_fns.is_empty() {
set.insert(Capability::AggregateFn);
}
if !m.procedures.is_empty() {
set.insert(Capability::Procedure);
}
set
}
fn intersect_caps(
declared: &CapabilitySet,
granted: &CapabilitySet,
) -> (CapabilitySet, Vec<Capability>) {
let effective = declared.intersect(granted);
let denied: Vec<Capability> = declared
.iter()
.filter(|c| !granted.contains(c))
.cloned()
.collect();
(effective, denied)
}
fn build_module_with_sink<'py>(
py: Python<'py>,
module_name: &str,
builder: &Arc<ManifestBuilder>,
) -> Result<Bound<'py, pyo3::types::PyModule>, PyPluginError> {
let stub_src = CString::new("# uni-plugin-pyo3 host-injected module\n")
.map_err(|e| PyPluginError::Internal(format!("CString stub: {e}")))?;
let module_name_c = CString::new(module_name)
.unwrap_or_else(|_| CString::new("uni_plugin_pyo3_module").expect("static"));
let filename_c = CString::new(format!("{module_name}.py"))
.unwrap_or_else(|_| CString::new("uni_plugin_pyo3_module.py").expect("static"));
let module = pyo3::types::PyModule::from_code(
py,
stub_src.as_c_str(),
filename_c.as_c_str(),
module_name_c.as_c_str(),
)
.map_err(PyPluginError::from)?;
let sink = Py::new(py, PyDecoratorSink::from_builder(Arc::clone(builder)))
.map_err(PyPluginError::from)?;
module
.setattr("_uni_decorator_sink", sink.clone_ref(py))
.map_err(PyPluginError::from)?;
module.setattr("db", sink).map_err(PyPluginError::from)?;
Ok(module)
}
#[derive(Debug)]
#[doc(hidden)]
#[pyclass]
pub struct PyDecoratorSink {
pub(crate) builder: Arc<ManifestBuilder>,
}
impl PyDecoratorSink {
#[doc(hidden)]
#[must_use]
pub fn from_builder(builder: Arc<ManifestBuilder>) -> Self {
Self { builder }
}
}
#[pymethods]
impl PyDecoratorSink {
#[pyo3(signature = (name, args, returns, vectorized=false, determinism="pure"))]
fn scalar_fn(
&self,
py: Python<'_>,
name: String,
args: Bound<'_, PyAny>,
returns: String,
vectorized: bool,
determinism: &str,
) -> PyResult<Py<PyAny>> {
make_scalar_trampoline(
py,
Arc::clone(&self.builder),
name,
args,
returns,
vectorized,
determinism,
)
}
#[pyo3(signature = (name, args, returns, determinism="pure"))]
fn aggregate_fn(
&self,
py: Python<'_>,
name: String,
args: Bound<'_, PyAny>,
returns: String,
determinism: &str,
) -> PyResult<Py<PyAny>> {
make_aggregate_trampoline(
py,
Arc::clone(&self.builder),
name,
args,
returns,
determinism,
)
}
#[pyo3(signature = (name, args, yields, mode="read"))]
fn procedure(
&self,
py: Python<'_>,
name: String,
args: Bound<'_, PyAny>,
yields: Bound<'_, PyAny>,
mode: &str,
) -> PyResult<Py<PyAny>> {
make_procedure_trampoline(py, Arc::clone(&self.builder), name, args, yields, mode)
}
fn set_plugin_id(&self, id: String) {
self.builder.set_id(id);
}
fn set_version(&self, version: String) {
self.builder.set_version(version);
}
fn set_determinism(&self, determinism: String) {
self.builder.set_determinism(determinism);
}
}
#[doc(hidden)]
pub fn make_scalar_trampoline(
py: Python<'_>,
builder: Arc<ManifestBuilder>,
name: String,
args: Bound<'_, PyAny>,
returns: String,
vectorized: bool,
determinism: &str,
) -> PyResult<Py<PyAny>> {
let args_vec = extract_args_list(&args)?;
let trampoline = PyDecoratorTrampoline::new_scalar(
builder,
SmolStr::new(&name),
args_vec,
SmolStr::new(&returns),
vectorized,
SmolStr::new(determinism),
);
Ok(Py::new(py, trampoline)?.into_any())
}
#[doc(hidden)]
pub fn make_aggregate_trampoline(
py: Python<'_>,
builder: Arc<ManifestBuilder>,
name: String,
args: Bound<'_, PyAny>,
returns: String,
determinism: &str,
) -> PyResult<Py<PyAny>> {
let args_vec = extract_args_list(&args)?;
let trampoline = PyDecoratorTrampoline::new_aggregate(
builder,
SmolStr::new(&name),
args_vec,
SmolStr::new(&returns),
SmolStr::new(determinism),
);
Ok(Py::new(py, trampoline)?.into_any())
}
#[doc(hidden)]
pub fn make_procedure_trampoline(
py: Python<'_>,
builder: Arc<ManifestBuilder>,
name: String,
args: Bound<'_, PyAny>,
yields: Bound<'_, PyAny>,
mode: &str,
) -> PyResult<Py<PyAny>> {
let args_vec = extract_args_list(&args)?;
let yields_vec = extract_args_list(&yields)?;
let trampoline = PyDecoratorTrampoline::new_procedure(
builder,
SmolStr::new(&name),
args_vec,
yields_vec,
SmolStr::new(mode),
);
Ok(Py::new(py, trampoline)?.into_any())
}
fn extract_args_list(obj: &Bound<'_, PyAny>) -> PyResult<Vec<SmolStr>> {
if let Ok(list) = obj.cast::<PyList>() {
let mut out = Vec::with_capacity(list.len());
for item in list.iter() {
out.push(SmolStr::new(item.extract::<String>()?));
}
return Ok(out);
}
if let Ok(tuple) = obj.cast::<PyTuple>() {
let len = tuple.len();
let mut out = Vec::with_capacity(len);
for i in 0..len {
let item = tuple.get_item(i)?;
out.push(SmolStr::new(item.extract::<String>()?));
}
return Ok(out);
}
let mut out = Vec::new();
let iter = obj.try_iter()?;
for item in iter {
out.push(SmolStr::new(item?.extract::<String>()?));
}
Ok(out)
}
#[derive(Debug)]
#[doc(hidden)]
#[pyclass]
pub struct PyDecoratorTrampoline {
kind: TrampolineKind,
builder: Arc<ManifestBuilder>,
name: SmolStr,
args: Vec<SmolStr>,
returns: SmolStr,
yields: Vec<SmolStr>,
mode: SmolStr,
vectorized: bool,
determinism: SmolStr,
}
#[derive(Debug, Clone, Copy)]
enum TrampolineKind {
Scalar,
Aggregate,
Procedure,
}
impl PyDecoratorTrampoline {
fn new_scalar(
builder: Arc<ManifestBuilder>,
name: SmolStr,
args: Vec<SmolStr>,
returns: SmolStr,
vectorized: bool,
determinism: SmolStr,
) -> Self {
Self {
kind: TrampolineKind::Scalar,
builder,
name,
args,
returns,
yields: Vec::new(),
mode: SmolStr::default(),
vectorized,
determinism,
}
}
fn new_aggregate(
builder: Arc<ManifestBuilder>,
name: SmolStr,
args: Vec<SmolStr>,
returns: SmolStr,
determinism: SmolStr,
) -> Self {
Self {
kind: TrampolineKind::Aggregate,
builder,
name,
args,
returns,
yields: Vec::new(),
mode: SmolStr::default(),
vectorized: false,
determinism,
}
}
fn new_procedure(
builder: Arc<ManifestBuilder>,
name: SmolStr,
args: Vec<SmolStr>,
yields: Vec<SmolStr>,
mode: SmolStr,
) -> Self {
Self {
kind: TrampolineKind::Procedure,
builder,
name,
args,
yields,
returns: SmolStr::default(),
mode,
vectorized: false,
determinism: SmolStr::default(),
}
}
}
#[pymethods]
impl PyDecoratorTrampoline {
fn __call__(&self, py: Python<'_>, target: Py<PyAny>) -> PyResult<Py<PyAny>> {
match self.kind {
TrampolineKind::Scalar => {
let entry = PyScalarEntry {
name: self.name.clone(),
args: self.args.clone(),
returns: self.returns.clone(),
vectorized: self.vectorized,
determinism: self.determinism.clone(),
callable: target.clone_ref(py),
};
self.builder.push_scalar(entry);
}
TrampolineKind::Aggregate => {
let bound = target.bind(py);
let (init, accumulate, merge, finalize) = extract_agg_methods(bound)?;
let entry = crate::manifest::PyAggregateEntry {
name: self.name.clone(),
args: self.args.clone(),
returns: self.returns.clone(),
determinism: self.determinism.clone(),
init,
accumulate,
merge,
finalize,
};
self.builder.push_aggregate(entry);
}
TrampolineKind::Procedure => {
let entry = crate::manifest::PyProcedureEntry {
name: self.name.clone(),
args: self.args.clone(),
yields: self.yields.clone(),
mode: self.mode.clone(),
callable: target.clone_ref(py),
};
self.builder.push_procedure(entry);
}
}
Ok(target)
}
}
type AggCallables = (Py<PyAny>, Py<PyAny>, Py<PyAny>, Py<PyAny>);
fn extract_agg_methods(obj: &Bound<'_, PyAny>) -> PyResult<AggCallables> {
const KEYS: [&str; 4] = ["init", "accumulate", "merge", "finalize"];
let resolved: Vec<Py<PyAny>> = if let Ok(dict) = obj.cast::<PyDict>() {
KEYS.iter()
.map(|key| {
dict.get_item(key)?
.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err(format!(
"aggregate spec dict missing `{key}` key"
))
})
.map(|v| v.unbind())
})
.collect::<PyResult<_>>()?
} else {
KEYS.iter()
.map(|key| obj.getattr(*key).map(|v| v.unbind()))
.collect::<PyResult<_>>()?
};
let [init, accumulate, merge, finalize] =
resolved.try_into().expect("KEYS has exactly four entries");
Ok((init, accumulate, merge, finalize))
}
#[cfg(test)]
mod tests {
use super::*;
use uni_plugin::PluginRegistry;
fn loader_with_caps() -> (PyPluginLoader, CapabilitySet) {
let loader = PyPluginLoader::with_default_plugin_id("ai.test.pyloader");
let caps = CapabilitySet::from_iter_of([
Capability::ScalarFn,
Capability::AggregateFn,
Capability::Procedure,
]);
(loader, caps)
}
#[test]
fn loader_registers_decorated_scalar() {
Python::initialize();
Python::attach(|py| {
let (loader, caps) = loader_with_caps();
let registry = PluginRegistry::new();
let mut r =
PluginRegistrar::new(PluginId::new("ai.test.placeholder"), &caps, ®istry);
let src = r#"
@db.scalar_fn("double", args=["float"], returns="float", determinism="pure")
def double(x):
return x * 2.0
"#;
let outcome = loader
.load(py, src, "ai.test.pyloader", &mut r, &caps)
.expect("load");
assert_eq!(outcome.scalars_registered.len(), 1);
assert!(outcome.denied_capabilities.is_empty());
r.commit_to_registry().expect("commit");
let q = QName::new("ai.test.pyloader", "double");
assert!(registry.scalar_fn(&q).is_some());
});
}
#[test]
fn loader_denies_ungranted_caps() {
Python::initialize();
Python::attach(|py| {
let loader = PyPluginLoader::with_default_plugin_id("ai.test.deny");
let caps = CapabilitySet::from_iter_of([Capability::AggregateFn]);
let registry = PluginRegistry::new();
let mut r =
PluginRegistrar::new(PluginId::new("ai.test.placeholder"), &caps, ®istry);
let src = r#"
@db.scalar_fn("plus1", args=["float"], returns="float")
def plus1(x):
return x + 1.0
"#;
let outcome = loader
.load(py, src, "ai.test.deny", &mut r, &caps)
.expect("load");
assert!(outcome.scalars_registered.is_empty());
assert!(outcome.denied_capabilities.contains(&Capability::ScalarFn));
});
}
#[test]
fn loader_empty_module_errors() {
Python::initialize();
Python::attach(|py| {
let (loader, caps) = loader_with_caps();
let registry = PluginRegistry::new();
let mut r =
PluginRegistrar::new(PluginId::new("ai.test.placeholder"), &caps, ®istry);
let src = "x = 1 + 1\n";
let err = loader
.load(py, src, "ai.test.empty", &mut r, &caps)
.unwrap_err();
assert!(matches!(err, PyPluginError::ManifestInvalid(_)));
});
}
#[test]
fn loader_parse_error_surfaces() {
Python::initialize();
Python::attach(|py| {
let (loader, caps) = loader_with_caps();
let registry = PluginRegistry::new();
let mut r =
PluginRegistrar::new(PluginId::new("ai.test.placeholder"), &caps, ®istry);
let src = "this is @@@ not valid python\n";
let err = loader
.load(py, src, "ai.test.bad", &mut r, &caps)
.unwrap_err();
match err {
PyPluginError::PythonException { message, .. } => {
assert!(message.contains("SyntaxError"), "got: {message}");
}
other => panic!("unexpected: {other:?}"),
}
});
}
#[test]
fn loader_unknown_type_name_rejected() {
Python::initialize();
Python::attach(|py| {
let (loader, caps) = loader_with_caps();
let registry = PluginRegistry::new();
let mut r =
PluginRegistrar::new(PluginId::new("ai.test.placeholder"), &caps, ®istry);
let src = r#"
@db.scalar_fn("oops", args=["quaternion"], returns="float")
def oops(x):
return x
"#;
let err = loader
.load(py, src, "ai.test.types", &mut r, &caps)
.unwrap_err();
assert!(matches!(err, PyPluginError::ManifestInvalid(_)));
});
}
#[test]
fn loader_set_plugin_id_overrides_default() {
Python::initialize();
Python::attach(|py| {
let (loader, caps) = loader_with_caps();
let registry = PluginRegistry::new();
let mut r =
PluginRegistrar::new(PluginId::new("ai.test.placeholder"), &caps, ®istry);
let src = r#"
db.set_plugin_id("ai.example.geo")
db.set_version("0.3.1")
@db.scalar_fn("haversine", args=["float","float","float","float"], returns="float", vectorized=True, determinism="pure")
def haversine(lat1, lon1, lat2, lon2):
import pyarrow as pa
return pa.array([0.0] * len(lat1))
"#;
let outcome = loader
.load(py, src, "ai.test.pyloader", &mut r, &caps)
.expect("load");
assert_eq!(outcome.plugin_id.as_str(), "ai.example.geo");
assert_eq!(outcome.version, "0.3.1");
r.commit_to_registry().expect("commit");
let q = QName::new("ai.example.geo", "haversine");
assert!(registry.scalar_fn(&q).is_some());
});
}
}