1use std::sync::Arc;
2
3use polars::prelude::default_values::DefaultFieldValues;
4use polars::prelude::deletion::DeletionFilesList;
5use polars::prelude::{
6 CastColumnsPolicy, ColumnMapping, ExtraColumnsPolicy, MissingColumnsPolicy, PlSmallStr, Schema,
7 TableStatistics, UnifiedScanArgs,
8};
9use polars_io::{HiveOptions, RowIndex};
10use polars_utils::IdxSize;
11use polars_utils::plpath::PlPathRef;
12use polars_utils::slice_enum::Slice;
13use pyo3::pybacked::PyBackedStr;
14use pyo3::types::PyAnyMethods;
15use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, intern};
16
17use crate::PyDataFrame;
18use crate::functions::parse_cloud_options;
19use crate::prelude::Wrap;
20
21pub struct PyScanOptions<'py>(Bound<'py, pyo3::PyAny>);
23
24impl<'py> FromPyObject<'py> for PyScanOptions<'py> {
25 fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
26 Ok(Self(ob.clone()))
27 }
28}
29
30impl<'py> FromPyObject<'py> for Wrap<TableStatistics> {
31 fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
32 let py = ob.py();
33 Ok(Wrap(TableStatistics(Arc::new(
34 PyDataFrame::extract_bound(&ob.getattr(intern!(py, "_df"))?)?
35 .df
36 .into_inner(),
37 ))))
38 }
39}
40
41impl PyScanOptions<'_> {
42 pub fn extract_unified_scan_args(
43 &self,
44 first_path: Option<PlPathRef>,
46 ) -> PyResult<UnifiedScanArgs> {
47 #[derive(FromPyObject)]
48 struct Extract {
49 row_index: Option<(Wrap<PlSmallStr>, IdxSize)>,
50 pre_slice: Option<(i64, usize)>,
51 cast_options: Wrap<CastColumnsPolicy>,
52 extra_columns: Wrap<ExtraColumnsPolicy>,
53 missing_columns: Wrap<MissingColumnsPolicy>,
54 include_file_paths: Option<Wrap<PlSmallStr>>,
55 glob: bool,
56 hidden_file_prefix: Option<Vec<PyBackedStr>>,
57 column_mapping: Option<Wrap<ColumnMapping>>,
58 default_values: Option<Wrap<DefaultFieldValues>>,
59 hive_partitioning: Option<bool>,
60 hive_schema: Option<Wrap<Schema>>,
61 try_parse_hive_dates: bool,
62 rechunk: bool,
63 cache: bool,
64 storage_options: Option<Vec<(String, String)>>,
65 credential_provider: Option<Py<PyAny>>,
66 retries: usize,
67 deletion_files: Option<Wrap<DeletionFilesList>>,
68 table_statistics: Option<Wrap<TableStatistics>>,
69 row_count: Option<(u64, u64)>,
70 }
71
72 let Extract {
73 row_index,
74 pre_slice,
75 cast_options,
76 extra_columns,
77 missing_columns,
78 include_file_paths,
79 column_mapping,
80 default_values,
81 glob,
82 hidden_file_prefix,
83 hive_partitioning,
84 hive_schema,
85 try_parse_hive_dates,
86 rechunk,
87 cache,
88 storage_options,
89 credential_provider,
90 retries,
91 deletion_files,
92 table_statistics,
93 row_count,
94 } = self.0.extract()?;
95
96 let cloud_options =
97 parse_cloud_options(first_path, storage_options, credential_provider, retries)?;
98
99 let hive_schema = hive_schema.map(|s| Arc::new(s.0));
100
101 let row_index = row_index.map(|(name, offset)| RowIndex {
102 name: name.0,
103 offset,
104 });
105
106 let hive_options = HiveOptions {
107 enabled: hive_partitioning,
108 hive_start_idx: 0,
109 schema: hive_schema,
110 try_parse_dates: try_parse_hive_dates,
111 };
112
113 let unified_scan_args = UnifiedScanArgs {
114 schema: None,
117 cloud_options,
118 hive_options,
119 rechunk,
120 cache,
121 glob,
122 hidden_file_prefix: hidden_file_prefix
123 .map(|x| x.into_iter().map(|x| (*x).into()).collect()),
124 projection: None,
125 column_mapping: column_mapping.map(|x| x.0),
126 default_values: default_values
127 .map(|x| x.0)
128 .filter(|DefaultFieldValues::Iceberg(v)| !v.is_empty()),
129 row_index,
130 pre_slice: pre_slice.map(Slice::from),
131 cast_columns_policy: cast_options.0,
132 missing_columns_policy: missing_columns.0,
133 extra_columns_policy: extra_columns.0,
134 include_file_paths: include_file_paths.map(|x| x.0),
135 deletion_files: DeletionFilesList::filter_empty(deletion_files.map(|x| x.0)),
136 table_statistics: table_statistics.map(|x| x.0),
137 row_count,
138 };
139
140 Ok(unified_scan_args)
141 }
142}