Skip to main content

datafusion_python/
context.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::path::PathBuf;
20use std::ptr::NonNull;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use arrow::array::RecordBatchReader;
25use arrow::ffi_stream::ArrowArrayStreamReader;
26use arrow::pyarrow::FromPyArrow;
27use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
28use datafusion::arrow::pyarrow::PyArrowType;
29use datafusion::arrow::record_batch::RecordBatch;
30use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory};
31use datafusion::common::{DFSchema, ScalarValue, TableReference, exec_err};
32use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
33use datafusion::datasource::file_format::parquet::ParquetFormat;
34use datafusion::datasource::listing::{
35    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
36};
37use datafusion::datasource::{MemTable, TableProvider};
38use datafusion::execution::TaskContextProvider;
39use datafusion::execution::context::{
40    DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
41};
42use datafusion::execution::disk_manager::DiskManagerMode;
43use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
44use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
45use datafusion::execution::runtime_env::RuntimeEnvBuilder;
46use datafusion::execution::session_state::SessionStateBuilder;
47use datafusion::prelude::{
48    AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
49};
50use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
51use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
52use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
53use datafusion_ffi::execution::FFI_TaskContextProvider;
54use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
55use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
56use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
57use datafusion_python_util::{
58    create_logical_extension_capsule, ffi_logical_codec_from_pycapsule, get_global_ctx,
59    get_tokio_runtime, spawn_future, wait_for_future,
60};
61use object_store::ObjectStore;
62use pyo3::IntoPyObjectExt;
63use pyo3::exceptions::{PyKeyError, PyRuntimeError, PyValueError};
64use pyo3::prelude::*;
65use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple};
66use url::Url;
67use uuid::Uuid;
68
69use crate::catalog::{
70    PyCatalog, PyCatalogList, RustWrappedPyCatalogProvider, RustWrappedPyCatalogProviderList,
71};
72use crate::common::data_type::PyScalarValue;
73use crate::common::df_schema::PyDFSchema;
74use crate::dataframe::PyDataFrame;
75use crate::dataset::Dataset;
76use crate::errors::{
77    PyDataFusionError, PyDataFusionResult, from_datafusion_error, py_datafusion_err,
78};
79use crate::expr::PyExpr;
80use crate::expr::sort_expr::PySortExpr;
81use crate::options::PyCsvReadOptions;
82use crate::physical_plan::PyExecutionPlan;
83use crate::record_batch::PyRecordBatchStream;
84use crate::sql::logical::PyLogicalPlan;
85use crate::sql::util::replace_placeholders_with_strings;
86use crate::store::StorageContexts;
87use crate::table::{PyTable, RustWrappedPyTableProviderFactory};
88use crate::udaf::PyAggregateUDF;
89use crate::udf::PyScalarUDF;
90use crate::udtf::PyTableFunction;
91use crate::udwf::PyWindowUDF;
92
93/// Configuration options for a SessionContext
94#[pyclass(
95    from_py_object,
96    frozen,
97    name = "SessionConfig",
98    module = "datafusion",
99    subclass
100)]
101#[derive(Clone, Default)]
102pub struct PySessionConfig {
103    pub config: SessionConfig,
104}
105
106impl From<SessionConfig> for PySessionConfig {
107    fn from(config: SessionConfig) -> Self {
108        Self { config }
109    }
110}
111
112#[pymethods]
113impl PySessionConfig {
114    #[pyo3(signature = (config_options=None))]
115    #[new]
116    fn new(config_options: Option<HashMap<String, String>>) -> Self {
117        let mut config = SessionConfig::new();
118        if let Some(hash_map) = config_options {
119            for (k, v) in &hash_map {
120                config = config.set(k, &ScalarValue::Utf8(Some(v.clone())));
121            }
122        }
123
124        Self { config }
125    }
126
127    fn with_create_default_catalog_and_schema(&self, enabled: bool) -> Self {
128        Self::from(
129            self.config
130                .clone()
131                .with_create_default_catalog_and_schema(enabled),
132        )
133    }
134
135    fn with_default_catalog_and_schema(&self, catalog: &str, schema: &str) -> Self {
136        Self::from(
137            self.config
138                .clone()
139                .with_default_catalog_and_schema(catalog, schema),
140        )
141    }
142
143    fn with_information_schema(&self, enabled: bool) -> Self {
144        Self::from(self.config.clone().with_information_schema(enabled))
145    }
146
147    fn with_batch_size(&self, batch_size: usize) -> Self {
148        Self::from(self.config.clone().with_batch_size(batch_size))
149    }
150
151    fn with_target_partitions(&self, target_partitions: usize) -> Self {
152        Self::from(
153            self.config
154                .clone()
155                .with_target_partitions(target_partitions),
156        )
157    }
158
159    fn with_repartition_aggregations(&self, enabled: bool) -> Self {
160        Self::from(self.config.clone().with_repartition_aggregations(enabled))
161    }
162
163    fn with_repartition_joins(&self, enabled: bool) -> Self {
164        Self::from(self.config.clone().with_repartition_joins(enabled))
165    }
166
167    fn with_repartition_windows(&self, enabled: bool) -> Self {
168        Self::from(self.config.clone().with_repartition_windows(enabled))
169    }
170
171    fn with_repartition_sorts(&self, enabled: bool) -> Self {
172        Self::from(self.config.clone().with_repartition_sorts(enabled))
173    }
174
175    fn with_repartition_file_scans(&self, enabled: bool) -> Self {
176        Self::from(self.config.clone().with_repartition_file_scans(enabled))
177    }
178
179    fn with_repartition_file_min_size(&self, size: usize) -> Self {
180        Self::from(self.config.clone().with_repartition_file_min_size(size))
181    }
182
183    fn with_parquet_pruning(&self, enabled: bool) -> Self {
184        Self::from(self.config.clone().with_parquet_pruning(enabled))
185    }
186
187    fn set(&self, key: &str, value: &str) -> Self {
188        Self::from(self.config.clone().set_str(key, value))
189    }
190
191    pub fn with_extension(&self, extension: Bound<PyAny>) -> PyResult<Self> {
192        if !extension.hasattr("__datafusion_extension_options__")? {
193            return Err(pyo3::exceptions::PyAttributeError::new_err(
194                "Expected extension object to define __datafusion_extension_options__()",
195            ));
196        }
197        let capsule = extension.call_method0("__datafusion_extension_options__")?;
198        let capsule = capsule.cast::<PyCapsule>()?;
199
200        let extension: NonNull<FFI_ExtensionOptions> = capsule
201            .pointer_checked(Some(c"datafusion_extension_options"))?
202            .cast();
203        let mut extension = unsafe { extension.as_ref() }.clone();
204
205        let mut config = self.config.clone();
206        let options = config.options_mut();
207        if let Some(prior_extension) = options.extensions.get::<FFI_ExtensionOptions>() {
208            extension
209                .merge(prior_extension)
210                .map_err(py_datafusion_err)?;
211        }
212
213        options.extensions.insert(extension);
214
215        Ok(Self::from(config))
216    }
217}
218
219/// Runtime options for a SessionContext
220#[pyclass(
221    from_py_object,
222    frozen,
223    name = "RuntimeEnvBuilder",
224    module = "datafusion",
225    subclass
226)]
227#[derive(Clone)]
228pub struct PyRuntimeEnvBuilder {
229    pub builder: RuntimeEnvBuilder,
230}
231
232#[pymethods]
233impl PyRuntimeEnvBuilder {
234    #[new]
235    fn new() -> Self {
236        Self {
237            builder: RuntimeEnvBuilder::default(),
238        }
239    }
240
241    fn with_disk_manager_disabled(&self) -> Self {
242        let mut runtime_builder = self.builder.clone();
243
244        let mut disk_mgr_builder = runtime_builder
245            .disk_manager_builder
246            .clone()
247            .unwrap_or_default();
248        disk_mgr_builder.set_mode(DiskManagerMode::Disabled);
249
250        runtime_builder = runtime_builder.with_disk_manager_builder(disk_mgr_builder);
251        Self {
252            builder: runtime_builder,
253        }
254    }
255
256    fn with_disk_manager_os(&self) -> Self {
257        let mut runtime_builder = self.builder.clone();
258
259        let mut disk_mgr_builder = runtime_builder
260            .disk_manager_builder
261            .clone()
262            .unwrap_or_default();
263        disk_mgr_builder.set_mode(DiskManagerMode::OsTmpDirectory);
264
265        runtime_builder = runtime_builder.with_disk_manager_builder(disk_mgr_builder);
266        Self {
267            builder: runtime_builder,
268        }
269    }
270
271    fn with_disk_manager_specified(&self, paths: Vec<String>) -> Self {
272        let paths = paths.iter().map(|s| s.into()).collect();
273        let mut runtime_builder = self.builder.clone();
274
275        let mut disk_mgr_builder = runtime_builder
276            .disk_manager_builder
277            .clone()
278            .unwrap_or_default();
279        disk_mgr_builder.set_mode(DiskManagerMode::Directories(paths));
280
281        runtime_builder = runtime_builder.with_disk_manager_builder(disk_mgr_builder);
282        Self {
283            builder: runtime_builder,
284        }
285    }
286
287    fn with_unbounded_memory_pool(&self) -> Self {
288        let builder = self.builder.clone();
289        let builder = builder.with_memory_pool(Arc::new(UnboundedMemoryPool::default()));
290        Self { builder }
291    }
292
293    fn with_fair_spill_pool(&self, size: usize) -> Self {
294        let builder = self.builder.clone();
295        let builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(size)));
296        Self { builder }
297    }
298
299    fn with_greedy_memory_pool(&self, size: usize) -> Self {
300        let builder = self.builder.clone();
301        let builder = builder.with_memory_pool(Arc::new(GreedyMemoryPool::new(size)));
302        Self { builder }
303    }
304
305    fn with_temp_file_path(&self, path: &str) -> Self {
306        let builder = self.builder.clone();
307        let builder = builder.with_temp_file_path(path);
308        Self { builder }
309    }
310}
311
312/// `PySQLOptions` allows you to specify options to the sql execution.
313#[pyclass(
314    from_py_object,
315    frozen,
316    name = "SQLOptions",
317    module = "datafusion",
318    subclass
319)]
320#[derive(Clone)]
321pub struct PySQLOptions {
322    pub options: SQLOptions,
323}
324
325impl From<SQLOptions> for PySQLOptions {
326    fn from(options: SQLOptions) -> Self {
327        Self { options }
328    }
329}
330
331#[pymethods]
332impl PySQLOptions {
333    #[new]
334    fn new() -> Self {
335        let options = SQLOptions::new();
336        Self { options }
337    }
338
339    /// Should DDL data modification commands  (e.g. `CREATE TABLE`) be run? Defaults to `true`.
340    fn with_allow_ddl(&self, allow: bool) -> Self {
341        Self::from(self.options.with_allow_ddl(allow))
342    }
343
344    /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`
345    pub fn with_allow_dml(&self, allow: bool) -> Self {
346        Self::from(self.options.with_allow_dml(allow))
347    }
348
349    /// Should Statements such as (e.g. `SET VARIABLE and `BEGIN TRANSACTION` ...`) be run?. Defaults to `true`
350    pub fn with_allow_statements(&self, allow: bool) -> Self {
351        Self::from(self.options.with_allow_statements(allow))
352    }
353}
354
/// `PySessionContext` is able to plan and execute DataFusion plans.
/// It has a powerful optimizer, a physical planner for local execution, and a
/// multi-threaded execution engine to perform the execution.
///
/// Exposed to Python as `datafusion.SessionContext`. Cloning is cheap: both
/// fields are `Arc`s, so a clone shares the same underlying session.
#[pyclass(
    from_py_object,
    frozen,
    name = "SessionContext",
    module = "datafusion",
    subclass
)]
#[derive(Clone)]
pub struct PySessionContext {
    /// The shared DataFusion session this wrapper operates on.
    pub ctx: Arc<SessionContext>,
    /// FFI logical extension codec associated with this session; it is passed
    /// (as a capsule or directly) to Python-provided factories and catalog
    /// providers when they are registered.
    logical_codec: Arc<FFI_LogicalExtensionCodec>,
}
370
371#[pymethods]
372impl PySessionContext {
373    #[pyo3(signature = (config=None, runtime=None))]
374    #[new]
375    pub fn new(
376        config: Option<PySessionConfig>,
377        runtime: Option<PyRuntimeEnvBuilder>,
378    ) -> PyDataFusionResult<Self> {
379        let config = if let Some(c) = config {
380            c.config
381        } else {
382            SessionConfig::default().with_information_schema(true)
383        };
384        let runtime_env_builder = if let Some(c) = runtime {
385            c.builder
386        } else {
387            RuntimeEnvBuilder::default()
388        };
389        let runtime = Arc::new(runtime_env_builder.build()?);
390        let session_state = SessionStateBuilder::new()
391            .with_config(config)
392            .with_runtime_env(runtime)
393            .with_default_features()
394            .build();
395        let ctx = Arc::new(SessionContext::new_with_state(session_state));
396        let logical_codec = Self::default_logical_codec(&ctx);
397        Ok(PySessionContext { ctx, logical_codec })
398    }
399
400    pub fn enable_url_table(&self) -> PyResult<Self> {
401        Ok(PySessionContext {
402            ctx: Arc::new(self.ctx.as_ref().clone().enable_url_table()),
403            logical_codec: Arc::clone(&self.logical_codec),
404        })
405    }
406
407    #[staticmethod]
408    #[pyo3(signature = ())]
409    pub fn global_ctx() -> PyResult<Self> {
410        let ctx = get_global_ctx().clone();
411        let logical_codec = Self::default_logical_codec(&ctx);
412        Ok(Self { ctx, logical_codec })
413    }
414
415    /// Register an object store with the given name
416    #[pyo3(signature = (scheme, store, host=None))]
417    pub fn register_object_store(
418        &self,
419        scheme: &str,
420        store: StorageContexts,
421        host: Option<&str>,
422    ) -> PyResult<()> {
423        // for most stores the "host" is the bucket name and can be inferred from the store
424        let (store, upstream_host): (Arc<dyn ObjectStore>, String) = match store {
425            StorageContexts::AmazonS3(s3) => (s3.inner, s3.bucket_name),
426            StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name),
427            StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name),
428            StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()),
429            StorageContexts::HTTP(http) => (http.store, http.url),
430        };
431
432        // let users override the host to match the api signature from upstream
433        let derived_host = if let Some(host) = host {
434            host
435        } else {
436            &upstream_host
437        };
438        let url_string = format!("{scheme}{derived_host}");
439        let url = Url::parse(&url_string).map_err(|e| PyValueError::new_err(e.to_string()))?;
440        self.ctx.runtime_env().register_object_store(&url, store);
441        Ok(())
442    }
443
444    /// Deregister an object store with the given url
445    #[pyo3(signature = (scheme, host=None))]
446    pub fn deregister_object_store(
447        &self,
448        scheme: &str,
449        host: Option<&str>,
450    ) -> PyDataFusionResult<()> {
451        let host = host.unwrap_or("");
452        let url_string = format!("{scheme}{host}");
453        let url = Url::parse(&url_string).map_err(|e| PyDataFusionError::Common(e.to_string()))?;
454        self.ctx.runtime_env().deregister_object_store(&url)?;
455        Ok(())
456    }
457
458    #[allow(clippy::too_many_arguments)]
459    #[pyo3(signature = (name, path, table_partition_cols=vec![],
460    file_extension=".parquet",
461    schema=None,
462    file_sort_order=None))]
463    pub fn register_listing_table(
464        &self,
465        name: &str,
466        path: &str,
467        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
468        file_extension: &str,
469        schema: Option<PyArrowType<Schema>>,
470        file_sort_order: Option<Vec<Vec<PySortExpr>>>,
471        py: Python,
472    ) -> PyDataFusionResult<()> {
473        let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
474            .with_file_extension(file_extension)
475            .with_table_partition_cols(
476                table_partition_cols
477                    .into_iter()
478                    .map(|(name, ty)| (name, ty.0))
479                    .collect::<Vec<(String, DataType)>>(),
480            )
481            .with_file_sort_order(
482                file_sort_order
483                    .unwrap_or_default()
484                    .into_iter()
485                    .map(|e| e.into_iter().map(|f| f.into()).collect())
486                    .collect(),
487            );
488        let table_path = ListingTableUrl::parse(path)?;
489        let resolved_schema: SchemaRef = match schema {
490            Some(s) => Arc::new(s.0),
491            None => {
492                let state = self.ctx.state();
493                let schema = options.infer_schema(&state, &table_path);
494                wait_for_future(py, schema)??
495            }
496        };
497        let config = ListingTableConfig::new(table_path)
498            .with_listing_options(options)
499            .with_schema(resolved_schema);
500        let table = ListingTable::try_new(config)?;
501        self.ctx.register_table(name, Arc::new(table))?;
502        Ok(())
503    }
504
505    pub fn register_udtf(&self, func: PyTableFunction) {
506        let name = func.name.clone();
507        let func = Arc::new(func);
508        self.ctx.register_udtf(&name, func);
509    }
510
511    pub fn deregister_udtf(&self, name: &str) {
512        self.ctx.deregister_udtf(name);
513    }
514
515    #[pyo3(signature = (query, options=None, param_values=HashMap::default(), param_strings=HashMap::default()))]
516    pub fn sql_with_options(
517        &self,
518        py: Python,
519        mut query: String,
520        options: Option<PySQLOptions>,
521        param_values: HashMap<String, PyScalarValue>,
522        param_strings: HashMap<String, String>,
523    ) -> PyDataFusionResult<PyDataFrame> {
524        let options = if let Some(options) = options {
525            options.options
526        } else {
527            SQLOptions::new()
528        };
529
530        let param_values = param_values
531            .into_iter()
532            .map(|(name, value)| (name, ScalarValue::from(value)))
533            .collect::<HashMap<_, _>>();
534
535        let state = self.ctx.state();
536        let dialect = state.config().options().sql_parser.dialect.as_ref();
537
538        if !param_strings.is_empty() {
539            query = replace_placeholders_with_strings(&query, dialect, param_strings)?;
540        }
541
542        let mut df = wait_for_future(py, async {
543            self.ctx.sql_with_options(&query, options).await
544        })?
545        .map_err(from_datafusion_error)?;
546
547        if !param_values.is_empty() {
548            df = df.with_param_values(param_values)?;
549        }
550
551        Ok(PyDataFrame::new(df))
552    }
553
554    #[pyo3(signature = (partitions, name=None, schema=None))]
555    pub fn create_dataframe(
556        &self,
557        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
558        name: Option<&str>,
559        schema: Option<PyArrowType<Schema>>,
560        py: Python,
561    ) -> PyDataFusionResult<PyDataFrame> {
562        let schema = if let Some(schema) = schema {
563            SchemaRef::from(schema.0)
564        } else {
565            partitions.0[0][0].schema()
566        };
567
568        let table = MemTable::try_new(schema, partitions.0)?;
569
570        // generate a random (unique) name for this table if none is provided
571        // table name cannot start with numeric digit
572        let table_name = match name {
573            Some(val) => val.to_owned(),
574            None => {
575                "c".to_owned()
576                    + Uuid::new_v4()
577                        .simple()
578                        .encode_lower(&mut Uuid::encode_buffer())
579            }
580        };
581
582        self.ctx.register_table(&*table_name, Arc::new(table))?;
583
584        let table = wait_for_future(py, self._table(&table_name))??;
585
586        let df = PyDataFrame::new(table);
587        Ok(df)
588    }
589
590    /// Create a DataFrame from an existing logical plan
591    pub fn create_dataframe_from_logical_plan(&self, plan: PyLogicalPlan) -> PyDataFrame {
592        PyDataFrame::new(DataFrame::new(self.ctx.state(), plan.plan.as_ref().clone()))
593    }
594
595    /// Construct datafusion dataframe from Python list
596    #[pyo3(signature = (data, name=None))]
597    pub fn from_pylist(
598        &self,
599        data: Bound<'_, PyList>,
600        name: Option<&str>,
601    ) -> PyResult<PyDataFrame> {
602        // Acquire GIL Token
603        let py = data.py();
604
605        // Instantiate pyarrow Table object & convert to Arrow Table
606        let table_class = py.import("pyarrow")?.getattr("Table")?;
607        let args = PyTuple::new(py, &[data])?;
608        let table = table_class.call_method1("from_pylist", args)?;
609
610        // Convert Arrow Table to datafusion DataFrame
611        let df = self.from_arrow(table, name, py)?;
612        Ok(df)
613    }
614
615    /// Construct datafusion dataframe from Python dictionary
616    #[pyo3(signature = (data, name=None))]
617    pub fn from_pydict(
618        &self,
619        data: Bound<'_, PyDict>,
620        name: Option<&str>,
621    ) -> PyResult<PyDataFrame> {
622        // Acquire GIL Token
623        let py = data.py();
624
625        // Instantiate pyarrow Table object & convert to Arrow Table
626        let table_class = py.import("pyarrow")?.getattr("Table")?;
627        let args = PyTuple::new(py, &[data])?;
628        let table = table_class.call_method1("from_pydict", args)?;
629
630        // Convert Arrow Table to datafusion DataFrame
631        let df = self.from_arrow(table, name, py)?;
632        Ok(df)
633    }
634
635    /// Construct datafusion dataframe from Arrow Table
636    #[pyo3(signature = (data, name=None))]
637    pub fn from_arrow(
638        &self,
639        data: Bound<'_, PyAny>,
640        name: Option<&str>,
641        py: Python,
642    ) -> PyDataFusionResult<PyDataFrame> {
643        let (schema, batches) =
644            if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
645                // Works for any object that implements __arrow_c_stream__ in pycapsule.
646
647                let schema = stream_reader.schema().as_ref().to_owned();
648                let batches = stream_reader
649                    .collect::<Result<Vec<RecordBatch>, arrow::error::ArrowError>>()?;
650
651                (schema, batches)
652            } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
653                // While this says RecordBatch, it will work for any object that implements
654                // __arrow_c_array__ and returns a StructArray.
655
656                (array.schema().as_ref().to_owned(), vec![array])
657            } else {
658                return Err(PyDataFusionError::Common(
659                    "Expected either a Arrow Array or Arrow Stream in from_arrow().".to_string(),
660                ));
661            };
662
663        // Because create_dataframe() expects a vector of vectors of record batches
664        // here we need to wrap the vector of record batches in an additional vector
665        let list_of_batches = PyArrowType::from(vec![batches]);
666        self.create_dataframe(list_of_batches, name, Some(schema.into()), py)
667    }
668
669    /// Construct datafusion dataframe from pandas
670    #[allow(clippy::wrong_self_convention)]
671    #[pyo3(signature = (data, name=None))]
672    pub fn from_pandas(&self, data: Bound<'_, PyAny>, name: Option<&str>) -> PyResult<PyDataFrame> {
673        // Obtain GIL token
674        let py = data.py();
675
676        // Instantiate pyarrow Table object & convert to Arrow Table
677        let table_class = py.import("pyarrow")?.getattr("Table")?;
678        let args = PyTuple::new(py, &[data])?;
679        let table = table_class.call_method1("from_pandas", args)?;
680
681        // Convert Arrow Table to datafusion DataFrame
682        let df = self.from_arrow(table, name, py)?;
683        Ok(df)
684    }
685
686    /// Construct datafusion dataframe from polars
687    #[pyo3(signature = (data, name=None))]
688    pub fn from_polars(&self, data: Bound<'_, PyAny>, name: Option<&str>) -> PyResult<PyDataFrame> {
689        // Convert Polars dataframe to Arrow Table
690        let table = data.call_method0("to_arrow")?;
691
692        // Convert Arrow Table to datafusion DataFrame
693        let df = self.from_arrow(table, name, data.py())?;
694        Ok(df)
695    }
696
697    pub fn register_table(&self, name: &str, table: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
698        let session = self.clone().into_bound_py_any(table.py())?;
699        let table = PyTable::new(table, Some(session))?;
700
701        self.ctx.register_table(name, table.table)?;
702        Ok(())
703    }
704
705    pub fn deregister_table(&self, name: &str) -> PyDataFusionResult<()> {
706        self.ctx.deregister_table(name)?;
707        Ok(())
708    }
709
710    pub fn register_table_factory(
711        &self,
712        format: &str,
713        mut factory: Bound<'_, PyAny>,
714    ) -> PyDataFusionResult<()> {
715        if factory.hasattr("__datafusion_table_provider_factory__")? {
716            let py = factory.py();
717            let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
718            factory = factory
719                .getattr("__datafusion_table_provider_factory__")?
720                .call1((codec_capsule,))?;
721        }
722
723        let factory: Arc<dyn TableProviderFactory> =
724            if let Ok(capsule) = factory.cast::<PyCapsule>().map_err(py_datafusion_err) {
725                let data: NonNull<FFI_TableProviderFactory> = capsule
726                    .pointer_checked(Some(c"datafusion_table_provider_factory"))?
727                    .cast();
728                let factory = unsafe { data.as_ref() };
729                factory.into()
730            } else {
731                Arc::new(RustWrappedPyTableProviderFactory::new(
732                    factory.into(),
733                    self.logical_codec.clone(),
734                ))
735            };
736
737        let st = self.ctx.state_ref();
738        let mut lock = st.write();
739        lock.table_factories_mut()
740            .insert(format.to_owned(), factory);
741
742        Ok(())
743    }
744
745    pub fn register_catalog_provider_list(
746        &self,
747        mut provider: Bound<PyAny>,
748    ) -> PyDataFusionResult<()> {
749        if provider.hasattr("__datafusion_catalog_provider_list__")? {
750            let py = provider.py();
751            let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
752            provider = provider
753                .getattr("__datafusion_catalog_provider_list__")?
754                .call1((codec_capsule,))?;
755        }
756
757        let provider = if let Ok(capsule) = provider.cast::<PyCapsule>() {
758            let data: NonNull<FFI_CatalogProviderList> = capsule
759                .pointer_checked(Some(c"datafusion_catalog_provider_list"))?
760                .cast();
761            let provider = unsafe { data.as_ref() };
762            let provider: Arc<dyn CatalogProviderList + Send> = provider.into();
763            provider as Arc<dyn CatalogProviderList>
764        } else {
765            match provider.extract::<PyCatalogList>() {
766                Ok(py_catalog_list) => py_catalog_list.catalog_list,
767                Err(_) => Arc::new(RustWrappedPyCatalogProviderList::new(
768                    provider.into(),
769                    Arc::clone(&self.logical_codec),
770                )) as Arc<dyn CatalogProviderList>,
771            }
772        };
773
774        self.ctx.register_catalog_list(provider);
775
776        Ok(())
777    }
778
779    pub fn register_catalog_provider(
780        &self,
781        name: &str,
782        mut provider: Bound<'_, PyAny>,
783    ) -> PyDataFusionResult<()> {
784        if provider.hasattr("__datafusion_catalog_provider__")? {
785            let py = provider.py();
786            let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
787            provider = provider
788                .getattr("__datafusion_catalog_provider__")?
789                .call1((codec_capsule,))?;
790        }
791
792        let provider = if let Ok(capsule) = provider.cast::<PyCapsule>() {
793            let data: NonNull<FFI_CatalogProvider> = capsule
794                .pointer_checked(Some(c"datafusion_catalog_provider"))?
795                .cast();
796            let provider = unsafe { data.as_ref() };
797            let provider: Arc<dyn CatalogProvider + Send> = provider.into();
798            provider as Arc<dyn CatalogProvider>
799        } else {
800            match provider.extract::<PyCatalog>() {
801                Ok(py_catalog) => py_catalog.catalog,
802                Err(_) => Arc::new(RustWrappedPyCatalogProvider::new(
803                    provider.into(),
804                    Arc::clone(&self.logical_codec),
805                )) as Arc<dyn CatalogProvider>,
806            }
807        };
808
809        let _ = self.ctx.register_catalog(name, provider);
810
811        Ok(())
812    }
813
814    /// Construct datafusion dataframe from Arrow Table
815    pub fn register_table_provider(
816        &self,
817        name: &str,
818        provider: Bound<'_, PyAny>,
819    ) -> PyDataFusionResult<()> {
820        // Deprecated: use `register_table` instead
821        self.register_table(name, provider)
822    }
823
824    pub fn register_record_batches(
825        &self,
826        name: &str,
827        partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
828    ) -> PyDataFusionResult<()> {
829        let schema = partitions.0[0][0].schema();
830        let table = MemTable::try_new(schema, partitions.0)?;
831        self.ctx.register_table(name, Arc::new(table))?;
832        Ok(())
833    }
834
835    #[allow(clippy::too_many_arguments)]
836    #[pyo3(signature = (name, path, table_partition_cols=vec![],
837                        parquet_pruning=true,
838                        file_extension=".parquet",
839                        skip_metadata=true,
840                        schema=None,
841                        file_sort_order=None))]
842    pub fn register_parquet(
843        &self,
844        name: &str,
845        path: &str,
846        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
847        parquet_pruning: bool,
848        file_extension: &str,
849        skip_metadata: bool,
850        schema: Option<PyArrowType<Schema>>,
851        file_sort_order: Option<Vec<Vec<PySortExpr>>>,
852        py: Python,
853    ) -> PyDataFusionResult<()> {
854        let mut options = ParquetReadOptions::default()
855            .table_partition_cols(
856                table_partition_cols
857                    .into_iter()
858                    .map(|(name, ty)| (name, ty.0))
859                    .collect::<Vec<(String, DataType)>>(),
860            )
861            .parquet_pruning(parquet_pruning)
862            .skip_metadata(skip_metadata);
863        options.file_extension = file_extension;
864        options.schema = schema.as_ref().map(|x| &x.0);
865        options.file_sort_order = file_sort_order
866            .unwrap_or_default()
867            .into_iter()
868            .map(|e| e.into_iter().map(|f| f.into()).collect())
869            .collect();
870
871        let result = self.ctx.register_parquet(name, path, options);
872        wait_for_future(py, result)??;
873        Ok(())
874    }
875
876    #[pyo3(signature = (name,
877                        path,
878                        options=None))]
879    pub fn register_csv(
880        &self,
881        name: &str,
882        path: &Bound<'_, PyAny>,
883        options: Option<&PyCsvReadOptions>,
884        py: Python,
885    ) -> PyDataFusionResult<()> {
886        let options = options
887            .map(|opts| opts.try_into())
888            .transpose()?
889            .unwrap_or_default();
890
891        if path.is_instance_of::<PyList>() {
892            let paths = path.extract::<Vec<String>>()?;
893            let result = self.register_csv_from_multiple_paths(name, paths, options);
894            wait_for_future(py, result)??;
895        } else {
896            let path = path.extract::<String>()?;
897            let result = self.ctx.register_csv(name, &path, options);
898            wait_for_future(py, result)??;
899        }
900
901        Ok(())
902    }
903
904    #[allow(clippy::too_many_arguments)]
905    #[pyo3(signature = (name,
906                        path,
907                        schema=None,
908                        schema_infer_max_records=1000,
909                        file_extension=".json",
910                        table_partition_cols=vec![],
911                        file_compression_type=None))]
912    pub fn register_json(
913        &self,
914        name: &str,
915        path: PathBuf,
916        schema: Option<PyArrowType<Schema>>,
917        schema_infer_max_records: usize,
918        file_extension: &str,
919        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
920        file_compression_type: Option<String>,
921        py: Python,
922    ) -> PyDataFusionResult<()> {
923        let path = path
924            .to_str()
925            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
926
927        let mut options = JsonReadOptions::default()
928            .file_compression_type(parse_file_compression_type(file_compression_type)?)
929            .table_partition_cols(
930                table_partition_cols
931                    .into_iter()
932                    .map(|(name, ty)| (name, ty.0))
933                    .collect::<Vec<(String, DataType)>>(),
934            );
935        options.schema_infer_max_records = schema_infer_max_records;
936        options.file_extension = file_extension;
937        options.schema = schema.as_ref().map(|x| &x.0);
938
939        let result = self.ctx.register_json(name, path, options);
940        wait_for_future(py, result)??;
941
942        Ok(())
943    }
944
945    #[allow(clippy::too_many_arguments)]
946    #[pyo3(signature = (name,
947                        path,
948                        schema=None,
949                        file_extension=".avro",
950                        table_partition_cols=vec![]))]
951    pub fn register_avro(
952        &self,
953        name: &str,
954        path: PathBuf,
955        schema: Option<PyArrowType<Schema>>,
956        file_extension: &str,
957        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
958        py: Python,
959    ) -> PyDataFusionResult<()> {
960        let path = path
961            .to_str()
962            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
963
964        let mut options = AvroReadOptions::default().table_partition_cols(
965            table_partition_cols
966                .into_iter()
967                .map(|(name, ty)| (name, ty.0))
968                .collect::<Vec<(String, DataType)>>(),
969        );
970        options.file_extension = file_extension;
971        options.schema = schema.as_ref().map(|x| &x.0);
972
973        let result = self.ctx.register_avro(name, path, options);
974        wait_for_future(py, result)??;
975
976        Ok(())
977    }
978
979    #[pyo3(signature = (name, path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))]
980    pub fn register_arrow(
981        &self,
982        name: &str,
983        path: &str,
984        schema: Option<PyArrowType<Schema>>,
985        file_extension: &str,
986        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
987        py: Python,
988    ) -> PyDataFusionResult<()> {
989        let mut options = ArrowReadOptions::default().table_partition_cols(
990            table_partition_cols
991                .into_iter()
992                .map(|(name, ty)| (name, ty.0))
993                .collect::<Vec<(String, DataType)>>(),
994        );
995        options.file_extension = file_extension;
996        options.schema = schema.as_ref().map(|x| &x.0);
997
998        let result = self.ctx.register_arrow(name, path, options);
999        wait_for_future(py, result)??;
1000        Ok(())
1001    }
1002
1003    pub fn register_batch(
1004        &self,
1005        name: &str,
1006        batch: PyArrowType<RecordBatch>,
1007    ) -> PyDataFusionResult<()> {
1008        self.ctx.register_batch(name, batch.0)?;
1009        Ok(())
1010    }
1011
1012    // Registers a PyArrow.Dataset
1013    pub fn register_dataset(
1014        &self,
1015        name: &str,
1016        dataset: &Bound<'_, PyAny>,
1017        py: Python,
1018    ) -> PyDataFusionResult<()> {
1019        let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
1020
1021        self.ctx.register_table(name, table)?;
1022
1023        Ok(())
1024    }
1025
1026    pub fn register_udf(&self, udf: PyScalarUDF) -> PyResult<()> {
1027        self.ctx.register_udf(udf.function);
1028        Ok(())
1029    }
1030
    /// Deregister the scalar UDF called `name` by delegating to the wrapped
    /// context's `deregister_udf`. Nothing is returned to the caller.
    pub fn deregister_udf(&self, name: &str) {
        self.ctx.deregister_udf(name);
    }
1034
1035    pub fn register_udaf(&self, udaf: PyAggregateUDF) -> PyResult<()> {
1036        self.ctx.register_udaf(udaf.function);
1037        Ok(())
1038    }
1039
    /// Deregister the aggregate UDF called `name` by delegating to the wrapped
    /// context's `deregister_udaf`. Nothing is returned to the caller.
    pub fn deregister_udaf(&self, name: &str) {
        self.ctx.deregister_udaf(name);
    }
1043
1044    pub fn register_udwf(&self, udwf: PyWindowUDF) -> PyResult<()> {
1045        self.ctx.register_udwf(udwf.function);
1046        Ok(())
1047    }
1048
    /// Deregister the window UDF called `name` by delegating to the wrapped
    /// context's `deregister_udwf`. Nothing is returned to the caller.
    pub fn deregister_udwf(&self, name: &str) {
        self.ctx.deregister_udwf(name);
    }
1052
1053    #[pyo3(signature = (name="datafusion"))]
1054    pub fn catalog(&self, py: Python, name: &str) -> PyResult<Py<PyAny>> {
1055        let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(
1056            "Catalog with name {name} doesn't exist."
1057        )))?;
1058
1059        match catalog
1060            .as_any()
1061            .downcast_ref::<RustWrappedPyCatalogProvider>()
1062        {
1063            Some(wrapped_schema) => Ok(wrapped_schema.catalog_provider.clone_ref(py)),
1064            None => Ok(
1065                PyCatalog::new_from_parts(catalog, Arc::clone(&self.logical_codec))
1066                    .into_py_any(py)?,
1067            ),
1068        }
1069    }
1070
1071    pub fn catalog_names(&self) -> HashSet<String> {
1072        self.ctx.catalog_names().into_iter().collect()
1073    }
1074
1075    pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
1076        let res = wait_for_future(py, self.ctx.table(name))
1077            .map_err(|e| PyKeyError::new_err(e.to_string()))?;
1078        match res {
1079            Ok(df) => Ok(PyDataFrame::new(df)),
1080            Err(e) => {
1081                if let datafusion::error::DataFusionError::Plan(msg) = &e
1082                    && msg.contains("No table named")
1083                {
1084                    return Err(PyKeyError::new_err(msg.to_string()));
1085                }
1086                Err(py_datafusion_err(e))
1087            }
1088        }
1089    }
1090
1091    pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
1092        Ok(self.ctx.table_exist(name)?)
1093    }
1094
1095    pub fn empty_table(&self) -> PyDataFusionResult<PyDataFrame> {
1096        Ok(PyDataFrame::new(self.ctx.read_empty()?))
1097    }
1098
    /// Return the session identifier reported by the wrapped context.
    pub fn session_id(&self) -> String {
        self.ctx.session_id()
    }
1102
1103    pub fn session_start_time(&self) -> String {
1104        self.ctx.session_start_time().to_rfc3339()
1105    }
1106
    /// Return the value of the wrapped context's `enable_ident_normalization`.
    // NOTE(review): presumably reports whether SQL identifier normalization is
    // switched on for this session - confirm against the DataFusion docs.
    pub fn enable_ident_normalization(&self) -> bool {
        self.ctx.enable_ident_normalization()
    }
1110
1111    pub fn parse_sql_expr(&self, sql: &str, schema: PyDFSchema) -> PyDataFusionResult<PyExpr> {
1112        let df_schema: DFSchema = schema.into();
1113        Ok(self.ctx.parse_sql_expr(sql, &df_schema)?.into())
1114    }
1115
1116    pub fn execute_logical_plan(
1117        &self,
1118        plan: PyLogicalPlan,
1119        py: Python,
1120    ) -> PyDataFusionResult<PyDataFrame> {
1121        let df = wait_for_future(
1122            py,
1123            self.ctx.execute_logical_plan(plan.plan.as_ref().clone()),
1124        )??;
1125        Ok(PyDataFrame::new(df))
1126    }
1127
1128    pub fn refresh_catalogs(&self, py: Python) -> PyDataFusionResult<()> {
1129        wait_for_future(py, self.ctx.refresh_catalogs())??;
1130        Ok(())
1131    }
1132
    /// Remove the optimizer rule called `name` by delegating to the wrapped
    /// context, and return the boolean it reports.
    // NOTE(review): presumably `true` means a rule was found and removed -
    // confirm against the DataFusion docs.
    pub fn remove_optimizer_rule(&self, name: &str) -> bool {
        self.ctx.remove_optimizer_rule(name)
    }
1136
1137    pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
1138        let provider = wait_for_future(py, self.ctx.table_provider(name))
1139            // Outer error: runtime/async failure
1140            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?
1141            // Inner error: table not found
1142            .map_err(|e| PyKeyError::new_err(e.to_string()))?;
1143        Ok(PyTable { table: provider })
1144    }
1145
1146    #[allow(clippy::too_many_arguments)]
1147    #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
1148    pub fn read_json(
1149        &self,
1150        path: PathBuf,
1151        schema: Option<PyArrowType<Schema>>,
1152        schema_infer_max_records: usize,
1153        file_extension: &str,
1154        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1155        file_compression_type: Option<String>,
1156        py: Python,
1157    ) -> PyDataFusionResult<PyDataFrame> {
1158        let path = path
1159            .to_str()
1160            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
1161        let mut options = JsonReadOptions::default()
1162            .table_partition_cols(
1163                table_partition_cols
1164                    .into_iter()
1165                    .map(|(name, ty)| (name, ty.0))
1166                    .collect::<Vec<(String, DataType)>>(),
1167            )
1168            .file_compression_type(parse_file_compression_type(file_compression_type)?);
1169        options.schema_infer_max_records = schema_infer_max_records;
1170        options.file_extension = file_extension;
1171        let df = if let Some(schema) = schema {
1172            options.schema = Some(&schema.0);
1173            let result = self.ctx.read_json(path, options);
1174            wait_for_future(py, result)??
1175        } else {
1176            let result = self.ctx.read_json(path, options);
1177            wait_for_future(py, result)??
1178        };
1179        Ok(PyDataFrame::new(df))
1180    }
1181
1182    #[pyo3(signature = (
1183        path,
1184        options=None))]
1185    pub fn read_csv(
1186        &self,
1187        path: &Bound<'_, PyAny>,
1188        options: Option<&PyCsvReadOptions>,
1189        py: Python,
1190    ) -> PyDataFusionResult<PyDataFrame> {
1191        let options = options
1192            .map(|opts| opts.try_into())
1193            .transpose()?
1194            .unwrap_or_default();
1195
1196        if path.is_instance_of::<PyList>() {
1197            let paths = path.extract::<Vec<String>>()?;
1198            let paths = paths.iter().map(|p| p as &str).collect::<Vec<&str>>();
1199            let result = self.ctx.read_csv(paths, options);
1200            let df = PyDataFrame::new(wait_for_future(py, result)??);
1201            Ok(df)
1202        } else {
1203            let path = path.extract::<String>()?;
1204            let result = self.ctx.read_csv(path, options);
1205            let df = PyDataFrame::new(wait_for_future(py, result)??);
1206            Ok(df)
1207        }
1208    }
1209
1210    #[allow(clippy::too_many_arguments)]
1211    #[pyo3(signature = (
1212        path,
1213        table_partition_cols=vec![],
1214        parquet_pruning=true,
1215        file_extension=".parquet",
1216        skip_metadata=true,
1217        schema=None,
1218        file_sort_order=None))]
1219    pub fn read_parquet(
1220        &self,
1221        path: &str,
1222        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1223        parquet_pruning: bool,
1224        file_extension: &str,
1225        skip_metadata: bool,
1226        schema: Option<PyArrowType<Schema>>,
1227        file_sort_order: Option<Vec<Vec<PySortExpr>>>,
1228        py: Python,
1229    ) -> PyDataFusionResult<PyDataFrame> {
1230        let mut options = ParquetReadOptions::default()
1231            .table_partition_cols(
1232                table_partition_cols
1233                    .into_iter()
1234                    .map(|(name, ty)| (name, ty.0))
1235                    .collect::<Vec<(String, DataType)>>(),
1236            )
1237            .parquet_pruning(parquet_pruning)
1238            .skip_metadata(skip_metadata);
1239        options.file_extension = file_extension;
1240        options.schema = schema.as_ref().map(|x| &x.0);
1241        options.file_sort_order = file_sort_order
1242            .unwrap_or_default()
1243            .into_iter()
1244            .map(|e| e.into_iter().map(|f| f.into()).collect())
1245            .collect();
1246
1247        let result = self.ctx.read_parquet(path, options);
1248        let df = PyDataFrame::new(wait_for_future(py, result)??);
1249        Ok(df)
1250    }
1251
1252    #[allow(clippy::too_many_arguments)]
1253    #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], file_extension=".avro"))]
1254    pub fn read_avro(
1255        &self,
1256        path: &str,
1257        schema: Option<PyArrowType<Schema>>,
1258        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1259        file_extension: &str,
1260        py: Python,
1261    ) -> PyDataFusionResult<PyDataFrame> {
1262        let mut options = AvroReadOptions::default().table_partition_cols(
1263            table_partition_cols
1264                .into_iter()
1265                .map(|(name, ty)| (name, ty.0))
1266                .collect::<Vec<(String, DataType)>>(),
1267        );
1268        options.file_extension = file_extension;
1269        let df = if let Some(schema) = schema {
1270            options.schema = Some(&schema.0);
1271            let read_future = self.ctx.read_avro(path, options);
1272            wait_for_future(py, read_future)??
1273        } else {
1274            let read_future = self.ctx.read_avro(path, options);
1275            wait_for_future(py, read_future)??
1276        };
1277        Ok(PyDataFrame::new(df))
1278    }
1279
1280    #[pyo3(signature = (path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))]
1281    pub fn read_arrow(
1282        &self,
1283        path: &str,
1284        schema: Option<PyArrowType<Schema>>,
1285        file_extension: &str,
1286        table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1287        py: Python,
1288    ) -> PyDataFusionResult<PyDataFrame> {
1289        let mut options = ArrowReadOptions::default().table_partition_cols(
1290            table_partition_cols
1291                .into_iter()
1292                .map(|(name, ty)| (name, ty.0))
1293                .collect::<Vec<(String, DataType)>>(),
1294        );
1295        options.file_extension = file_extension;
1296        options.schema = schema.as_ref().map(|x| &x.0);
1297
1298        let result = self.ctx.read_arrow(path, options);
1299        let df = wait_for_future(py, result)??;
1300        Ok(PyDataFrame::new(df))
1301    }
1302
1303    pub fn read_table(&self, table: Bound<'_, PyAny>) -> PyDataFusionResult<PyDataFrame> {
1304        let session = self.clone().into_bound_py_any(table.py())?;
1305        let table = PyTable::new(table, Some(session))?;
1306        let df = self.ctx.read_table(table.table())?;
1307        Ok(PyDataFrame::new(df))
1308    }
1309
1310    fn __repr__(&self) -> PyResult<String> {
1311        let config = self.ctx.copied_config();
1312        let mut config_entries = config
1313            .options()
1314            .entries()
1315            .iter()
1316            .filter(|e| e.value.is_some())
1317            .map(|e| format!("{} = {}", e.key, e.value.as_ref().unwrap()))
1318            .collect::<Vec<_>>();
1319        config_entries.sort();
1320        Ok(format!(
1321            "SessionContext: id={}; configs=[\n\t{}]",
1322            self.session_id(),
1323            config_entries.join("\n\t")
1324        ))
1325    }
1326
1327    /// Execute a partition of an execution plan and return a stream of record batches
1328    pub fn execute(
1329        &self,
1330        plan: PyExecutionPlan,
1331        part: usize,
1332        py: Python,
1333    ) -> PyDataFusionResult<PyRecordBatchStream> {
1334        let ctx: TaskContext = TaskContext::from(&self.ctx.state());
1335        let plan = plan.plan.clone();
1336        let stream = spawn_future(py, async move { plan.execute(part, Arc::new(ctx)) })?;
1337        Ok(PyRecordBatchStream::new(stream))
1338    }
1339
1340    pub fn __datafusion_task_context_provider__<'py>(
1341        &self,
1342        py: Python<'py>,
1343    ) -> PyResult<Bound<'py, PyCapsule>> {
1344        let name = cr"datafusion_task_context_provider".into();
1345
1346        let ctx_provider = Arc::clone(&self.ctx) as Arc<dyn TaskContextProvider>;
1347        let ffi_ctx_provider = FFI_TaskContextProvider::from(&ctx_provider);
1348
1349        PyCapsule::new(py, ffi_ctx_provider, Some(name))
1350    }
1351
    /// Export this session's logical extension codec as a `PyCapsule`.
    ///
    /// Delegates to `create_logical_extension_capsule` with the stored codec.
    pub fn __datafusion_logical_extension_codec__<'py>(
        &self,
        py: Python<'py>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        create_logical_extension_capsule(py, self.logical_codec.as_ref())
    }
1358
1359    pub fn with_logical_extension_codec<'py>(
1360        &self,
1361        codec: Bound<'py, PyAny>,
1362    ) -> PyDataFusionResult<Self> {
1363        let logical_codec = Arc::new(ffi_logical_codec_from_pycapsule(codec)?);
1364
1365        Ok({
1366            Self {
1367                ctx: Arc::clone(&self.ctx),
1368                logical_codec,
1369            }
1370        })
1371    }
1372}
1373
1374impl PySessionContext {
1375    async fn _table(&self, name: &str) -> datafusion::common::Result<DataFrame> {
1376        self.ctx.table(name).await
1377    }
1378
1379    async fn register_csv_from_multiple_paths(
1380        &self,
1381        name: &str,
1382        table_paths: Vec<String>,
1383        options: CsvReadOptions<'_>,
1384    ) -> datafusion::common::Result<()> {
1385        let table_paths = table_paths.to_urls()?;
1386        let session_config = self.ctx.copied_config();
1387        let listing_options =
1388            options.to_listing_options(&session_config, self.ctx.copied_table_options());
1389
1390        let option_extension = listing_options.file_extension.clone();
1391
1392        if table_paths.is_empty() {
1393            return exec_err!("No table paths were provided");
1394        }
1395
1396        // check if the file extension matches the expected extension
1397        for path in &table_paths {
1398            let file_path = path.as_str();
1399            if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
1400                return exec_err!(
1401                    "File path '{file_path}' does not match the expected extension '{option_extension}'"
1402                );
1403            }
1404        }
1405
1406        let resolved_schema = options
1407            .get_resolved_schema(&session_config, self.ctx.state(), table_paths[0].clone())
1408            .await?;
1409
1410        let config = ListingTableConfig::new_with_multi_paths(table_paths)
1411            .with_listing_options(listing_options)
1412            .with_schema(resolved_schema);
1413        let table = ListingTable::try_new(config)?;
1414        self.ctx
1415            .register_table(TableReference::Bare { table: name.into() }, Arc::new(table))?;
1416        Ok(())
1417    }
1418
1419    fn default_logical_codec(ctx: &Arc<SessionContext>) -> Arc<FFI_LogicalExtensionCodec> {
1420        let codec = Arc::new(DefaultLogicalExtensionCodec {});
1421        let runtime = get_tokio_runtime().handle().clone();
1422        let ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
1423        Arc::new(FFI_LogicalExtensionCodec::new(
1424            codec,
1425            Some(runtime),
1426            &ctx_provider,
1427        ))
1428    }
1429}
1430
1431pub fn parse_file_compression_type(
1432    file_compression_type: Option<String>,
1433) -> Result<FileCompressionType, PyErr> {
1434    FileCompressionType::from_str(&*file_compression_type.unwrap_or("".to_string()).as_str())
1435        .map_err(|_| {
1436            PyValueError::new_err("file_compression_type must one of: gzip, bz2, xz, zstd")
1437        })
1438}
1439
1440impl From<PySessionContext> for SessionContext {
1441    fn from(ctx: PySessionContext) -> SessionContext {
1442        ctx.ctx.as_ref().clone()
1443    }
1444}
1445
1446impl From<SessionContext> for PySessionContext {
1447    fn from(ctx: SessionContext) -> PySessionContext {
1448        let ctx = Arc::new(ctx);
1449        let logical_codec = Self::default_logical_codec(&ctx);
1450
1451        PySessionContext { ctx, logical_codec }
1452    }
1453}