use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use polars_core::prelude::*;
#[cfg(any(feature = "cloud", feature = "parquet"))]
use polars_io::cloud::CloudOptions;
use crate::logical_plan::LogicalPlan::DataFrameScan;
use crate::prelude::*;
use crate::utils::{expr_to_leaf_column_names, get_single_leaf};
pub(crate) mod aexpr;
pub(crate) mod alp;
pub(crate) mod anonymous_scan;
mod apply;
mod builder;
mod builder_alp;
pub mod builder_functions;
pub(crate) mod conversion;
#[cfg(feature = "debugging")]
pub(crate) mod debug;
mod file_scan;
mod format;
mod functions;
pub(super) mod hive;
pub(crate) mod iterator;
mod lit;
pub(crate) mod optimizer;
pub(crate) mod options;
pub(crate) mod projection;
mod projection_expr;
#[cfg(feature = "python")]
mod pyarrow;
mod schema;
#[cfg(any(feature = "meta", feature = "cse"))]
pub(crate) mod tree_format;
pub mod visitor;
pub use aexpr::*;
pub use alp::*;
pub use anonymous_scan::*;
pub use apply::*;
pub use builder::*;
pub use builder_alp::*;
pub use conversion::*;
pub use file_scan::*;
pub use functions::*;
pub use iterator::*;
pub use lit::*;
pub use optimizer::*;
pub use schema::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "cse",
feature = "json"
))]
pub use crate::logical_plan::optimizer::file_caching::{
collect_fingerprints, find_column_union_and_fingerprints, FileCacher, FileFingerPrint,
};
/// Two-valued, `Copy` marker distinguishing an aggregation context from the
/// default one.
///
/// NOTE(review): no consumer of this enum is visible in this part of the file;
/// presumably it tells expression evaluation whether it runs inside a
/// group-by aggregation — confirm against the callers.
#[derive(Clone, Copy, Debug)]
pub enum Context {
// Aggregation context.
Aggregation,
// Any non-aggregation context.
Default,
}
/// Tracks whether a stored [`PolarsError`] is still held or has already been
/// handed out.
///
/// `ErrorStateSync::take` moves the state from `NotYetEncountered` to
/// `AlreadyEncountered`, keeping only the rendered message of the original
/// error.
#[derive(Debug)]
pub enum ErrorState {
// The original error is still owned by this state.
NotYetEncountered { err: PolarsError },
// The original error was moved out; only its `to_string()` text remains.
AlreadyEncountered { prev_err_msg: String },
}
impl std::fmt::Display for ErrorState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ErrorState::NotYetEncountered { err } => write!(f, "NotYetEncountered({err})")?,
ErrorState::AlreadyEncountered { prev_err_msg } => {
write!(f, "AlreadyEncountered({prev_err_msg})")?
},
};
Ok(())
}
}
/// Shared, mutex-guarded [`ErrorState`].
///
/// The derived `Clone` only clones the inner `Arc`, so every clone refers to
/// the same underlying state.
#[derive(Clone)]
pub struct ErrorStateSync(Arc<Mutex<ErrorState>>);
impl std::ops::Deref for ErrorStateSync {
type Target = Arc<Mutex<ErrorState>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::fmt::Debug for ErrorStateSync {
    /// Formats as `ErrorStateSync(<state>)`, using the `Display` output of the
    /// guarded [`ErrorState`].
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, because the lock result is unwrapped.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = self.0.lock().unwrap();
        write!(f, "ErrorStateSync({})", &*state)
    }
}
impl ErrorStateSync {
    /// Hands out the stored error.
    ///
    /// On the first call the original [`PolarsError`] is moved out and the
    /// state is replaced by `AlreadyEncountered`, holding the error's rendered
    /// message. Every later call returns a fresh `ComputeError` that quotes
    /// that message.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, because the lock result is unwrapped.
    fn take(&self) -> PolarsError {
        let mut state = self.0.lock().unwrap();

        // Already handed out: build the follow-up error and return early.
        let prev_err_msg = match &*state {
            ErrorState::AlreadyEncountered { prev_err_msg } => {
                return polars_err!(
                    ComputeError: "LogicalPlan already failed with error: '{}'", prev_err_msg,
                );
            },
            ErrorState::NotYetEncountered { err } => err.to_string(),
        };

        // Swap in the "already encountered" marker and move the original
        // error out of the previous state.
        let previous = std::mem::replace(
            &mut *state,
            ErrorState::AlreadyEncountered { prev_err_msg },
        );
        match previous {
            ErrorState::NotYetEncountered { err } => err,
            // The state was checked above while holding the same lock.
            ErrorState::AlreadyEncountered { .. } => unreachable!(),
        }
    }
}
impl From<PolarsError> for ErrorStateSync {
    /// Wraps `err` as a not-yet-encountered error in a new shared state.
    fn from(err: PolarsError) -> Self {
        let initial = ErrorState::NotYetEncountered { err };
        Self(Arc::new(Mutex::new(initial)))
    }
}
/// One node of a query plan tree.
///
/// Variants that consume other plans own them through an `input` field
/// (`Box<LogicalPlan>`) or an `inputs`/`contexts` field (`Vec<LogicalPlan>`);
/// the scan variants have no plan inputs. The enum is `Clone` and, with the
/// `serde` feature enabled, derives `Serialize`/`Deserialize`.
///
/// NOTE(review): the per-variant descriptions below are derived from the
/// variant and field names only; the execution semantics live outside this
/// part of the file and should be confirmed there.
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum LogicalPlan {
/// Scan configured by `PythonOptions`; only compiled with the `python` feature.
#[cfg(feature = "python")]
PythonScan { options: PythonOptions },
/// Applies `predicate` to `input` (presumably a row filter — confirm).
Selection {
input: Box<LogicalPlan>,
predicate: Expr,
},
/// Cache node over `input`.
/// NOTE(review): the meaning of `id` and `count` is not visible here.
Cache {
input: Box<LogicalPlan>,
id: usize,
count: usize,
},
/// File-backed source over `paths`; `scan_type` carries the `FileScan` kind
/// and `predicate` is an optional expression attached to the scan.
Scan {
paths: Arc<[PathBuf]>,
file_info: FileInfo,
predicate: Option<Expr>,
file_options: FileScanOptions,
scan_type: FileScan,
},
/// Source backed by an in-memory `DataFrame` shared through an `Arc`, with
/// an optional column `projection` and an optional `selection` expression.
DataFrameScan {
df: Arc<DataFrame>,
schema: SchemaRef,
output_schema: Option<SchemaRef>,
projection: Option<Arc<Vec<String>>>,
selection: Option<Expr>,
},
/// Evaluates the expressions in `expr` over `input`
/// (`schema` is presumably the resulting schema — confirm).
Projection {
expr: Vec<Expr>,
input: Box<LogicalPlan>,
schema: SchemaRef,
options: ProjectionOptions,
},
/// Grouped aggregation of `input` described by `keys` and `aggs`.
Aggregate {
input: Box<LogicalPlan>,
keys: Arc<Vec<Expr>>,
aggs: Vec<Expr>,
schema: SchemaRef,
// The optional user function is a trait object and is skipped by serde.
#[cfg_attr(feature = "serde", serde(skip))]
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
options: Arc<GroupbyOptions>,
},
/// Join of `input_left` and `input_right` on the `left_on` / `right_on`
/// expressions.
Join {
input_left: Box<LogicalPlan>,
input_right: Box<LogicalPlan>,
schema: SchemaRef,
left_on: Vec<Expr>,
right_on: Vec<Expr>,
options: Arc<JoinOptions>,
},
/// Evaluates `exprs` against `input`
/// (presumably stacking the results as columns onto the input — confirm).
HStack {
input: Box<LogicalPlan>,
exprs: Vec<Expr>,
schema: SchemaRef,
options: ProjectionOptions,
},
/// Distinct operation on `input`, configured by `DistinctOptions`.
Distinct {
input: Box<LogicalPlan>,
options: DistinctOptions,
},
/// Sort of `input` by the `by_column` expressions.
Sort {
input: Box<LogicalPlan>,
by_column: Vec<Expr>,
args: SortArguments,
},
/// Slice of `input` described by a signed `offset` and a length `len`.
Slice {
input: Box<LogicalPlan>,
offset: i64,
len: IdxSize,
},
/// Applies a `FunctionNode` to `input`.
MapFunction {
input: Box<LogicalPlan>,
function: FunctionNode,
},
/// Combines several plans in `inputs`
/// (presumably a vertical concatenation — confirm).
Union {
inputs: Vec<LogicalPlan>,
options: UnionOptions,
},
/// Horizontal concatenation of `inputs`; only compiled with the
/// `horizontal_concat` feature.
#[cfg(feature = "horizontal_concat")]
HConcat {
inputs: Vec<LogicalPlan>,
schema: SchemaRef,
options: HConcatOptions,
},
/// Node carrying a shared error state in `err` alongside the `input` plan.
/// This variant is skipped by serde.
#[cfg_attr(feature = "serde", serde(skip))]
Error {
input: Box<LogicalPlan>,
err: ErrorStateSync,
},
/// `input` plan together with additional plans in `contexts`
/// (presumably made accessible to expressions of `input` — confirm).
ExtContext {
input: Box<LogicalPlan>,
contexts: Vec<LogicalPlan>,
schema: SchemaRef,
},
/// Sends the result of `input` to the destination described by `payload`.
Sink {
input: Box<LogicalPlan>,
payload: SinkType,
},
}
impl Default for LogicalPlan {
    /// Builds a `DataFrameScan` over a `DataFrame` constructed from zero
    /// columns, with no projection, selection, or output schema.
    fn default() -> Self {
        let empty = DataFrame::new(Vec::<Series>::new()).unwrap();
        // Read the schema before the frame is moved into its `Arc`.
        let schema = Arc::new(empty.schema());
        let df = Arc::new(empty);
        DataFrameScan {
            df,
            schema,
            output_schema: None,
            projection: None,
            selection: None,
        }
    }
}
impl LogicalPlan {
    /// Returns the pretty-printed (`{:#?}`) `Debug` rendering of this plan.
    pub fn describe(&self) -> String {
        format!("{:#?}", self)
    }

    /// Converts this plan into its arena-based form.
    ///
    /// Returns the root [`Node`] together with the plan arena and the
    /// expression arena that were filled during conversion.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the free `to_alp` conversion function.
    pub fn to_alp(self) -> PolarsResult<(Node, Arena<ALogicalPlan>, Arena<AExpr>)> {
        // Initial capacity for both arenas.
        const INITIAL_CAPACITY: usize = 16;

        let mut lp_arena = Arena::with_capacity(INITIAL_CAPACITY);
        let mut expr_arena = Arena::with_capacity(INITIAL_CAPACITY);
        to_alp(self, &mut expr_arena, &mut lp_arena).map(|root| (root, lp_arena, expr_arena))
    }
}