datafusion_python/
utils.rs1use crate::errors::{PyDataFusionError, PyDataFusionResult};
19use crate::TokioRuntime;
20use datafusion::logical_expr::Volatility;
21use pyo3::exceptions::PyValueError;
22use pyo3::prelude::*;
23use pyo3::types::PyCapsule;
24use std::future::Future;
25use std::sync::OnceLock;
26use tokio::runtime::Runtime;
27
28#[inline]
30pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
31 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
37 RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap()))
38}
39
40pub fn wait_for_future<F>(py: Python, f: F) -> F::Output
42where
43 F: Future + Send,
44 F::Output: Send,
45{
46 let runtime: &Runtime = &get_tokio_runtime().0;
47 py.allow_threads(|| runtime.block_on(f))
48}
49
50pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
51 Ok(match value {
52 "immutable" => Volatility::Immutable,
53 "stable" => Volatility::Stable,
54 "volatile" => Volatility::Volatile,
55 value => {
56 return Err(PyDataFusionError::Common(format!(
57 "Unsupportad volatility type: `{value}`, supported \
58 values are: immutable, stable and volatile."
59 )))
60 }
61 })
62}
63
64pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
65 let capsule_name = capsule.name()?;
66 if capsule_name.is_none() {
67 return Err(PyValueError::new_err(
68 "Expected schema PyCapsule to have name set.",
69 ));
70 }
71
72 let capsule_name = capsule_name.unwrap().to_str()?;
73 if capsule_name != name {
74 return Err(PyValueError::new_err(format!(
75 "Expected name '{}' in PyCapsule, instead got '{}'",
76 name, capsule_name
77 )));
78 }
79
80 Ok(())
81}