use std::sync::Arc;
#[cfg(feature = "pivot")]
use polars_core::frame::PivotColumnNaming;
use polars_core::prelude::*;
#[cfg(feature = "csv")]
use polars_io::csv::read::CsvReadOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetOptions;
use polars_utils::unique_id::UniqueId;
use crate::dsl::functions::lit;
#[cfg(feature = "python")]
use crate::dsl::python_dsl::PythonFunction;
use crate::prelude::*;
/// Thin wrapper around a [`DslPlan`] whose methods consume the builder and
/// return a new builder holding the extended plan.
///
/// Call [`DslBuilder::build`] to take the wrapped plan back out.
pub struct DslBuilder(pub DslPlan);
impl From<DslPlan> for DslBuilder {
fn from(lp: DslPlan) -> Self {
DslBuilder(lp)
}
}
impl DslBuilder {
pub fn anonymous_scan(
function: Arc<dyn AnonymousScan>,
options: AnonymousScanOptions,
unified_scan_args: UnifiedScanArgs,
) -> PolarsResult<Self> {
let schema = unified_scan_args.schema.clone().ok_or_else(|| {
polars_err!(
ComputeError:
"anonymous scan requires schema to be specified in unified_scan_args"
)
})?;
Ok(DslPlan::Scan {
sources: ScanSources::default(),
unified_scan_args: Box::new(unified_scan_args),
scan_type: Box::new(FileScanDsl::Anonymous {
function,
options: Arc::new(options),
file_info: FileInfo {
schema: schema.clone(),
reader_schema: Some(either::Either::Right(schema)),
..Default::default()
},
}),
cached_ir: Default::default(),
}
.into())
}
/// Builds a `DslPlan::Scan` over `sources` with scan type
/// `FileScanDsl::Parquet` carrying `options`.
///
/// As written, this constructor always returns `Ok`.
#[cfg(feature = "parquet")]
pub fn scan_parquet(
    sources: ScanSources,
    options: ParquetOptions,
    unified_scan_args: UnifiedScanArgs,
) -> PolarsResult<Self> {
    let scan_type = FileScanDsl::Parquet { options };
    let plan = DslPlan::Scan {
        sources,
        unified_scan_args: Box::new(unified_scan_args),
        scan_type: Box::new(scan_type),
        cached_ir: Default::default(),
    };
    Ok(Self(plan))
}
/// Builds a `DslPlan::Scan` over `sources` with scan type
/// `FileScanDsl::Ipc` carrying `options`.
///
/// As written, this constructor always returns `Ok`.
#[cfg(feature = "ipc")]
pub fn scan_ipc(
    sources: ScanSources,
    options: IpcScanOptions,
    unified_scan_args: UnifiedScanArgs,
) -> PolarsResult<Self> {
    let scan_type = FileScanDsl::Ipc { options };
    let plan = DslPlan::Scan {
        sources,
        unified_scan_args: Box::new(unified_scan_args),
        scan_type: Box::new(scan_type),
        cached_ir: Default::default(),
    };
    Ok(Self(plan))
}
/// Builds a `DslPlan::Scan` over `sources` with scan type
/// `FileScanDsl::Lines`, passing `name` through to that variant.
///
/// As written, this constructor always returns `Ok`.
#[cfg(feature = "scan_lines")]
pub fn scan_lines(
    sources: ScanSources,
    unified_scan_args: UnifiedScanArgs,
    name: PlSmallStr,
) -> PolarsResult<Self> {
    let scan_type = FileScanDsl::Lines { name };
    let plan = DslPlan::Scan {
        sources,
        unified_scan_args: Box::new(unified_scan_args),
        scan_type: Box::new(scan_type),
        cached_ir: Default::default(),
    };
    Ok(Self(plan))
}
pub fn expand_paths(
sources: ScanSources,
unified_scan_args: UnifiedScanArgs,
name: PlSmallStr,
) -> PolarsResult<Self> {
Ok(DslPlan::Scan {
sources,
unified_scan_args: Box::new(unified_scan_args),
scan_type: Box::new(FileScanDsl::ExpandedPaths { name }),
cached_ir: Default::default(),
}
.into())
}
/// Builds a `DslPlan::Scan` over `sources` with scan type
/// `FileScanDsl::Csv`.
///
/// `options` may be anything convertible into `Arc<CsvReadOptions>`; the
/// conversion is performed here before the options are stored.
///
/// As written, this constructor always returns `Ok`.
#[cfg(feature = "csv")]
pub fn scan_csv(
    sources: ScanSources,
    options: impl Into<Arc<CsvReadOptions>>,
    unified_scan_args: UnifiedScanArgs,
) -> PolarsResult<Self> {
    let scan_type = FileScanDsl::Csv {
        options: options.into(),
    };
    let plan = DslPlan::Scan {
        sources,
        unified_scan_args: Box::new(unified_scan_args),
        scan_type: Box::new(scan_type),
        cached_ir: Default::default(),
    };
    Ok(Self(plan))
}
/// Builds a `DslPlan::Scan` with scan type `FileScanDsl::PythonDataset`.
///
/// `dataset_object` is wrapped in a `PythonDatasetProvider`; the scan uses
/// default `ScanSources` and default unified scan arguments.
#[cfg(feature = "python")]
pub fn scan_python_dataset(
    dataset_object: polars_utils::python_function::PythonObject,
) -> DslBuilder {
    use super::python_dataset::PythonDatasetProvider;

    let provider = PythonDatasetProvider::new(dataset_object);
    let scan_type = FileScanDsl::PythonDataset {
        dataset_object: Arc::new(provider),
    };
    DslBuilder::from(DslPlan::Scan {
        sources: ScanSources::default(),
        unified_scan_args: Default::default(),
        scan_type: Box::new(scan_type),
        cached_ir: Default::default(),
    })
}
pub fn cache(self) -> Self {
let input = Arc::new(self.0);
DslPlan::Cache {
input,
id: UniqueId::new(),
}
.into()
}
/// Projects the negation (`!columns`) of the given selector, using default
/// projection options.
pub fn drop(self, columns: Selector) -> Self {
    let kept = Expr::Selector(!columns);
    self.project(vec![kept], ProjectionOptions::default())
}
pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
DslPlan::Select {
expr: exprs,
input: Arc::new(self.0),
options,
}
.into()
}
/// Projects `all().fill_null(fill_value)` over the current plan.
///
/// The projection is issued with `duplicate_check` disabled; every other
/// projection option keeps its default.
pub fn fill_null(self, fill_value: Expr) -> Self {
    let filled = functions::all().as_expr().fill_null(fill_value);
    let options = ProjectionOptions {
        duplicate_check: false,
        ..Default::default()
    };
    self.project(vec![filled], options)
}
/// Removes rows for which any column selected by `subset` is NaN.
///
/// When `subset` is `None`, the selector defaults to
/// `DataTypeSelector::Float`. The NaN test is combined with
/// `any_horizontal` and handed to [`Self::remove`].
///
/// # Panics
///
/// Panics if `any_horizontal` returns an error.
/// NOTE(review): this is expected to be unreachable because exactly one
/// expression is passed — confirm against `any_horizontal`'s contract.
pub fn drop_nans(self, subset: Option<Selector>) -> Self {
    // Build the default selector lazily: it is only needed when no subset
    // was supplied.
    let is_nan = subset
        .unwrap_or_else(|| DataTypeSelector::Float.as_selector())
        .as_expr()
        .is_nan();
    let any_nan = functions::any_horizontal([is_nan])
        .expect("`any_horizontal` over a one-element array should not fail");
    self.remove(any_nan)
}
/// Keeps only rows for which every column selected by `subset` is non-null.
///
/// When `subset` is `None`, the selector defaults to `Selector::Wildcard`.
/// The non-null test is combined with `all_horizontal` and handed to
/// [`Self::filter`].
///
/// # Panics
///
/// Panics if `all_horizontal` returns an error.
/// NOTE(review): this is expected to be unreachable because exactly one
/// expression is passed — confirm against `all_horizontal`'s contract.
pub fn drop_nulls(self, subset: Option<Selector>) -> Self {
    let is_not_null = subset
        .unwrap_or(Selector::Wildcard)
        .as_expr()
        .is_not_null();
    let all_present = functions::all_horizontal([is_not_null])
        .expect("`all_horizontal` over a one-element array should not fail");
    self.filter(all_present)
}
/// Applies `DslFunction::FillNan(fill_value)` to the current plan via
/// [`Self::map_private`].
pub fn fill_nan(self, fill_value: Expr) -> Self {
    let function = DslFunction::FillNan(fill_value);
    self.map_private(function)
}
pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
if exprs.is_empty() {
return self;
}
DslPlan::HStack {
input: Arc::new(self.0),
exprs,
options,
}
.into()
}
pub fn match_to_schema(
self,
match_schema: SchemaRef,
per_column: Arc<[MatchToSchemaPerColumn]>,
extra_columns: ExtraColumnsPolicy,
) -> Self {
DslPlan::MatchToSchema {
input: Arc::new(self.0),
match_schema,
per_column,
extra_columns,
}
.into()
}
pub fn pipe_with_schema(
self,
others: Vec<DslPlan>,
callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
) -> Self {
let mut input = vec![self.0];
input.extend(others);
DslPlan::PipeWithSchema {
input: Arc::from(input),
callback,
}
.into()
}
pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
DslPlan::ExtContext {
input: Arc::new(self.0),
contexts,
}
.into()
}
pub fn filter(self, predicate: Expr) -> Self {
DslPlan::Filter {
predicate,
input: Arc::new(self.0),
}
.into()
}
pub fn remove(self, predicate: Expr) -> Self {
DslPlan::Filter {
predicate: predicate.neq_missing(lit(true)),
input: Arc::new(self.0),
}
.into()
}
#[allow(clippy::too_many_arguments)]
pub fn group_by<E: AsRef<[Expr]>>(
self,
keys: Vec<Expr>,
predicates: Vec<Expr>,
aggs: E,
apply: Option<(PlanCallback<DataFrame, DataFrame>, SchemaRef)>,
maintain_order: bool,
#[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
#[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
) -> Self {
let aggs = aggs.as_ref().to_vec();
let options = GroupbyOptions {
#[cfg(feature = "dynamic_group_by")]
dynamic: dynamic_options,
#[cfg(feature = "dynamic_group_by")]
rolling: rolling_options,
slice: None,
};
DslPlan::GroupBy {
input: Arc::new(self.0),
keys,
predicates,
aggs,
apply,
maintain_order,
options: Arc::new(options),
}
.into()
}
pub fn build(self) -> DslPlan {
self.0
}
pub fn from_existing_df(df: DataFrame) -> Self {
let schema = df.schema().clone();
DslPlan::DataFrameScan {
df: Arc::new(df),
schema,
}
.into()
}
pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
DslPlan::Sort {
input: Arc::new(self.0),
by_column,
slice: None,
sort_options,
}
.into()
}
pub fn explode(self, columns: Selector, options: ExplodeOptions, allow_empty: bool) -> Self {
DslPlan::MapFunction {
input: Arc::new(self.0),
function: DslFunction::Explode {
columns,
options,
allow_empty,
},
}
.into()
}
/// Adds a `DslPlan::Pivot` node on top of the current plan, forwarding every
/// argument unchanged to the node's field of the same name.
#[cfg(feature = "pivot")]
#[expect(clippy::too_many_arguments)]
pub fn pivot(
    self,
    on: Selector,
    on_columns: Arc<DataFrame>,
    index: Selector,
    values: Selector,
    agg: Expr,
    maintain_order: bool,
    separator: PlSmallStr,
    column_naming: PivotColumnNaming,
) -> Self {
    let input = Arc::new(self.0);
    Self(DslPlan::Pivot {
        input,
        on,
        on_columns,
        index,
        values,
        agg,
        maintain_order,
        separator,
        column_naming,
    })
}
/// Adds a `DslPlan::MapFunction` node carrying `DslFunction::Unpivot` with
/// the given `args`.
#[cfg(feature = "pivot")]
pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
    let function = DslFunction::Unpivot { args };
    Self(DslPlan::MapFunction {
        input: Arc::new(self.0),
        function,
    })
}
pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
DslPlan::MapFunction {
input: Arc::new(self.0),
function: DslFunction::RowIndex { name, offset },
}
.into()
}
pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
DslPlan::Distinct {
input: Arc::new(self.0),
options,
}
.into()
}
pub fn slice(self, offset: i64, len: IdxSize) -> Self {
DslPlan::Slice {
input: Arc::new(self.0),
offset,
len,
}
.into()
}
pub fn join(
self,
other: DslPlan,
left_on: Vec<Expr>,
right_on: Vec<Expr>,
options: Arc<JoinOptions>,
) -> Self {
DslPlan::Join {
input_left: Arc::new(self.0),
input_right: Arc::new(other),
left_on,
right_on,
predicates: Default::default(),
options,
}
.into()
}
pub fn gather(self, idxs: DslPlan, null_on_oob: bool) -> Self {
DslPlan::Gather {
input: Arc::new(self.0),
idxs: Arc::new(idxs),
null_on_oob,
}
.into()
}
pub fn map_private(self, function: DslFunction) -> Self {
DslPlan::MapFunction {
input: Arc::new(self.0),
function,
}
.into()
}
/// Adds a `DslPlan::MapFunction` node carrying an opaque Python UDF.
///
/// The `predicate_pd`, `projection_pd` and `streamable` fields of the UDF
/// are derived from whether `optimizations` contains the
/// `PREDICATE_PUSHDOWN`, `PROJECTION_PUSHDOWN` and `STREAMING` flags.
/// `schema` and `validate_output` are forwarded unchanged.
#[cfg(feature = "python")]
pub fn map_python(
    self,
    function: PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> Self {
    let predicate_pd = optimizations.contains(OptFlags::PREDICATE_PUSHDOWN);
    let projection_pd = optimizations.contains(OptFlags::PROJECTION_PUSHDOWN);
    let streamable = optimizations.contains(OptFlags::STREAMING);

    let udf = OpaquePythonUdf {
        function,
        schema,
        predicate_pd,
        projection_pd,
        streamable,
        validate_output,
    };
    Self(DslPlan::MapFunction {
        input: Arc::new(self.0),
        function: DslFunction::OpaquePython(udf),
    })
}
pub fn map<F>(
self,
function: F,
optimizations: AllowedOptimizations,
schema: Option<Arc<dyn UdfSchema>>,
name: PlSmallStr,
) -> Self
where
F: DataFrameUdf + 'static,
{
let function = Arc::new(function);
DslPlan::MapFunction {
input: Arc::new(self.0),
function: DslFunction::FunctionIR(FunctionIR::Opaque {
function,
schema,
predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
streamable: optimizations.contains(OptFlags::STREAMING),
fmt_str: name,
}),
}
.into()
}
}