datafusion_python/
udtf.rs1use std::ptr::NonNull;
19use std::sync::Arc;
20
21use datafusion::catalog::{TableFunctionImpl, TableProvider};
22use datafusion::error::Result as DataFusionResult;
23use datafusion::logical_expr::Expr;
24use datafusion_ffi::udtf::FFI_TableFunction;
25use pyo3::IntoPyObjectExt;
26use pyo3::exceptions::{PyImportError, PyTypeError};
27use pyo3::prelude::*;
28use pyo3::types::{PyCapsule, PyTuple, PyType};
29
30use crate::context::PySessionContext;
31use crate::errors::{py_datafusion_err, to_datafusion_err};
32use crate::expr::PyExpr;
33use crate::table::PyTable;
34
35#[pyclass(from_py_object, frozen, name = "TableFunction", module = "datafusion")]
37#[derive(Debug, Clone)]
38pub struct PyTableFunction {
39 pub(crate) name: String,
40 pub(crate) inner: PyTableFunctionInner,
41}
42
43#[derive(Debug, Clone)]
45pub(crate) enum PyTableFunctionInner {
46 PythonFunction(Arc<Py<PyAny>>),
47 FFIFunction(Arc<dyn TableFunctionImpl>),
48}
49
50#[pymethods]
51impl PyTableFunction {
52 #[new]
53 #[pyo3(signature=(name, func, session))]
54 pub fn new(
55 name: &str,
56 func: Bound<'_, PyAny>,
57 session: Option<Bound<PyAny>>,
58 ) -> PyResult<Self> {
59 let inner = if func.hasattr("__datafusion_table_function__")? {
60 let py = func.py();
61 let session = match session {
62 Some(session) => session,
63 None => PySessionContext::global_ctx()?.into_bound_py_any(py)?,
64 };
65 let capsule = func
66 .getattr("__datafusion_table_function__")?
67 .call1((session,)).map_err(|err| {
68 if err.get_type(py).is(PyType::new::<PyTypeError>(py)) {
69 PyImportError::new_err("Incompatible libraries. DataFusion 52.0.0 introduced an incompatible signature change for table functions. Either downgrade DataFusion or upgrade your function library.")
70 } else {
71 err
72 }
73 })?;
74 let capsule = capsule.cast::<PyCapsule>()?;
75 let data: NonNull<FFI_TableFunction> = capsule
76 .pointer_checked(Some(c"datafusion_table_function"))?
77 .cast();
78 let ffi_func = unsafe { data.as_ref() };
79 let foreign_func: Arc<dyn TableFunctionImpl> = ffi_func.to_owned().into();
80
81 PyTableFunctionInner::FFIFunction(foreign_func)
82 } else {
83 let py_obj = Arc::new(func.unbind());
84 PyTableFunctionInner::PythonFunction(py_obj)
85 };
86
87 Ok(Self {
88 name: name.to_string(),
89 inner,
90 })
91 }
92
93 #[pyo3(signature = (*args))]
94 pub fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyTable> {
95 let args: Vec<Expr> = args.iter().map(|e| e.expr.clone()).collect();
96 let table_provider = self.call(&args).map_err(py_datafusion_err)?;
97
98 Ok(PyTable::from(table_provider))
99 }
100
101 fn __repr__(&self) -> PyResult<String> {
102 Ok(format!("TableUDF({})", self.name))
103 }
104}
105
106#[allow(clippy::result_large_err)]
107fn call_python_table_function(
108 func: &Arc<Py<PyAny>>,
109 args: &[Expr],
110) -> DataFusionResult<Arc<dyn TableProvider>> {
111 let args = args
112 .iter()
113 .map(|arg| PyExpr::from(arg.clone()))
114 .collect::<Vec<_>>();
115
116 Python::attach(|py| {
118 let py_args = PyTuple::new(py, args)?;
119 let provider_obj = func.call1(py, py_args)?;
120 let provider = provider_obj.bind(py).clone();
121
122 Ok::<Arc<dyn TableProvider>, PyErr>(PyTable::new(provider, None)?.table)
123 })
124 .map_err(to_datafusion_err)
125}
126
127impl TableFunctionImpl for PyTableFunction {
128 fn call(&self, args: &[Expr]) -> DataFusionResult<Arc<dyn TableProvider>> {
129 match &self.inner {
130 PyTableFunctionInner::FFIFunction(func) => func.call(args),
131 PyTableFunctionInner::PythonFunction(obj) => call_python_table_function(obj, args),
132 }
133 }
134}