datafusion_python/
utils.rs1use crate::errors::{PyDataFusionError, PyDataFusionResult};
19use crate::TokioRuntime;
20use datafusion::execution::context::SessionContext;
21use datafusion::logical_expr::Volatility;
22use pyo3::exceptions::PyValueError;
23use pyo3::prelude::*;
24use pyo3::types::PyCapsule;
25use std::future::Future;
26use std::sync::OnceLock;
27use tokio::runtime::Runtime;
28
29#[inline]
31pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
32 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
38 RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap()))
39}
40
41#[inline]
43pub(crate) fn get_global_ctx() -> &'static SessionContext {
44 static CTX: OnceLock<SessionContext> = OnceLock::new();
45 CTX.get_or_init(SessionContext::new)
46}
47
48pub fn wait_for_future<F>(py: Python, f: F) -> F::Output
50where
51 F: Future + Send,
52 F::Output: Send,
53{
54 let runtime: &Runtime = &get_tokio_runtime().0;
55 py.allow_threads(|| runtime.block_on(f))
56}
57
58pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
59 Ok(match value {
60 "immutable" => Volatility::Immutable,
61 "stable" => Volatility::Stable,
62 "volatile" => Volatility::Volatile,
63 value => {
64 return Err(PyDataFusionError::Common(format!(
65 "Unsupportad volatility type: `{value}`, supported \
66 values are: immutable, stable and volatile."
67 )))
68 }
69 })
70}
71
72pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
73 let capsule_name = capsule.name()?;
74 if capsule_name.is_none() {
75 return Err(PyValueError::new_err(
76 "Expected schema PyCapsule to have name set.",
77 ));
78 }
79
80 let capsule_name = capsule_name.unwrap().to_str()?;
81 if capsule_name != name {
82 return Err(PyValueError::new_err(format!(
83 "Expected name '{}' in PyCapsule, instead got '{}'",
84 name, capsule_name
85 )));
86 }
87
88 Ok(())
89}