1use std::collections::{HashMap, HashSet};
19use std::path::PathBuf;
20use std::ptr::NonNull;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use arrow::array::RecordBatchReader;
25use arrow::ffi_stream::ArrowArrayStreamReader;
26use arrow::pyarrow::FromPyArrow;
27use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
28use datafusion::arrow::pyarrow::PyArrowType;
29use datafusion::arrow::record_batch::RecordBatch;
30use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory};
31use datafusion::common::{DFSchema, ScalarValue, TableReference, exec_err};
32use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
33use datafusion::datasource::file_format::parquet::ParquetFormat;
34use datafusion::datasource::listing::{
35 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
36};
37use datafusion::datasource::{MemTable, TableProvider};
38use datafusion::execution::TaskContextProvider;
39use datafusion::execution::context::{
40 DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
41};
42use datafusion::execution::disk_manager::DiskManagerMode;
43use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
44use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
45use datafusion::execution::runtime_env::RuntimeEnvBuilder;
46use datafusion::execution::session_state::SessionStateBuilder;
47use datafusion::prelude::{
48 AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
49};
50use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
51use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
52use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
53use datafusion_ffi::execution::FFI_TaskContextProvider;
54use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
55use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
56use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
57use datafusion_python_util::{
58 create_logical_extension_capsule, ffi_logical_codec_from_pycapsule, get_global_ctx,
59 get_tokio_runtime, spawn_future, wait_for_future,
60};
61use object_store::ObjectStore;
62use pyo3::IntoPyObjectExt;
63use pyo3::exceptions::{PyKeyError, PyRuntimeError, PyValueError};
64use pyo3::prelude::*;
65use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple};
66use url::Url;
67use uuid::Uuid;
68
69use crate::catalog::{
70 PyCatalog, PyCatalogList, RustWrappedPyCatalogProvider, RustWrappedPyCatalogProviderList,
71};
72use crate::common::data_type::PyScalarValue;
73use crate::common::df_schema::PyDFSchema;
74use crate::dataframe::PyDataFrame;
75use crate::dataset::Dataset;
76use crate::errors::{
77 PyDataFusionError, PyDataFusionResult, from_datafusion_error, py_datafusion_err,
78};
79use crate::expr::PyExpr;
80use crate::expr::sort_expr::PySortExpr;
81use crate::options::PyCsvReadOptions;
82use crate::physical_plan::PyExecutionPlan;
83use crate::record_batch::PyRecordBatchStream;
84use crate::sql::logical::PyLogicalPlan;
85use crate::sql::util::replace_placeholders_with_strings;
86use crate::store::StorageContexts;
87use crate::table::{PyTable, RustWrappedPyTableProviderFactory};
88use crate::udaf::PyAggregateUDF;
89use crate::udf::PyScalarUDF;
90use crate::udtf::PyTableFunction;
91use crate::udwf::PyWindowUDF;
92
93#[pyclass(
95 from_py_object,
96 frozen,
97 name = "SessionConfig",
98 module = "datafusion",
99 subclass
100)]
101#[derive(Clone, Default)]
102pub struct PySessionConfig {
103 pub config: SessionConfig,
104}
105
106impl From<SessionConfig> for PySessionConfig {
107 fn from(config: SessionConfig) -> Self {
108 Self { config }
109 }
110}
111
112#[pymethods]
113impl PySessionConfig {
114 #[pyo3(signature = (config_options=None))]
115 #[new]
116 fn new(config_options: Option<HashMap<String, String>>) -> Self {
117 let mut config = SessionConfig::new();
118 if let Some(hash_map) = config_options {
119 for (k, v) in &hash_map {
120 config = config.set(k, &ScalarValue::Utf8(Some(v.clone())));
121 }
122 }
123
124 Self { config }
125 }
126
127 fn with_create_default_catalog_and_schema(&self, enabled: bool) -> Self {
128 Self::from(
129 self.config
130 .clone()
131 .with_create_default_catalog_and_schema(enabled),
132 )
133 }
134
135 fn with_default_catalog_and_schema(&self, catalog: &str, schema: &str) -> Self {
136 Self::from(
137 self.config
138 .clone()
139 .with_default_catalog_and_schema(catalog, schema),
140 )
141 }
142
143 fn with_information_schema(&self, enabled: bool) -> Self {
144 Self::from(self.config.clone().with_information_schema(enabled))
145 }
146
147 fn with_batch_size(&self, batch_size: usize) -> Self {
148 Self::from(self.config.clone().with_batch_size(batch_size))
149 }
150
151 fn with_target_partitions(&self, target_partitions: usize) -> Self {
152 Self::from(
153 self.config
154 .clone()
155 .with_target_partitions(target_partitions),
156 )
157 }
158
159 fn with_repartition_aggregations(&self, enabled: bool) -> Self {
160 Self::from(self.config.clone().with_repartition_aggregations(enabled))
161 }
162
163 fn with_repartition_joins(&self, enabled: bool) -> Self {
164 Self::from(self.config.clone().with_repartition_joins(enabled))
165 }
166
167 fn with_repartition_windows(&self, enabled: bool) -> Self {
168 Self::from(self.config.clone().with_repartition_windows(enabled))
169 }
170
171 fn with_repartition_sorts(&self, enabled: bool) -> Self {
172 Self::from(self.config.clone().with_repartition_sorts(enabled))
173 }
174
175 fn with_repartition_file_scans(&self, enabled: bool) -> Self {
176 Self::from(self.config.clone().with_repartition_file_scans(enabled))
177 }
178
179 fn with_repartition_file_min_size(&self, size: usize) -> Self {
180 Self::from(self.config.clone().with_repartition_file_min_size(size))
181 }
182
183 fn with_parquet_pruning(&self, enabled: bool) -> Self {
184 Self::from(self.config.clone().with_parquet_pruning(enabled))
185 }
186
187 fn set(&self, key: &str, value: &str) -> Self {
188 Self::from(self.config.clone().set_str(key, value))
189 }
190
191 pub fn with_extension(&self, extension: Bound<PyAny>) -> PyResult<Self> {
192 if !extension.hasattr("__datafusion_extension_options__")? {
193 return Err(pyo3::exceptions::PyAttributeError::new_err(
194 "Expected extension object to define __datafusion_extension_options__()",
195 ));
196 }
197 let capsule = extension.call_method0("__datafusion_extension_options__")?;
198 let capsule = capsule.cast::<PyCapsule>()?;
199
200 let extension: NonNull<FFI_ExtensionOptions> = capsule
201 .pointer_checked(Some(c"datafusion_extension_options"))?
202 .cast();
203 let mut extension = unsafe { extension.as_ref() }.clone();
204
205 let mut config = self.config.clone();
206 let options = config.options_mut();
207 if let Some(prior_extension) = options.extensions.get::<FFI_ExtensionOptions>() {
208 extension
209 .merge(prior_extension)
210 .map_err(py_datafusion_err)?;
211 }
212
213 options.extensions.insert(extension);
214
215 Ok(Self::from(config))
216 }
217}
218
/// Python-facing wrapper around DataFusion's [`RuntimeEnvBuilder`].
///
/// Declared `frozen`; each `with_*` method returns a modified copy.
#[pyclass(
    from_py_object,
    frozen,
    name = "RuntimeEnvBuilder",
    module = "datafusion",
    subclass
)]
#[derive(Clone)]
pub struct PyRuntimeEnvBuilder {
    /// The builder that is eventually turned into a runtime environment.
    pub builder: RuntimeEnvBuilder,
}
231
232#[pymethods]
233impl PyRuntimeEnvBuilder {
234 #[new]
235 fn new() -> Self {
236 Self {
237 builder: RuntimeEnvBuilder::default(),
238 }
239 }
240
241 fn with_disk_manager_disabled(&self) -> Self {
242 let mut runtime_builder = self.builder.clone();
243
244 let mut disk_mgr_builder = runtime_builder
245 .disk_manager_builder
246 .clone()
247 .unwrap_or_default();
248 disk_mgr_builder.set_mode(DiskManagerMode::Disabled);
249
250 runtime_builder = runtime_builder.with_disk_manager_builder(disk_mgr_builder);
251 Self {
252 builder: runtime_builder,
253 }
254 }
255
256 fn with_disk_manager_os(&self) -> Self {
257 let mut runtime_builder = self.builder.clone();
258
259 let mut disk_mgr_builder = runtime_builder
260 .disk_manager_builder
261 .clone()
262 .unwrap_or_default();
263 disk_mgr_builder.set_mode(DiskManagerMode::OsTmpDirectory);
264
265 runtime_builder = runtime_builder.with_disk_manager_builder(disk_mgr_builder);
266 Self {
267 builder: runtime_builder,
268 }
269 }
270
271 fn with_disk_manager_specified(&self, paths: Vec<String>) -> Self {
272 let paths = paths.iter().map(|s| s.into()).collect();
273 let mut runtime_builder = self.builder.clone();
274
275 let mut disk_mgr_builder = runtime_builder
276 .disk_manager_builder
277 .clone()
278 .unwrap_or_default();
279 disk_mgr_builder.set_mode(DiskManagerMode::Directories(paths));
280
281 runtime_builder = runtime_builder.with_disk_manager_builder(disk_mgr_builder);
282 Self {
283 builder: runtime_builder,
284 }
285 }
286
287 fn with_unbounded_memory_pool(&self) -> Self {
288 let builder = self.builder.clone();
289 let builder = builder.with_memory_pool(Arc::new(UnboundedMemoryPool::default()));
290 Self { builder }
291 }
292
293 fn with_fair_spill_pool(&self, size: usize) -> Self {
294 let builder = self.builder.clone();
295 let builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(size)));
296 Self { builder }
297 }
298
299 fn with_greedy_memory_pool(&self, size: usize) -> Self {
300 let builder = self.builder.clone();
301 let builder = builder.with_memory_pool(Arc::new(GreedyMemoryPool::new(size)));
302 Self { builder }
303 }
304
305 fn with_temp_file_path(&self, path: &str) -> Self {
306 let builder = self.builder.clone();
307 let builder = builder.with_temp_file_path(path);
308 Self { builder }
309 }
310}
311
312#[pyclass(
314 from_py_object,
315 frozen,
316 name = "SQLOptions",
317 module = "datafusion",
318 subclass
319)]
320#[derive(Clone)]
321pub struct PySQLOptions {
322 pub options: SQLOptions,
323}
324
325impl From<SQLOptions> for PySQLOptions {
326 fn from(options: SQLOptions) -> Self {
327 Self { options }
328 }
329}
330
331#[pymethods]
332impl PySQLOptions {
333 #[new]
334 fn new() -> Self {
335 let options = SQLOptions::new();
336 Self { options }
337 }
338
339 fn with_allow_ddl(&self, allow: bool) -> Self {
341 Self::from(self.options.with_allow_ddl(allow))
342 }
343
344 pub fn with_allow_dml(&self, allow: bool) -> Self {
346 Self::from(self.options.with_allow_dml(allow))
347 }
348
349 pub fn with_allow_statements(&self, allow: bool) -> Self {
351 Self::from(self.options.with_allow_statements(allow))
352 }
353}
354
/// Python-facing wrapper around a DataFusion [`SessionContext`].
///
/// `ctx` is behind an `Arc`, so clones of this wrapper share one session.
/// `logical_codec` is the FFI logical extension codec passed to Python-side
/// providers (table factories, catalogs) when they are registered.
#[pyclass(
    from_py_object,
    frozen,
    name = "SessionContext",
    module = "datafusion",
    subclass
)]
#[derive(Clone)]
pub struct PySessionContext {
    /// The shared DataFusion session.
    pub ctx: Arc<SessionContext>,
    /// Codec handed to FFI / Python providers so they can (de)serialize plans.
    logical_codec: Arc<FFI_LogicalExtensionCodec>,
}
370
371#[pymethods]
372impl PySessionContext {
373 #[pyo3(signature = (config=None, runtime=None))]
374 #[new]
375 pub fn new(
376 config: Option<PySessionConfig>,
377 runtime: Option<PyRuntimeEnvBuilder>,
378 ) -> PyDataFusionResult<Self> {
379 let config = if let Some(c) = config {
380 c.config
381 } else {
382 SessionConfig::default().with_information_schema(true)
383 };
384 let runtime_env_builder = if let Some(c) = runtime {
385 c.builder
386 } else {
387 RuntimeEnvBuilder::default()
388 };
389 let runtime = Arc::new(runtime_env_builder.build()?);
390 let session_state = SessionStateBuilder::new()
391 .with_config(config)
392 .with_runtime_env(runtime)
393 .with_default_features()
394 .build();
395 let ctx = Arc::new(SessionContext::new_with_state(session_state));
396 let logical_codec = Self::default_logical_codec(&ctx);
397 Ok(PySessionContext { ctx, logical_codec })
398 }
399
400 pub fn enable_url_table(&self) -> PyResult<Self> {
401 Ok(PySessionContext {
402 ctx: Arc::new(self.ctx.as_ref().clone().enable_url_table()),
403 logical_codec: Arc::clone(&self.logical_codec),
404 })
405 }
406
407 #[staticmethod]
408 #[pyo3(signature = ())]
409 pub fn global_ctx() -> PyResult<Self> {
410 let ctx = get_global_ctx().clone();
411 let logical_codec = Self::default_logical_codec(&ctx);
412 Ok(Self { ctx, logical_codec })
413 }
414
415 #[pyo3(signature = (scheme, store, host=None))]
417 pub fn register_object_store(
418 &self,
419 scheme: &str,
420 store: StorageContexts,
421 host: Option<&str>,
422 ) -> PyResult<()> {
423 let (store, upstream_host): (Arc<dyn ObjectStore>, String) = match store {
425 StorageContexts::AmazonS3(s3) => (s3.inner, s3.bucket_name),
426 StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name),
427 StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name),
428 StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()),
429 StorageContexts::HTTP(http) => (http.store, http.url),
430 };
431
432 let derived_host = if let Some(host) = host {
434 host
435 } else {
436 &upstream_host
437 };
438 let url_string = format!("{scheme}{derived_host}");
439 let url = Url::parse(&url_string).map_err(|e| PyValueError::new_err(e.to_string()))?;
440 self.ctx.runtime_env().register_object_store(&url, store);
441 Ok(())
442 }
443
444 #[pyo3(signature = (scheme, host=None))]
446 pub fn deregister_object_store(
447 &self,
448 scheme: &str,
449 host: Option<&str>,
450 ) -> PyDataFusionResult<()> {
451 let host = host.unwrap_or("");
452 let url_string = format!("{scheme}{host}");
453 let url = Url::parse(&url_string).map_err(|e| PyDataFusionError::Common(e.to_string()))?;
454 self.ctx.runtime_env().deregister_object_store(&url)?;
455 Ok(())
456 }
457
458 #[allow(clippy::too_many_arguments)]
459 #[pyo3(signature = (name, path, table_partition_cols=vec![],
460 file_extension=".parquet",
461 schema=None,
462 file_sort_order=None))]
463 pub fn register_listing_table(
464 &self,
465 name: &str,
466 path: &str,
467 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
468 file_extension: &str,
469 schema: Option<PyArrowType<Schema>>,
470 file_sort_order: Option<Vec<Vec<PySortExpr>>>,
471 py: Python,
472 ) -> PyDataFusionResult<()> {
473 let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
474 .with_file_extension(file_extension)
475 .with_table_partition_cols(
476 table_partition_cols
477 .into_iter()
478 .map(|(name, ty)| (name, ty.0))
479 .collect::<Vec<(String, DataType)>>(),
480 )
481 .with_file_sort_order(
482 file_sort_order
483 .unwrap_or_default()
484 .into_iter()
485 .map(|e| e.into_iter().map(|f| f.into()).collect())
486 .collect(),
487 );
488 let table_path = ListingTableUrl::parse(path)?;
489 let resolved_schema: SchemaRef = match schema {
490 Some(s) => Arc::new(s.0),
491 None => {
492 let state = self.ctx.state();
493 let schema = options.infer_schema(&state, &table_path);
494 wait_for_future(py, schema)??
495 }
496 };
497 let config = ListingTableConfig::new(table_path)
498 .with_listing_options(options)
499 .with_schema(resolved_schema);
500 let table = ListingTable::try_new(config)?;
501 self.ctx.register_table(name, Arc::new(table))?;
502 Ok(())
503 }
504
505 pub fn register_udtf(&self, func: PyTableFunction) {
506 let name = func.name.clone();
507 let func = Arc::new(func);
508 self.ctx.register_udtf(&name, func);
509 }
510
511 pub fn deregister_udtf(&self, name: &str) {
512 self.ctx.deregister_udtf(name);
513 }
514
515 #[pyo3(signature = (query, options=None, param_values=HashMap::default(), param_strings=HashMap::default()))]
516 pub fn sql_with_options(
517 &self,
518 py: Python,
519 mut query: String,
520 options: Option<PySQLOptions>,
521 param_values: HashMap<String, PyScalarValue>,
522 param_strings: HashMap<String, String>,
523 ) -> PyDataFusionResult<PyDataFrame> {
524 let options = if let Some(options) = options {
525 options.options
526 } else {
527 SQLOptions::new()
528 };
529
530 let param_values = param_values
531 .into_iter()
532 .map(|(name, value)| (name, ScalarValue::from(value)))
533 .collect::<HashMap<_, _>>();
534
535 let state = self.ctx.state();
536 let dialect = state.config().options().sql_parser.dialect.as_ref();
537
538 if !param_strings.is_empty() {
539 query = replace_placeholders_with_strings(&query, dialect, param_strings)?;
540 }
541
542 let mut df = wait_for_future(py, async {
543 self.ctx.sql_with_options(&query, options).await
544 })?
545 .map_err(from_datafusion_error)?;
546
547 if !param_values.is_empty() {
548 df = df.with_param_values(param_values)?;
549 }
550
551 Ok(PyDataFrame::new(df))
552 }
553
554 #[pyo3(signature = (partitions, name=None, schema=None))]
555 pub fn create_dataframe(
556 &self,
557 partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
558 name: Option<&str>,
559 schema: Option<PyArrowType<Schema>>,
560 py: Python,
561 ) -> PyDataFusionResult<PyDataFrame> {
562 let schema = if let Some(schema) = schema {
563 SchemaRef::from(schema.0)
564 } else {
565 partitions.0[0][0].schema()
566 };
567
568 let table = MemTable::try_new(schema, partitions.0)?;
569
570 let table_name = match name {
573 Some(val) => val.to_owned(),
574 None => {
575 "c".to_owned()
576 + Uuid::new_v4()
577 .simple()
578 .encode_lower(&mut Uuid::encode_buffer())
579 }
580 };
581
582 self.ctx.register_table(&*table_name, Arc::new(table))?;
583
584 let table = wait_for_future(py, self._table(&table_name))??;
585
586 let df = PyDataFrame::new(table);
587 Ok(df)
588 }
589
590 pub fn create_dataframe_from_logical_plan(&self, plan: PyLogicalPlan) -> PyDataFrame {
592 PyDataFrame::new(DataFrame::new(self.ctx.state(), plan.plan.as_ref().clone()))
593 }
594
595 #[pyo3(signature = (data, name=None))]
597 pub fn from_pylist(
598 &self,
599 data: Bound<'_, PyList>,
600 name: Option<&str>,
601 ) -> PyResult<PyDataFrame> {
602 let py = data.py();
604
605 let table_class = py.import("pyarrow")?.getattr("Table")?;
607 let args = PyTuple::new(py, &[data])?;
608 let table = table_class.call_method1("from_pylist", args)?;
609
610 let df = self.from_arrow(table, name, py)?;
612 Ok(df)
613 }
614
615 #[pyo3(signature = (data, name=None))]
617 pub fn from_pydict(
618 &self,
619 data: Bound<'_, PyDict>,
620 name: Option<&str>,
621 ) -> PyResult<PyDataFrame> {
622 let py = data.py();
624
625 let table_class = py.import("pyarrow")?.getattr("Table")?;
627 let args = PyTuple::new(py, &[data])?;
628 let table = table_class.call_method1("from_pydict", args)?;
629
630 let df = self.from_arrow(table, name, py)?;
632 Ok(df)
633 }
634
635 #[pyo3(signature = (data, name=None))]
637 pub fn from_arrow(
638 &self,
639 data: Bound<'_, PyAny>,
640 name: Option<&str>,
641 py: Python,
642 ) -> PyDataFusionResult<PyDataFrame> {
643 let (schema, batches) =
644 if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
645 let schema = stream_reader.schema().as_ref().to_owned();
648 let batches = stream_reader
649 .collect::<Result<Vec<RecordBatch>, arrow::error::ArrowError>>()?;
650
651 (schema, batches)
652 } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
653 (array.schema().as_ref().to_owned(), vec![array])
657 } else {
658 return Err(PyDataFusionError::Common(
659 "Expected either a Arrow Array or Arrow Stream in from_arrow().".to_string(),
660 ));
661 };
662
663 let list_of_batches = PyArrowType::from(vec![batches]);
666 self.create_dataframe(list_of_batches, name, Some(schema.into()), py)
667 }
668
669 #[allow(clippy::wrong_self_convention)]
671 #[pyo3(signature = (data, name=None))]
672 pub fn from_pandas(&self, data: Bound<'_, PyAny>, name: Option<&str>) -> PyResult<PyDataFrame> {
673 let py = data.py();
675
676 let table_class = py.import("pyarrow")?.getattr("Table")?;
678 let args = PyTuple::new(py, &[data])?;
679 let table = table_class.call_method1("from_pandas", args)?;
680
681 let df = self.from_arrow(table, name, py)?;
683 Ok(df)
684 }
685
686 #[pyo3(signature = (data, name=None))]
688 pub fn from_polars(&self, data: Bound<'_, PyAny>, name: Option<&str>) -> PyResult<PyDataFrame> {
689 let table = data.call_method0("to_arrow")?;
691
692 let df = self.from_arrow(table, name, data.py())?;
694 Ok(df)
695 }
696
697 pub fn register_table(&self, name: &str, table: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
698 let session = self.clone().into_bound_py_any(table.py())?;
699 let table = PyTable::new(table, Some(session))?;
700
701 self.ctx.register_table(name, table.table)?;
702 Ok(())
703 }
704
705 pub fn deregister_table(&self, name: &str) -> PyDataFusionResult<()> {
706 self.ctx.deregister_table(name)?;
707 Ok(())
708 }
709
710 pub fn register_table_factory(
711 &self,
712 format: &str,
713 mut factory: Bound<'_, PyAny>,
714 ) -> PyDataFusionResult<()> {
715 if factory.hasattr("__datafusion_table_provider_factory__")? {
716 let py = factory.py();
717 let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
718 factory = factory
719 .getattr("__datafusion_table_provider_factory__")?
720 .call1((codec_capsule,))?;
721 }
722
723 let factory: Arc<dyn TableProviderFactory> =
724 if let Ok(capsule) = factory.cast::<PyCapsule>().map_err(py_datafusion_err) {
725 let data: NonNull<FFI_TableProviderFactory> = capsule
726 .pointer_checked(Some(c"datafusion_table_provider_factory"))?
727 .cast();
728 let factory = unsafe { data.as_ref() };
729 factory.into()
730 } else {
731 Arc::new(RustWrappedPyTableProviderFactory::new(
732 factory.into(),
733 self.logical_codec.clone(),
734 ))
735 };
736
737 let st = self.ctx.state_ref();
738 let mut lock = st.write();
739 lock.table_factories_mut()
740 .insert(format.to_owned(), factory);
741
742 Ok(())
743 }
744
    /// Replace this session's catalog list with `provider`.
    ///
    /// `provider` may be any of the following:
    /// - An object exposing `__datafusion_catalog_provider_list__(codec)`. It is
    ///   called with this session's logical codec capsule.
    /// - A `datafusion_catalog_provider_list` capsule (FFI provider).
    /// - A `PyCatalogList`, whose inner Rust catalog list is used directly.
    /// - Any other Python object, which is wrapped as a Python-side provider list.
    pub fn register_catalog_provider_list(
        &self,
        mut provider: Bound<PyAny>,
    ) -> PyDataFusionResult<()> {
        // FFI providers need the session's logical codec to (de)serialize plans.
        if provider.hasattr("__datafusion_catalog_provider_list__")? {
            let py = provider.py();
            let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
            provider = provider
                .getattr("__datafusion_catalog_provider_list__")?
                .call1((codec_capsule,))?;
        }

        let provider = if let Ok(capsule) = provider.cast::<PyCapsule>() {
            let data: NonNull<FFI_CatalogProviderList> = capsule
                .pointer_checked(Some(c"datafusion_catalog_provider_list"))?
                .cast();
            // SAFETY: the capsule name was checked above and `capsule` keeps the
            // pointee alive for this borrow. This assumes the producer stores an
            // `FFI_CatalogProviderList` under that name.
            let provider = unsafe { data.as_ref() };
            let provider: Arc<dyn CatalogProviderList + Send> = provider.into();
            provider as Arc<dyn CatalogProviderList>
        } else {
            // Prefer the native wrapper; fall back to treating the object as a
            // duck-typed Python catalog list.
            match provider.extract::<PyCatalogList>() {
                Ok(py_catalog_list) => py_catalog_list.catalog_list,
                Err(_) => Arc::new(RustWrappedPyCatalogProviderList::new(
                    provider.into(),
                    Arc::clone(&self.logical_codec),
                )) as Arc<dyn CatalogProviderList>,
            }
        };

        self.ctx.register_catalog_list(provider);

        Ok(())
    }
778
    /// Register `provider` as the catalog called `name`.
    ///
    /// `provider` may be any of the following:
    /// - An object exposing `__datafusion_catalog_provider__(codec)`. It is called
    ///   with this session's logical codec capsule.
    /// - A `datafusion_catalog_provider` capsule (FFI provider).
    /// - A `PyCatalog`, whose inner Rust catalog is used directly.
    /// - Any other Python object, which is wrapped as a Python-side catalog.
    pub fn register_catalog_provider(
        &self,
        name: &str,
        mut provider: Bound<'_, PyAny>,
    ) -> PyDataFusionResult<()> {
        // FFI providers need the session's logical codec to (de)serialize plans.
        if provider.hasattr("__datafusion_catalog_provider__")? {
            let py = provider.py();
            let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
            provider = provider
                .getattr("__datafusion_catalog_provider__")?
                .call1((codec_capsule,))?;
        }

        let provider = if let Ok(capsule) = provider.cast::<PyCapsule>() {
            let data: NonNull<FFI_CatalogProvider> = capsule
                .pointer_checked(Some(c"datafusion_catalog_provider"))?
                .cast();
            // SAFETY: the capsule name was checked above and `capsule` keeps the
            // pointee alive for this borrow. This assumes the producer stores an
            // `FFI_CatalogProvider` under that name.
            let provider = unsafe { data.as_ref() };
            let provider: Arc<dyn CatalogProvider + Send> = provider.into();
            provider as Arc<dyn CatalogProvider>
        } else {
            // Prefer the native wrapper; fall back to treating the object as a
            // duck-typed Python catalog.
            match provider.extract::<PyCatalog>() {
                Ok(py_catalog) => py_catalog.catalog,
                Err(_) => Arc::new(RustWrappedPyCatalogProvider::new(
                    provider.into(),
                    Arc::clone(&self.logical_codec),
                )) as Arc<dyn CatalogProvider>,
            }
        };

        // The return value (presumably any previously registered catalog with this
        // name) is intentionally discarded.
        let _ = self.ctx.register_catalog(name, provider);

        Ok(())
    }
813
814 pub fn register_table_provider(
816 &self,
817 name: &str,
818 provider: Bound<'_, PyAny>,
819 ) -> PyDataFusionResult<()> {
820 self.register_table(name, provider)
822 }
823
824 pub fn register_record_batches(
825 &self,
826 name: &str,
827 partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
828 ) -> PyDataFusionResult<()> {
829 let schema = partitions.0[0][0].schema();
830 let table = MemTable::try_new(schema, partitions.0)?;
831 self.ctx.register_table(name, Arc::new(table))?;
832 Ok(())
833 }
834
835 #[allow(clippy::too_many_arguments)]
836 #[pyo3(signature = (name, path, table_partition_cols=vec![],
837 parquet_pruning=true,
838 file_extension=".parquet",
839 skip_metadata=true,
840 schema=None,
841 file_sort_order=None))]
842 pub fn register_parquet(
843 &self,
844 name: &str,
845 path: &str,
846 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
847 parquet_pruning: bool,
848 file_extension: &str,
849 skip_metadata: bool,
850 schema: Option<PyArrowType<Schema>>,
851 file_sort_order: Option<Vec<Vec<PySortExpr>>>,
852 py: Python,
853 ) -> PyDataFusionResult<()> {
854 let mut options = ParquetReadOptions::default()
855 .table_partition_cols(
856 table_partition_cols
857 .into_iter()
858 .map(|(name, ty)| (name, ty.0))
859 .collect::<Vec<(String, DataType)>>(),
860 )
861 .parquet_pruning(parquet_pruning)
862 .skip_metadata(skip_metadata);
863 options.file_extension = file_extension;
864 options.schema = schema.as_ref().map(|x| &x.0);
865 options.file_sort_order = file_sort_order
866 .unwrap_or_default()
867 .into_iter()
868 .map(|e| e.into_iter().map(|f| f.into()).collect())
869 .collect();
870
871 let result = self.ctx.register_parquet(name, path, options);
872 wait_for_future(py, result)??;
873 Ok(())
874 }
875
876 #[pyo3(signature = (name,
877 path,
878 options=None))]
879 pub fn register_csv(
880 &self,
881 name: &str,
882 path: &Bound<'_, PyAny>,
883 options: Option<&PyCsvReadOptions>,
884 py: Python,
885 ) -> PyDataFusionResult<()> {
886 let options = options
887 .map(|opts| opts.try_into())
888 .transpose()?
889 .unwrap_or_default();
890
891 if path.is_instance_of::<PyList>() {
892 let paths = path.extract::<Vec<String>>()?;
893 let result = self.register_csv_from_multiple_paths(name, paths, options);
894 wait_for_future(py, result)??;
895 } else {
896 let path = path.extract::<String>()?;
897 let result = self.ctx.register_csv(name, &path, options);
898 wait_for_future(py, result)??;
899 }
900
901 Ok(())
902 }
903
904 #[allow(clippy::too_many_arguments)]
905 #[pyo3(signature = (name,
906 path,
907 schema=None,
908 schema_infer_max_records=1000,
909 file_extension=".json",
910 table_partition_cols=vec![],
911 file_compression_type=None))]
912 pub fn register_json(
913 &self,
914 name: &str,
915 path: PathBuf,
916 schema: Option<PyArrowType<Schema>>,
917 schema_infer_max_records: usize,
918 file_extension: &str,
919 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
920 file_compression_type: Option<String>,
921 py: Python,
922 ) -> PyDataFusionResult<()> {
923 let path = path
924 .to_str()
925 .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
926
927 let mut options = JsonReadOptions::default()
928 .file_compression_type(parse_file_compression_type(file_compression_type)?)
929 .table_partition_cols(
930 table_partition_cols
931 .into_iter()
932 .map(|(name, ty)| (name, ty.0))
933 .collect::<Vec<(String, DataType)>>(),
934 );
935 options.schema_infer_max_records = schema_infer_max_records;
936 options.file_extension = file_extension;
937 options.schema = schema.as_ref().map(|x| &x.0);
938
939 let result = self.ctx.register_json(name, path, options);
940 wait_for_future(py, result)??;
941
942 Ok(())
943 }
944
945 #[allow(clippy::too_many_arguments)]
946 #[pyo3(signature = (name,
947 path,
948 schema=None,
949 file_extension=".avro",
950 table_partition_cols=vec![]))]
951 pub fn register_avro(
952 &self,
953 name: &str,
954 path: PathBuf,
955 schema: Option<PyArrowType<Schema>>,
956 file_extension: &str,
957 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
958 py: Python,
959 ) -> PyDataFusionResult<()> {
960 let path = path
961 .to_str()
962 .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
963
964 let mut options = AvroReadOptions::default().table_partition_cols(
965 table_partition_cols
966 .into_iter()
967 .map(|(name, ty)| (name, ty.0))
968 .collect::<Vec<(String, DataType)>>(),
969 );
970 options.file_extension = file_extension;
971 options.schema = schema.as_ref().map(|x| &x.0);
972
973 let result = self.ctx.register_avro(name, path, options);
974 wait_for_future(py, result)??;
975
976 Ok(())
977 }
978
979 #[pyo3(signature = (name, path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))]
980 pub fn register_arrow(
981 &self,
982 name: &str,
983 path: &str,
984 schema: Option<PyArrowType<Schema>>,
985 file_extension: &str,
986 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
987 py: Python,
988 ) -> PyDataFusionResult<()> {
989 let mut options = ArrowReadOptions::default().table_partition_cols(
990 table_partition_cols
991 .into_iter()
992 .map(|(name, ty)| (name, ty.0))
993 .collect::<Vec<(String, DataType)>>(),
994 );
995 options.file_extension = file_extension;
996 options.schema = schema.as_ref().map(|x| &x.0);
997
998 let result = self.ctx.register_arrow(name, path, options);
999 wait_for_future(py, result)??;
1000 Ok(())
1001 }
1002
1003 pub fn register_batch(
1004 &self,
1005 name: &str,
1006 batch: PyArrowType<RecordBatch>,
1007 ) -> PyDataFusionResult<()> {
1008 self.ctx.register_batch(name, batch.0)?;
1009 Ok(())
1010 }
1011
1012 pub fn register_dataset(
1014 &self,
1015 name: &str,
1016 dataset: &Bound<'_, PyAny>,
1017 py: Python,
1018 ) -> PyDataFusionResult<()> {
1019 let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
1020
1021 self.ctx.register_table(name, table)?;
1022
1023 Ok(())
1024 }
1025
1026 pub fn register_udf(&self, udf: PyScalarUDF) -> PyResult<()> {
1027 self.ctx.register_udf(udf.function);
1028 Ok(())
1029 }
1030
1031 pub fn deregister_udf(&self, name: &str) {
1032 self.ctx.deregister_udf(name);
1033 }
1034
1035 pub fn register_udaf(&self, udaf: PyAggregateUDF) -> PyResult<()> {
1036 self.ctx.register_udaf(udaf.function);
1037 Ok(())
1038 }
1039
1040 pub fn deregister_udaf(&self, name: &str) {
1041 self.ctx.deregister_udaf(name);
1042 }
1043
1044 pub fn register_udwf(&self, udwf: PyWindowUDF) -> PyResult<()> {
1045 self.ctx.register_udwf(udwf.function);
1046 Ok(())
1047 }
1048
1049 pub fn deregister_udwf(&self, name: &str) {
1050 self.ctx.deregister_udwf(name);
1051 }
1052
1053 #[pyo3(signature = (name="datafusion"))]
1054 pub fn catalog(&self, py: Python, name: &str) -> PyResult<Py<PyAny>> {
1055 let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(
1056 "Catalog with name {name} doesn't exist."
1057 )))?;
1058
1059 match catalog
1060 .as_any()
1061 .downcast_ref::<RustWrappedPyCatalogProvider>()
1062 {
1063 Some(wrapped_schema) => Ok(wrapped_schema.catalog_provider.clone_ref(py)),
1064 None => Ok(
1065 PyCatalog::new_from_parts(catalog, Arc::clone(&self.logical_codec))
1066 .into_py_any(py)?,
1067 ),
1068 }
1069 }
1070
1071 pub fn catalog_names(&self) -> HashSet<String> {
1072 self.ctx.catalog_names().into_iter().collect()
1073 }
1074
1075 pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
1076 let res = wait_for_future(py, self.ctx.table(name))
1077 .map_err(|e| PyKeyError::new_err(e.to_string()))?;
1078 match res {
1079 Ok(df) => Ok(PyDataFrame::new(df)),
1080 Err(e) => {
1081 if let datafusion::error::DataFusionError::Plan(msg) = &e
1082 && msg.contains("No table named")
1083 {
1084 return Err(PyKeyError::new_err(msg.to_string()));
1085 }
1086 Err(py_datafusion_err(e))
1087 }
1088 }
1089 }
1090
1091 pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
1092 Ok(self.ctx.table_exist(name)?)
1093 }
1094
1095 pub fn empty_table(&self) -> PyDataFusionResult<PyDataFrame> {
1096 Ok(PyDataFrame::new(self.ctx.read_empty()?))
1097 }
1098
1099 pub fn session_id(&self) -> String {
1100 self.ctx.session_id()
1101 }
1102
1103 pub fn session_start_time(&self) -> String {
1104 self.ctx.session_start_time().to_rfc3339()
1105 }
1106
1107 pub fn enable_ident_normalization(&self) -> bool {
1108 self.ctx.enable_ident_normalization()
1109 }
1110
1111 pub fn parse_sql_expr(&self, sql: &str, schema: PyDFSchema) -> PyDataFusionResult<PyExpr> {
1112 let df_schema: DFSchema = schema.into();
1113 Ok(self.ctx.parse_sql_expr(sql, &df_schema)?.into())
1114 }
1115
1116 pub fn execute_logical_plan(
1117 &self,
1118 plan: PyLogicalPlan,
1119 py: Python,
1120 ) -> PyDataFusionResult<PyDataFrame> {
1121 let df = wait_for_future(
1122 py,
1123 self.ctx.execute_logical_plan(plan.plan.as_ref().clone()),
1124 )??;
1125 Ok(PyDataFrame::new(df))
1126 }
1127
1128 pub fn refresh_catalogs(&self, py: Python) -> PyDataFusionResult<()> {
1129 wait_for_future(py, self.ctx.refresh_catalogs())??;
1130 Ok(())
1131 }
1132
1133 pub fn remove_optimizer_rule(&self, name: &str) -> bool {
1134 self.ctx.remove_optimizer_rule(name)
1135 }
1136
1137 pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
1138 let provider = wait_for_future(py, self.ctx.table_provider(name))
1139 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?
1141 .map_err(|e| PyKeyError::new_err(e.to_string()))?;
1143 Ok(PyTable { table: provider })
1144 }
1145
1146 #[allow(clippy::too_many_arguments)]
1147 #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
1148 pub fn read_json(
1149 &self,
1150 path: PathBuf,
1151 schema: Option<PyArrowType<Schema>>,
1152 schema_infer_max_records: usize,
1153 file_extension: &str,
1154 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1155 file_compression_type: Option<String>,
1156 py: Python,
1157 ) -> PyDataFusionResult<PyDataFrame> {
1158 let path = path
1159 .to_str()
1160 .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
1161 let mut options = JsonReadOptions::default()
1162 .table_partition_cols(
1163 table_partition_cols
1164 .into_iter()
1165 .map(|(name, ty)| (name, ty.0))
1166 .collect::<Vec<(String, DataType)>>(),
1167 )
1168 .file_compression_type(parse_file_compression_type(file_compression_type)?);
1169 options.schema_infer_max_records = schema_infer_max_records;
1170 options.file_extension = file_extension;
1171 let df = if let Some(schema) = schema {
1172 options.schema = Some(&schema.0);
1173 let result = self.ctx.read_json(path, options);
1174 wait_for_future(py, result)??
1175 } else {
1176 let result = self.ctx.read_json(path, options);
1177 wait_for_future(py, result)??
1178 };
1179 Ok(PyDataFrame::new(df))
1180 }
1181
1182 #[pyo3(signature = (
1183 path,
1184 options=None))]
1185 pub fn read_csv(
1186 &self,
1187 path: &Bound<'_, PyAny>,
1188 options: Option<&PyCsvReadOptions>,
1189 py: Python,
1190 ) -> PyDataFusionResult<PyDataFrame> {
1191 let options = options
1192 .map(|opts| opts.try_into())
1193 .transpose()?
1194 .unwrap_or_default();
1195
1196 if path.is_instance_of::<PyList>() {
1197 let paths = path.extract::<Vec<String>>()?;
1198 let paths = paths.iter().map(|p| p as &str).collect::<Vec<&str>>();
1199 let result = self.ctx.read_csv(paths, options);
1200 let df = PyDataFrame::new(wait_for_future(py, result)??);
1201 Ok(df)
1202 } else {
1203 let path = path.extract::<String>()?;
1204 let result = self.ctx.read_csv(path, options);
1205 let df = PyDataFrame::new(wait_for_future(py, result)??);
1206 Ok(df)
1207 }
1208 }
1209
1210 #[allow(clippy::too_many_arguments)]
1211 #[pyo3(signature = (
1212 path,
1213 table_partition_cols=vec![],
1214 parquet_pruning=true,
1215 file_extension=".parquet",
1216 skip_metadata=true,
1217 schema=None,
1218 file_sort_order=None))]
1219 pub fn read_parquet(
1220 &self,
1221 path: &str,
1222 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1223 parquet_pruning: bool,
1224 file_extension: &str,
1225 skip_metadata: bool,
1226 schema: Option<PyArrowType<Schema>>,
1227 file_sort_order: Option<Vec<Vec<PySortExpr>>>,
1228 py: Python,
1229 ) -> PyDataFusionResult<PyDataFrame> {
1230 let mut options = ParquetReadOptions::default()
1231 .table_partition_cols(
1232 table_partition_cols
1233 .into_iter()
1234 .map(|(name, ty)| (name, ty.0))
1235 .collect::<Vec<(String, DataType)>>(),
1236 )
1237 .parquet_pruning(parquet_pruning)
1238 .skip_metadata(skip_metadata);
1239 options.file_extension = file_extension;
1240 options.schema = schema.as_ref().map(|x| &x.0);
1241 options.file_sort_order = file_sort_order
1242 .unwrap_or_default()
1243 .into_iter()
1244 .map(|e| e.into_iter().map(|f| f.into()).collect())
1245 .collect();
1246
1247 let result = self.ctx.read_parquet(path, options);
1248 let df = PyDataFrame::new(wait_for_future(py, result)??);
1249 Ok(df)
1250 }
1251
1252 #[allow(clippy::too_many_arguments)]
1253 #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], file_extension=".avro"))]
1254 pub fn read_avro(
1255 &self,
1256 path: &str,
1257 schema: Option<PyArrowType<Schema>>,
1258 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1259 file_extension: &str,
1260 py: Python,
1261 ) -> PyDataFusionResult<PyDataFrame> {
1262 let mut options = AvroReadOptions::default().table_partition_cols(
1263 table_partition_cols
1264 .into_iter()
1265 .map(|(name, ty)| (name, ty.0))
1266 .collect::<Vec<(String, DataType)>>(),
1267 );
1268 options.file_extension = file_extension;
1269 let df = if let Some(schema) = schema {
1270 options.schema = Some(&schema.0);
1271 let read_future = self.ctx.read_avro(path, options);
1272 wait_for_future(py, read_future)??
1273 } else {
1274 let read_future = self.ctx.read_avro(path, options);
1275 wait_for_future(py, read_future)??
1276 };
1277 Ok(PyDataFrame::new(df))
1278 }
1279
1280 #[pyo3(signature = (path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))]
1281 pub fn read_arrow(
1282 &self,
1283 path: &str,
1284 schema: Option<PyArrowType<Schema>>,
1285 file_extension: &str,
1286 table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
1287 py: Python,
1288 ) -> PyDataFusionResult<PyDataFrame> {
1289 let mut options = ArrowReadOptions::default().table_partition_cols(
1290 table_partition_cols
1291 .into_iter()
1292 .map(|(name, ty)| (name, ty.0))
1293 .collect::<Vec<(String, DataType)>>(),
1294 );
1295 options.file_extension = file_extension;
1296 options.schema = schema.as_ref().map(|x| &x.0);
1297
1298 let result = self.ctx.read_arrow(path, options);
1299 let df = wait_for_future(py, result)??;
1300 Ok(PyDataFrame::new(df))
1301 }
1302
1303 pub fn read_table(&self, table: Bound<'_, PyAny>) -> PyDataFusionResult<PyDataFrame> {
1304 let session = self.clone().into_bound_py_any(table.py())?;
1305 let table = PyTable::new(table, Some(session))?;
1306 let df = self.ctx.read_table(table.table())?;
1307 Ok(PyDataFrame::new(df))
1308 }
1309
1310 fn __repr__(&self) -> PyResult<String> {
1311 let config = self.ctx.copied_config();
1312 let mut config_entries = config
1313 .options()
1314 .entries()
1315 .iter()
1316 .filter(|e| e.value.is_some())
1317 .map(|e| format!("{} = {}", e.key, e.value.as_ref().unwrap()))
1318 .collect::<Vec<_>>();
1319 config_entries.sort();
1320 Ok(format!(
1321 "SessionContext: id={}; configs=[\n\t{}]",
1322 self.session_id(),
1323 config_entries.join("\n\t")
1324 ))
1325 }
1326
1327 pub fn execute(
1329 &self,
1330 plan: PyExecutionPlan,
1331 part: usize,
1332 py: Python,
1333 ) -> PyDataFusionResult<PyRecordBatchStream> {
1334 let ctx: TaskContext = TaskContext::from(&self.ctx.state());
1335 let plan = plan.plan.clone();
1336 let stream = spawn_future(py, async move { plan.execute(part, Arc::new(ctx)) })?;
1337 Ok(PyRecordBatchStream::new(stream))
1338 }
1339
1340 pub fn __datafusion_task_context_provider__<'py>(
1341 &self,
1342 py: Python<'py>,
1343 ) -> PyResult<Bound<'py, PyCapsule>> {
1344 let name = cr"datafusion_task_context_provider".into();
1345
1346 let ctx_provider = Arc::clone(&self.ctx) as Arc<dyn TaskContextProvider>;
1347 let ffi_ctx_provider = FFI_TaskContextProvider::from(&ctx_provider);
1348
1349 PyCapsule::new(py, ffi_ctx_provider, Some(name))
1350 }
1351
1352 pub fn __datafusion_logical_extension_codec__<'py>(
1353 &self,
1354 py: Python<'py>,
1355 ) -> PyResult<Bound<'py, PyCapsule>> {
1356 create_logical_extension_capsule(py, self.logical_codec.as_ref())
1357 }
1358
1359 pub fn with_logical_extension_codec<'py>(
1360 &self,
1361 codec: Bound<'py, PyAny>,
1362 ) -> PyDataFusionResult<Self> {
1363 let logical_codec = Arc::new(ffi_logical_codec_from_pycapsule(codec)?);
1364
1365 Ok({
1366 Self {
1367 ctx: Arc::clone(&self.ctx),
1368 logical_codec,
1369 }
1370 })
1371 }
1372}
1373
1374impl PySessionContext {
1375 async fn _table(&self, name: &str) -> datafusion::common::Result<DataFrame> {
1376 self.ctx.table(name).await
1377 }
1378
1379 async fn register_csv_from_multiple_paths(
1380 &self,
1381 name: &str,
1382 table_paths: Vec<String>,
1383 options: CsvReadOptions<'_>,
1384 ) -> datafusion::common::Result<()> {
1385 let table_paths = table_paths.to_urls()?;
1386 let session_config = self.ctx.copied_config();
1387 let listing_options =
1388 options.to_listing_options(&session_config, self.ctx.copied_table_options());
1389
1390 let option_extension = listing_options.file_extension.clone();
1391
1392 if table_paths.is_empty() {
1393 return exec_err!("No table paths were provided");
1394 }
1395
1396 for path in &table_paths {
1398 let file_path = path.as_str();
1399 if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
1400 return exec_err!(
1401 "File path '{file_path}' does not match the expected extension '{option_extension}'"
1402 );
1403 }
1404 }
1405
1406 let resolved_schema = options
1407 .get_resolved_schema(&session_config, self.ctx.state(), table_paths[0].clone())
1408 .await?;
1409
1410 let config = ListingTableConfig::new_with_multi_paths(table_paths)
1411 .with_listing_options(listing_options)
1412 .with_schema(resolved_schema);
1413 let table = ListingTable::try_new(config)?;
1414 self.ctx
1415 .register_table(TableReference::Bare { table: name.into() }, Arc::new(table))?;
1416 Ok(())
1417 }
1418
1419 fn default_logical_codec(ctx: &Arc<SessionContext>) -> Arc<FFI_LogicalExtensionCodec> {
1420 let codec = Arc::new(DefaultLogicalExtensionCodec {});
1421 let runtime = get_tokio_runtime().handle().clone();
1422 let ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
1423 Arc::new(FFI_LogicalExtensionCodec::new(
1424 codec,
1425 Some(runtime),
1426 &ctx_provider,
1427 ))
1428 }
1429}
1430
1431pub fn parse_file_compression_type(
1432 file_compression_type: Option<String>,
1433) -> Result<FileCompressionType, PyErr> {
1434 FileCompressionType::from_str(&*file_compression_type.unwrap_or("".to_string()).as_str())
1435 .map_err(|_| {
1436 PyValueError::new_err("file_compression_type must one of: gzip, bz2, xz, zstd")
1437 })
1438}
1439
1440impl From<PySessionContext> for SessionContext {
1441 fn from(ctx: PySessionContext) -> SessionContext {
1442 ctx.ctx.as_ref().clone()
1443 }
1444}
1445
1446impl From<SessionContext> for PySessionContext {
1447 fn from(ctx: SessionContext) -> PySessionContext {
1448 let ctx = Arc::new(ctx);
1449 let logical_codec = Self::default_logical_codec(&ctx);
1450
1451 PySessionContext { ctx, logical_codec }
1452 }
1453}