polars_python/io/
mod.rs

1use std::sync::Arc;
2
3use polars::prelude::default_values::DefaultFieldValues;
4use polars::prelude::deletion::DeletionFilesList;
5use polars::prelude::{
6    CastColumnsPolicy, ColumnMapping, ExtraColumnsPolicy, MissingColumnsPolicy, PlSmallStr, Schema,
7    TableStatistics, UnifiedScanArgs,
8};
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::IdxSize;
11use polars_utils::plpath::PlPathRef;
12use polars_utils::slice_enum::Slice;
13use pyo3::pybacked::PyBackedStr;
14use pyo3::types::PyAnyMethods;
15use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, intern};
16
17use crate::PyDataFrame;
18use crate::functions::parse_cloud_options;
19use crate::prelude::Wrap;
20
21/// Interface to `class ScanOptions` on the Python side
22pub struct PyScanOptions<'py>(Bound<'py, pyo3::PyAny>);
23
24impl<'py> FromPyObject<'py> for PyScanOptions<'py> {
25    fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
26        Ok(Self(ob.clone()))
27    }
28}
29
30impl<'py> FromPyObject<'py> for Wrap<TableStatistics> {
31    fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
32        let py = ob.py();
33        Ok(Wrap(TableStatistics(Arc::new(
34            PyDataFrame::extract_bound(&ob.getattr(intern!(py, "_df"))?)?
35                .df
36                .into_inner(),
37        ))))
38    }
39}
40
41impl PyScanOptions<'_> {
42    pub fn extract_unified_scan_args(
43        &self,
44        // For cloud_options init
45        first_path: Option<PlPathRef>,
46    ) -> PyResult<UnifiedScanArgs> {
47        #[derive(FromPyObject)]
48        struct Extract {
49            row_index: Option<(Wrap<PlSmallStr>, IdxSize)>,
50            pre_slice: Option<(i64, usize)>,
51            cast_options: Wrap<CastColumnsPolicy>,
52            extra_columns: Wrap<ExtraColumnsPolicy>,
53            missing_columns: Wrap<MissingColumnsPolicy>,
54            include_file_paths: Option<Wrap<PlSmallStr>>,
55            glob: bool,
56            hidden_file_prefix: Option<Vec<PyBackedStr>>,
57            column_mapping: Option<Wrap<ColumnMapping>>,
58            default_values: Option<Wrap<DefaultFieldValues>>,
59            hive_partitioning: Option<bool>,
60            hive_schema: Option<Wrap<Schema>>,
61            try_parse_hive_dates: bool,
62            rechunk: bool,
63            cache: bool,
64            storage_options: Option<Vec<(String, String)>>,
65            credential_provider: Option<Py<PyAny>>,
66            retries: usize,
67            deletion_files: Option<Wrap<DeletionFilesList>>,
68            table_statistics: Option<Wrap<TableStatistics>>,
69            row_count: Option<(u64, u64)>,
70        }
71
72        let Extract {
73            row_index,
74            pre_slice,
75            cast_options,
76            extra_columns,
77            missing_columns,
78            include_file_paths,
79            column_mapping,
80            default_values,
81            glob,
82            hidden_file_prefix,
83            hive_partitioning,
84            hive_schema,
85            try_parse_hive_dates,
86            rechunk,
87            cache,
88            storage_options,
89            credential_provider,
90            retries,
91            deletion_files,
92            table_statistics,
93            row_count,
94        } = self.0.extract()?;
95
96        let cloud_options =
97            parse_cloud_options(first_path, storage_options, credential_provider, retries)?;
98
99        let hive_schema = hive_schema.map(|s| Arc::new(s.0));
100
101        let row_index = row_index.map(|(name, offset)| RowIndex {
102            name: name.0,
103            offset,
104        });
105
106        let hive_options = HiveOptions {
107            enabled: hive_partitioning,
108            hive_start_idx: 0,
109            schema: hive_schema,
110            try_parse_dates: try_parse_hive_dates,
111        };
112
113        let unified_scan_args = UnifiedScanArgs {
114            // Schema is currently still stored inside the options per scan type, but we do eventually
115            // want to put it here instead.
116            schema: None,
117            cloud_options,
118            hive_options,
119            rechunk,
120            cache,
121            glob,
122            hidden_file_prefix: hidden_file_prefix
123                .map(|x| x.into_iter().map(|x| (*x).into()).collect()),
124            projection: None,
125            column_mapping: column_mapping.map(|x| x.0),
126            default_values: default_values
127                .map(|x| x.0)
128                .filter(|DefaultFieldValues::Iceberg(v)| !v.is_empty()),
129            row_index,
130            pre_slice: pre_slice.map(Slice::from),
131            cast_columns_policy: cast_options.0,
132            missing_columns_policy: missing_columns.0,
133            extra_columns_policy: extra_columns.0,
134            include_file_paths: include_file_paths.map(|x| x.0),
135            deletion_files: DeletionFilesList::filter_empty(deletion_files.map(|x| x.0)),
136            table_statistics: table_statistics.map(|x| x.0),
137            row_count,
138        };
139
140        Ok(unified_scan_args)
141    }
142}