use std::sync::Arc;
use polars::prelude::default_values::DefaultFieldValues;
use polars::prelude::deletion::DeletionFilesList;
use polars::prelude::{
CastColumnsPolicy, ColumnMapping, ExtraColumnsPolicy, MissingColumnsPolicy, PlSmallStr, Schema,
TableStatistics, UnifiedScanArgs,
};
use polars_io::{HiveOptions, RowIndex};
use polars_utils::IdxSize;
use polars_utils::plpath::PlPathRef;
use polars_utils::slice_enum::Slice;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyAnyMethods;
use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, intern};
use crate::PyDataFrame;
use crate::functions::parse_cloud_options;
use crate::prelude::Wrap;
/// Borrowed handle to the Python-side scan-options object.
///
/// Extraction of the individual options is deferred: this type merely holds
/// onto the Python object, and the fields are pulled out later via
/// `extract_unified_scan_args`.
pub struct PyScanOptions<'py>(Bound<'py, pyo3::PyAny>);

impl<'py> FromPyObject<'py> for PyScanOptions<'py> {
    fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        // Keep a clone of the Python reference; no field access happens here.
        let handle = ob.clone();
        Ok(PyScanOptions(handle))
    }
}
impl<'py> FromPyObject<'py> for Wrap<TableStatistics> {
    /// Builds `TableStatistics` from the `_df` attribute of the Python object,
    /// which is expected to hold a Python-side DataFrame wrapper.
    fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        let py = ob.py();
        // `_df` is the inner Rust-backed DataFrame on the Python wrapper.
        let stats_df_obj = ob.getattr(intern!(py, "_df"))?;
        let df = PyDataFrame::extract_bound(&stats_df_obj)?.df.into_inner();
        Ok(Wrap(TableStatistics(Arc::new(df))))
    }
}
impl PyScanOptions<'_> {
    /// Converts the Python-side scan options object into `UnifiedScanArgs`.
    ///
    /// `first_path` is forwarded (together with the Python `storage_options`,
    /// `credential_provider` and `retries` attributes) to
    /// `parse_cloud_options` to resolve cloud configuration for the scan.
    pub fn extract_unified_scan_args(
        &self,
        first_path: Option<PlPathRef>,
    ) -> PyResult<UnifiedScanArgs> {
        // Mirror of the attributes exposed by the Python options object.
        // pyo3's derive extracts each field by attribute name, so the field
        // names here must match the Python side exactly.
        #[derive(FromPyObject)]
        struct Extract {
            row_index: Option<(Wrap<PlSmallStr>, IdxSize)>,
            pre_slice: Option<(i64, usize)>,
            cast_options: Wrap<CastColumnsPolicy>,
            extra_columns: Wrap<ExtraColumnsPolicy>,
            missing_columns: Wrap<MissingColumnsPolicy>,
            include_file_paths: Option<Wrap<PlSmallStr>>,
            glob: bool,
            hidden_file_prefix: Option<Vec<PyBackedStr>>,
            column_mapping: Option<Wrap<ColumnMapping>>,
            default_values: Option<Wrap<DefaultFieldValues>>,
            hive_partitioning: Option<bool>,
            hive_schema: Option<Wrap<Schema>>,
            try_parse_hive_dates: bool,
            rechunk: bool,
            cache: bool,
            storage_options: Option<Vec<(String, String)>>,
            credential_provider: Option<Py<PyAny>>,
            retries: usize,
            deletion_files: Option<Wrap<DeletionFilesList>>,
            table_statistics: Option<Wrap<TableStatistics>>,
            row_count: Option<(u64, u64)>,
        }

        let opts: Extract = self.0.extract()?;

        let cloud_options = parse_cloud_options(
            first_path,
            opts.storage_options,
            opts.credential_provider,
            opts.retries,
        )?;

        Ok(UnifiedScanArgs {
            schema: None,
            cloud_options,
            hive_options: HiveOptions {
                enabled: opts.hive_partitioning,
                hive_start_idx: 0,
                schema: opts.hive_schema.map(|s| Arc::new(s.0)),
                try_parse_dates: opts.try_parse_hive_dates,
            },
            rechunk: opts.rechunk,
            cache: opts.cache,
            glob: opts.glob,
            hidden_file_prefix: opts
                .hidden_file_prefix
                .map(|prefixes| prefixes.into_iter().map(|p| (*p).into()).collect()),
            projection: None,
            column_mapping: opts.column_mapping.map(|m| m.0),
            // An empty Iceberg default-value map is normalized to `None`.
            default_values: opts.default_values.map(|v| v.0).filter(|v| {
                // `DefaultFieldValues` is single-variant, so this `let` is
                // irrefutable.
                let DefaultFieldValues::Iceberg(inner) = v;
                !inner.is_empty()
            }),
            row_index: opts.row_index.map(|(name, offset)| RowIndex {
                name: name.0,
                offset,
            }),
            pre_slice: opts.pre_slice.map(Slice::from),
            cast_columns_policy: opts.cast_options.0,
            missing_columns_policy: opts.missing_columns.0,
            extra_columns_policy: opts.extra_columns.0,
            include_file_paths: opts.include_file_paths.map(|p| p.0),
            // Empty deletion-file lists are likewise normalized to `None`.
            deletion_files: DeletionFilesList::filter_empty(opts.deletion_files.map(|d| d.0)),
            table_statistics: opts.table_statistics.map(|t| t.0),
            row_count: opts.row_count,
        })
    }
}