#[cfg(feature = "csv-file")]
mod csv;
#[cfg(feature = "ipc")]
mod ipc;
#[cfg(feature = "json")]
mod ndjson;
#[cfg(feature = "parquet")]
mod parquet;
#[cfg(feature = "python")]
mod python;
mod anonymous_scan;
#[cfg(feature = "pivot")]
pub mod pivot;
use std::borrow::Cow;
#[cfg(feature = "parquet")]
use std::path::PathBuf;
use std::sync::Arc;
pub use anonymous_scan::*;
#[cfg(feature = "csv-file")]
pub use csv::*;
#[cfg(feature = "ipc")]
pub use ipc::*;
#[cfg(feature = "json")]
pub use ndjson::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use polars_arrow::prelude::QuantileInterpolOptions;
use polars_core::frame::explode::MeltArgs;
use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
use polars_io::RowCount;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
use polars_plan::logical_plan::collect_fingerprints;
use polars_plan::logical_plan::optimize;
use polars_plan::utils::{combine_predicates_expr, expr_to_leaf_column_names};
use crate::physical_plan::executors::Executor;
use crate::physical_plan::planner::create_physical_plan;
use crate::physical_plan::state::ExecutionState;
#[cfg(feature = "streaming")]
use crate::physical_plan::streaming::insert_streaming_nodes;
use crate::prelude::*;
/// Conversion of an eager value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Consume `self` and start a lazy query from it.
    fn lazy(self) -> LazyFrame;
}
impl IntoLazy for DataFrame {
fn lazy(self) -> LazyFrame {
let lp = LogicalPlanBuilder::from_existing_df(self).build();
LazyFrame {
logical_plan: lp,
opt_state: Default::default(),
}
}
}
/// A lazy query: a logical plan describing the computation plus the set of
/// optimizations to apply when it is executed.
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    // The (unoptimized) logical plan describing the query.
    pub logical_plan: LogicalPlan,
    // Flags for which optimizations run when the plan is executed.
    pub(crate) opt_state: OptState,
}
impl From<LogicalPlan> for LazyFrame {
fn from(plan: LogicalPlan) -> Self {
Self {
logical_plan: plan,
opt_state: OptState {
file_caching: true,
..Default::default()
},
}
}
}
impl LazyFrame {
    /// Get the output schema of the current query.
    ///
    /// Resolved from the (unoptimized) logical plan; does not execute the query.
    pub fn schema(&self) -> PolarsResult<SchemaRef> {
        // Clone so `self` stays usable after `build` consumes the plan.
        let logical_plan = self.clone().get_plan_builder().build();
        logical_plan.schema().map(|schema| schema.into_owned())
    }
    /// Turn the logical plan back into a builder so further nodes can be appended.
    pub(crate) fn get_plan_builder(self) -> LogicalPlanBuilder {
        LogicalPlanBuilder::from(self.logical_plan)
    }
    /// Copy of the current optimization flags (`OptState` is `Copy`).
    fn get_opt_state(&self) -> OptState {
        self.opt_state
    }
    /// Construct a `LazyFrame` from a plan plus previously captured flags.
    fn from_logical_plan(logical_plan: LogicalPlan, opt_state: OptState) -> Self {
        LazyFrame {
            logical_plan,
            opt_state,
        }
    }
    /// Replace the optimization flags wholesale.
    pub fn with_optimizations(mut self, opt_state: OptState) -> Self {
        self.opt_state = opt_state;
        self
    }
    /// Turn off all optimizations (type coercion stays enabled).
    pub fn without_optimizations(self) -> Self {
        self.with_optimizations(OptState {
            projection_pushdown: false,
            predicate_pushdown: false,
            type_coercion: true,
            simplify_expr: false,
            slice_pushdown: false,
            file_caching: false,
            #[cfg(feature = "cse")]
            common_subplan_elimination: false,
            streaming: false,
        })
    }
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.projection_pushdown = toggle;
self
}
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.predicate_pushdown = toggle;
self
}
pub fn with_type_coercion(mut self, toggle: bool) -> Self {
self.opt_state.type_coercion = toggle;
self
}
pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
self.opt_state.simplify_expr = toggle;
self
}
#[cfg(feature = "cse")]
#[cfg_attr(docsrs, doc(cfg(feature = "cse")))]
pub fn with_common_subplan_elimination(mut self, toggle: bool) -> Self {
self.opt_state.common_subplan_elimination = toggle;
self
}
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.slice_pushdown = toggle;
self
}
pub fn with_streaming(mut self, toggle: bool) -> Self {
self.opt_state.streaming = toggle;
self
}
    /// Describe the unoptimized logical plan as a string.
    pub fn describe_plan(&self) -> String {
        self.logical_plan.describe()
    }
    /// Describe the optimized logical plan as a string.
    ///
    /// # Errors
    /// Fails when the plan cannot be optimized (e.g. schema resolution errors).
    pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
        let mut expr_arena = Arena::with_capacity(64);
        let mut lp_arena = Arena::with_capacity(64);
        // The `true` is the `_fmt` flag, forwarded to streaming node insertion.
        let lp_top = self.clone().optimize_with_scratch(
            &mut lp_arena,
            &mut expr_arena,
            &mut vec![],
            true,
        )?;
        // Convert the arena-based plan back into an owned LogicalPlan for display.
        let logical_plan = node_to_lp(lp_top, &expr_arena, &mut lp_arena);
        Ok(logical_plan.describe())
    }
pub fn sort(self, by_column: &str, options: SortOptions) -> Self {
let reverse = options.descending;
let nulls_last = options.nulls_last;
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.sort(vec![col(by_column)], vec![reverse], nulls_last)
.build();
Self::from_logical_plan(lp, opt_state)
}
pub fn sort_by_exprs<E: AsRef<[Expr]>, B: AsRef<[bool]>>(
self,
by_exprs: E,
reverse: B,
nulls_last: bool,
) -> Self {
let by_exprs = by_exprs.as_ref().to_vec();
let reverse = reverse.as_ref().to_vec();
if by_exprs.is_empty() {
self
} else {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.sort(by_exprs, reverse, nulls_last)
.build();
Self::from_logical_plan(lp, opt_state)
}
}
    /// Reverse the order of the rows.
    pub fn reverse(self) -> Self {
        self.select_local(vec![col("*").reverse()])
    }
    /// Rename used when at least one target name already exists in the schema,
    /// so a direct rename could collide. Columns are first aliased to temporary
    /// `__POLARS_TEMP_`-prefixed names; the actual swap happens in a `map` over
    /// the materialized DataFrame.
    fn rename_impl_swapping(self, mut existing: Vec<String>, mut new: Vec<String>) -> Self {
        assert_eq!(new.len(), existing.len());
        // Drop no-op pairs (old name == new name). `swap_remove` is O(1);
        // because later indices are shifted down by `removed`, the element
        // swapped into the hole is still visited on a later iteration.
        let mut removed = 0;
        for mut idx in 0..existing.len() {
            idx -= removed;
            if existing[idx] == new[idx] {
                existing.swap_remove(idx);
                new.swap_remove(idx);
                removed += 1;
            }
        }
        let existing2 = existing.clone();
        let new2 = new.clone();
        // Schema hook so the planner knows the output schema without running
        // the UDF; errors with NotFound if a source column is missing.
        let udf_schema = move |s: &Schema| {
            let mut new_schema = s.clone();
            for (old, new) in existing2.iter().zip(new2.iter()) {
                new_schema
                    .rename(old, new.to_string())
                    .ok_or_else(|| PolarsError::NotFound(old.to_string().into()))?
            }
            Ok(Arc::new(new_schema))
        };
        let prefix = "__POLARS_TEMP_";
        // Alias each renamed column to a temporary prefixed name first.
        let new: Vec<String> = new.iter().map(|name| format!("{prefix}{name}")).collect();
        self.with_columns(
            existing
                .iter()
                .zip(&new)
                .map(|(old, new)| col(old).alias(new))
                .collect::<Vec<_>>(),
        )
        .map(
            move |mut df: DataFrame| {
                // Swap every original column with its temporary twin, strip
                // the prefix from the name, then truncate the extra columns
                // that `with_columns` appended.
                let mut cols = std::mem::take(df.get_columns_mut());
                #[allow(clippy::needless_collect)]
                let existing_idx = existing
                    .iter()
                    .map(|name| cols.iter().position(|s| s.name() == name.as_str()).unwrap())
                    .collect::<Vec<_>>();
                let new_idx = new
                    .iter()
                    .map(|name| cols.iter().position(|s| s.name() == name.as_str()).unwrap())
                    .collect::<Vec<_>>();
                for (existing_i, new_i) in existing_idx.into_iter().zip(new_idx) {
                    cols.swap(existing_i, new_i);
                    let s = &mut cols[existing_i];
                    let name = &s.name()[prefix.len()..].to_string();
                    s.rename(name);
                }
                // The old (now temp-named) columns sit at the tail; drop them.
                cols.truncate(cols.len() - existing.len());
                DataFrame::new(cols)
            },
            None,
            Some(Arc::new(udf_schema)),
            Some("RENAME_SWAPPING"),
        )
    }
    /// Rename without the temp-name detour; used when no target name already
    /// exists in the schema, so aliasing directly to the new names is safe.
    fn rename_impl(self, existing: Vec<String>, new: Vec<String>) -> Self {
        let existing2 = existing.clone();
        let new2 = new.clone();
        // Schema hook: rename failures are ignored here (`let _ =`); missing
        // columns are handled at runtime in the `map` below instead.
        let udf_schema = move |s: &Schema| {
            let mut new_schema = s.clone();
            for (old, new) in existing2.iter().zip(&new2) {
                let _ = new_schema.rename(old, new.clone());
            }
            Ok(Arc::new(new_schema))
        };
        // Alias each source column to its new name (this appends columns),
        // then swap/truncate over the materialized frame.
        self.with_columns(
            existing
                .iter()
                .zip(&new)
                .map(|(old, new)| col(old).alias(new.as_ref()))
                .collect::<Vec<_>>(),
        )
        .map(
            move |mut df: DataFrame| {
                let cols = df.get_columns_mut();
                // Pairs whose columns were not found contribute nothing to the
                // appended tail; track them so the truncation stays correct.
                let mut removed_count = 0;
                for (existing, new) in existing.iter().zip(new.iter()) {
                    let idx_a = cols.iter().position(|s| s.name() == existing.as_str());
                    let idx_b = cols.iter().position(|s| s.name() == new.as_str());
                    match (idx_a, idx_b) {
                        (Some(idx_a), Some(idx_b)) => {
                            // Move the aliased (new-named) column into the
                            // original column's position.
                            cols.swap(idx_a, idx_b);
                        }
                        _ => {
                            removed_count += 1;
                            continue;
                        }
                    }
                }
                // Drop the old columns now sitting at the tail.
                cols.truncate(cols.len() - (existing.len() - removed_count));
                Ok(df)
            },
            None,
            Some(Arc::new(udf_schema)),
            Some("RENAME"),
        )
    }
    /// Rename columns: `existing[i]` becomes `new[i]`.
    ///
    /// Dispatches to the swap-based implementation when any target name
    /// already exists in the schema (to avoid collisions), otherwise to the
    /// direct implementation.
    pub fn rename<I, J, T, S>(self, existing: I, new: J) -> Self
    where
        I: IntoIterator<Item = T>,
        J: IntoIterator<Item = S>,
        T: AsRef<str>,
        S: AsRef<str>,
    {
        let existing = existing
            .into_iter()
            .map(|a| a.as_ref().to_string())
            .collect::<Vec<_>>();
        let new = new
            .into_iter()
            .map(|a| a.as_ref().to_string())
            .collect::<Vec<_>>();
        // Non-generic inner fn keeps monomorphization cost down.
        fn inner(lf: LazyFrame, existing: Vec<String>, new: Vec<String>) -> LazyFrame {
            // Drop no-op pairs (old == new) up front.
            let (existing, new): (Vec<_>, Vec<_>) = existing
                .into_iter()
                .zip(new)
                .flat_map(|(a, b)| if a == b { None } else { Some((a, b)) })
                .unzip();
            // NOTE(review): panics if the plan's schema cannot be resolved
            // at this point — confirm that is acceptable for callers.
            let schema = &*lf.schema().unwrap();
            if new.iter().any(|name| schema.get(name).is_some()) {
                lf.rename_impl_swapping(existing, new)
            } else {
                lf.rename_impl(existing, new)
            }
        }
        inner(self, existing, new)
    }
pub fn drop_columns<I, T>(self, columns: I) -> Self
where
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
let columns: Vec<String> = columns
.into_iter()
.map(|name| name.as_ref().to_string())
.collect();
self.drop_columns_impl(&columns)
}
#[allow(clippy::ptr_arg)]
fn drop_columns_impl(self, columns: &Vec<String>) -> Self {
self.select_local(vec![col("*").exclude(columns)])
}
pub fn shift(self, periods: i64) -> Self {
self.select_local(vec![col("*").shift(periods)])
}
pub fn shift_and_fill<E: Into<Expr>>(self, periods: i64, fill_value: E) -> Self {
self.select_local(vec![col("*").shift_and_fill(periods, fill_value.into())])
}
pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn cache(self) -> Self {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().cache().build();
Self::from_logical_plan(lp, opt_state)
}
    /// Like `collect`, but limits every scan to at most `n_rows` rows —
    /// useful for debugging a query on a subset of the data.
    ///
    /// The `FETCH_ROWS` thread-local is set for the duration of the collect
    /// and cleared afterwards (also on error, since the `Result` is captured
    /// before the reset).
    pub fn fetch(self, n_rows: usize) -> PolarsResult<DataFrame> {
        FETCH_ROWS.with(|fetch_rows| fetch_rows.set(Some(n_rows)));
        let res = self.collect();
        FETCH_ROWS.with(|fetch_rows| fetch_rows.set(None));
        res
    }
    /// Lower the logical plan into the arena-based IR and run the optimizer.
    pub fn optimize(
        self,
        lp_arena: &mut Arena<ALogicalPlan>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Node> {
        self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
    }
    /// Optimize the plan, inserting streaming nodes when streaming is enabled.
    ///
    /// `_fmt` is only forwarded to streaming node insertion (used when
    /// formatting plans); it is unused without the "streaming" feature.
    pub(crate) fn optimize_with_scratch(
        self,
        lp_arena: &mut Arena<ALogicalPlan>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
        _fmt: bool,
    ) -> PolarsResult<Node> {
        #[allow(unused_mut)]
        let mut opt_state = self.opt_state;
        let streaming = self.opt_state.streaming;
        // Streaming and CSE cannot be combined: keep streaming, warn, and
        // turn CSE off.
        #[cfg(feature = "cse")]
        if streaming && self.opt_state.common_subplan_elimination {
            eprintln!("Cannot combine 'streaming' with 'common_subplan_elimination'. CSE will be turned off.");
            opt_state.common_subplan_elimination = false;
        }
        // This is the free function `polars_plan::logical_plan::optimize`,
        // not `Self::optimize`.
        let lp_top = optimize(self.logical_plan, opt_state, lp_arena, expr_arena, scratch)?;
        if streaming {
            #[cfg(feature = "streaming")]
            {
                insert_streaming_nodes(lp_top, lp_arena, expr_arena, scratch, _fmt)?;
            }
            #[cfg(not(feature = "streaming"))]
            {
                panic!("activate feature 'streaming'")
            }
        }
        Ok(lp_top)
    }
    /// Optimize the plan and build the physical executor.
    ///
    /// Returns the execution state, the executor, and a flag that is `true`
    /// when the optimized plan's top node is NOT a file sink (the flag is
    /// only meaningful when `check_sink` is set; otherwise it is `true`).
    #[allow(unused_mut)]
    fn prepare_collect(
        mut self,
        check_sink: bool,
    ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
        let file_caching = self.opt_state.file_caching;
        let mut expr_arena = Arena::with_capacity(256);
        let mut lp_arena = Arena::with_capacity(128);
        let mut scratch = vec![];
        let lp_top =
            self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
        // Collect scan fingerprints so identical file scans can share a cache
        // entry during execution (only when a scan feature is compiled in).
        let finger_prints = if file_caching {
            #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
            {
                let mut fps = Vec::with_capacity(8);
                collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
                Some(fps)
            }
            #[cfg(not(any(feature = "ipc", feature = "parquet", feature = "csv-file")))]
            {
                None
            }
        } else {
            None
        };
        let no_file_sink = if check_sink {
            !matches!(lp_arena.get(lp_top), ALogicalPlan::FileSink { .. })
        } else {
            true
        };
        let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;
        let state = ExecutionState::with_finger_prints(finger_prints);
        Ok((state, physical_plan, no_file_sink))
    }
    /// Execute the lazy operations and collect the result into a [`DataFrame`].
    pub fn collect(self) -> PolarsResult<DataFrame> {
        let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
        let out = physical_plan.execute(&mut state);
        // Debug builds verify the file cache was fully drained by execution.
        #[cfg(debug_assertions)]
        {
            #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
            state.file_cache.assert_empty();
        }
        out
    }
    /// Execute the query with node timing enabled; returns the result plus a
    /// DataFrame of per-node timings.
    pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
        let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
        state.time_nodes();
        let out = physical_plan.execute(&mut state)?;
        let timer_df = state.finish_timer()?;
        Ok((out, timer_df))
    }
#[cfg(feature = "parquet")]
pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.logical_plan = LogicalPlan::FileSink {
input: Box::new(self.logical_plan),
payload: FileSinkOptions {
path: Arc::new(path),
file_type: FileType::Parquet(options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
if is_streaming {
let _ = physical_plan.execute(&mut state)?;
Ok(())
} else {
Err(PolarsError::ComputeError("Cannot run whole the query in a streaming order. Use `collect().write_parquet()` instead.".into()))
}
}
pub fn filter(self, predicate: Expr) -> Self {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().filter(predicate).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.project(exprs.as_ref().to_vec())
.build();
Self::from_logical_plan(lp, opt_state)
}
fn select_local(self, exprs: Vec<Expr>) -> Self {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().project_local(exprs).build();
Self::from_logical_plan(lp, opt_state)
}
    /// Start a group-by over the given key expressions (group order not
    /// maintained; see `groupby_stable` for the order-preserving variant).
    pub fn groupby<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
        let keys = by
            .as_ref()
            .iter()
            .map(|e| e.clone().into())
            .collect::<Vec<_>>();
        let opt_state = self.get_opt_state();
        // LazyGroupBy has extra fields under "dynamic_groupby", hence the
        // duplicated construction per feature flag.
        #[cfg(feature = "dynamic_groupby")]
        {
            LazyGroupBy {
                logical_plan: self.logical_plan,
                opt_state,
                keys,
                maintain_order: false,
                dynamic_options: None,
                rolling_options: None,
            }
        }
        #[cfg(not(feature = "dynamic_groupby"))]
        {
            LazyGroupBy {
                logical_plan: self.logical_plan,
                opt_state,
                keys,
                maintain_order: false,
            }
        }
    }
#[cfg(feature = "dynamic_groupby")]
#[cfg_attr(docsrs, doc(cfg(feature = "dynamic_groupby")))]
pub fn groupby_rolling<E: AsRef<[Expr]>>(
self,
by: E,
options: RollingGroupOptions,
) -> LazyGroupBy {
let opt_state = self.get_opt_state();
LazyGroupBy {
logical_plan: self.logical_plan,
opt_state,
keys: by.as_ref().to_vec(),
maintain_order: true,
dynamic_options: None,
rolling_options: Some(options),
}
}
#[cfg(feature = "dynamic_groupby")]
#[cfg_attr(docsrs, doc(cfg(feature = "dynamic_groupby")))]
pub fn groupby_dynamic<E: AsRef<[Expr]>>(
self,
by: E,
options: DynamicGroupOptions,
) -> LazyGroupBy {
let opt_state = self.get_opt_state();
LazyGroupBy {
logical_plan: self.logical_plan,
opt_state,
keys: by.as_ref().to_vec(),
maintain_order: true,
dynamic_options: Some(options),
rolling_options: None,
}
}
    /// Like `groupby`, but the order of groups follows their first occurrence
    /// in the data (`maintain_order: true`).
    pub fn groupby_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
        let keys = by
            .as_ref()
            .iter()
            .map(|e| e.clone().into())
            .collect::<Vec<_>>();
        let opt_state = self.get_opt_state();
        // Duplicated construction per feature flag; see `groupby`.
        #[cfg(feature = "dynamic_groupby")]
        {
            LazyGroupBy {
                logical_plan: self.logical_plan,
                opt_state,
                keys,
                maintain_order: true,
                dynamic_options: None,
                rolling_options: None,
            }
        }
        #[cfg(not(feature = "dynamic_groupby"))]
        {
            LazyGroupBy {
                logical_plan: self.logical_plan,
                opt_state,
                keys,
                maintain_order: true,
            }
        }
    }
    /// Left join with `other` on a single key pair.
    pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
        self.join(other, [left_on.into()], [right_on.into()], JoinType::Left)
    }
    /// Outer join with `other` on a single key pair.
    pub fn outer_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
        self.join(other, [left_on.into()], [right_on.into()], JoinType::Outer)
    }
    /// Inner join with `other` on a single key pair.
    pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
        self.join(other, [left_on.into()], [right_on.into()], JoinType::Inner)
    }
    /// Cartesian product with `other` (no keys).
    #[cfg(feature = "cross_join")]
    pub fn cross_join(self, other: LazyFrame) -> LazyFrame {
        self.join(other, vec![], vec![], JoinType::Cross)
    }
pub fn join<E: AsRef<[Expr]>>(
mut self,
other: LazyFrame,
left_on: E,
right_on: E,
how: JoinType,
) -> LazyFrame {
self.opt_state.file_caching |= other.opt_state.file_caching;
let left_on = left_on.as_ref().to_vec();
let right_on = right_on.as_ref().to_vec();
self.join_builder()
.with(other)
.left_on(left_on)
.right_on(right_on)
.how(how)
.finish()
}
pub fn join_builder(self) -> JoinBuilder {
JoinBuilder::new(self)
}
pub fn with_column(self, expr: Expr) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().with_columns(vec![expr]).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
let exprs = exprs.as_ref().to_vec();
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().with_columns(exprs).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
let contexts = contexts
.as_ref()
.iter()
.map(|lf| lf.logical_plan.clone())
.collect();
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().with_context(contexts).build();
Self::from_logical_plan(lp, opt_state)
}
    /// Aggregate every column to its maximum.
    pub fn max(self) -> LazyFrame {
        self.select_local(vec![col("*").max()])
    }
    /// Aggregate every column to its minimum.
    pub fn min(self) -> LazyFrame {
        self.select_local(vec![col("*").min()])
    }
    /// Aggregate every column to its sum.
    pub fn sum(self) -> LazyFrame {
        self.select_local(vec![col("*").sum()])
    }
    /// Aggregate every column to its mean.
    pub fn mean(self) -> LazyFrame {
        self.select_local(vec![col("*").mean()])
    }
    /// Aggregate every column to its median.
    pub fn median(self) -> LazyFrame {
        self.select_local(vec![col("*").median()])
    }
    /// Aggregate every column to the given quantile.
    pub fn quantile(self, quantile: Expr, interpol: QuantileInterpolOptions) -> LazyFrame {
        self.select_local(vec![col("*").quantile(quantile, interpol)])
    }
    /// Aggregate every column to its standard deviation (with `ddof`).
    pub fn std(self, ddof: u8) -> LazyFrame {
        self.select_local(vec![col("*").std(ddof)])
    }
    /// Aggregate every column to its variance (with `ddof`).
    pub fn var(self, ddof: u8) -> LazyFrame {
        self.select_local(vec![col("*").var(ddof)])
    }
pub fn explode<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, columns: E) -> LazyFrame {
let columns = columns
.as_ref()
.iter()
.map(|e| e.clone().into())
.collect::<Vec<_>>();
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().explode(columns).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn unique_stable(
self,
subset: Option<Vec<String>>,
keep_strategy: UniqueKeepStrategy,
) -> LazyFrame {
let opt_state = self.get_opt_state();
let options = DistinctOptions {
subset: subset.map(Arc::new),
maintain_order: true,
keep_strategy,
};
let lp = self.get_plan_builder().distinct(options).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn unique(
self,
subset: Option<Vec<String>>,
keep_strategy: UniqueKeepStrategy,
) -> LazyFrame {
let opt_state = self.get_opt_state();
let options = DistinctOptions {
subset: subset.map(Arc::new),
maintain_order: false,
keep_strategy,
};
let lp = self.get_plan_builder().distinct(options).build();
Self::from_logical_plan(lp, opt_state)
}
    /// Drop rows containing null values, optionally restricted to the
    /// `subset` expressions.
    pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> LazyFrame {
        match subset {
            // No subset: filter on the wildcard `is_not_null`.
            None => self.filter(col("*").is_not_null()),
            Some(subset) => {
                // AND together an `is_not_null` check per subset expression.
                let it = subset.into_iter().map(|e| e.is_not_null());
                let predicate = combine_predicates_expr(it);
                self.filter(predicate)
            }
        }
    }
pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().slice(offset, len).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn first(self) -> LazyFrame {
self.slice(0, 1)
}
pub fn last(self) -> LazyFrame {
self.slice(-1, 1)
}
pub fn tail(self, n: IdxSize) -> LazyFrame {
let neg_tail = -(n as i64);
self.slice(neg_tail, n)
}
pub fn melt(self, args: MeltArgs) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().melt(Arc::new(args)).build();
Self::from_logical_plan(lp, opt_state)
}
pub fn limit(self, n: IdxSize) -> LazyFrame {
self.slice(0, n)
}
    /// Insert a user-defined function into the plan.
    ///
    /// `optimizations` restricts which optimizations may cross the opaque
    /// function; `schema` (if given) tells the planner the UDF's output
    /// schema; `name` labels the node in plan descriptions.
    pub fn map<F>(
        self,
        function: F,
        optimizations: Option<AllowedOptimizations>,
        schema: Option<Arc<dyn UdfSchema>>,
        name: Option<&'static str>,
    ) -> LazyFrame
    where
        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
    {
        let opt_state = self.get_opt_state();
        let lp = self
            .get_plan_builder()
            .map(
                function,
                optimizations.unwrap_or_default(),
                schema,
                name.unwrap_or("ANONYMOUS UDF"),
            )
            .build();
        Self::from_logical_plan(lp, opt_state)
    }
    /// Insert a known (non-user) function node into the plan.
    pub(crate) fn map_private(self, function: FunctionNode) -> LazyFrame {
        let opt_state = self.get_opt_state();
        let lp = self.get_plan_builder().map_private(function).build();
        Self::from_logical_plan(lp, opt_state)
    }
    /// Add a row-count column named `name` at index 0, starting at `offset`
    /// (default 0).
    ///
    /// For scans that support it natively (csv/ipc/parquet) the row count is
    /// pushed into the scan options; otherwise it is added via a `map` node
    /// with slice/predicate pushdown disabled, since those would change the
    /// row offsets.
    pub fn with_row_count(mut self, name: &str, offset: Option<IdxSize>) -> LazyFrame {
        let mut add_row_count_in_map = false;
        match &mut self.logical_plan {
            #[cfg(feature = "csv-file")]
            LogicalPlan::CsvScan { options, .. } => {
                options.row_count = Some(RowCount {
                    name: name.to_string(),
                    offset: offset.unwrap_or(0),
                });
            }
            #[cfg(feature = "ipc")]
            LogicalPlan::IpcScan { options, .. } => {
                options.row_count = Some(RowCount {
                    name: name.to_string(),
                    offset: offset.unwrap_or(0),
                });
            }
            #[cfg(feature = "parquet")]
            LogicalPlan::ParquetScan { options, .. } => {
                options.row_count = Some(RowCount {
                    name: name.to_string(),
                    offset: offset.unwrap_or(0),
                });
            }
            _ => {
                // Not a supported scan: fall back to the map-based path.
                add_row_count_in_map = true;
            }
        }
        let name2 = name.to_string();
        // Schema hook: the row-count column is prepended at index 0.
        // NOTE(review): `unwrap` here panics if `insert_index` fails (e.g. a
        // column with this name already exists) — confirm callers guard this.
        let udf_schema = move |s: &Schema| {
            let new = s.insert_index(0, name2.clone(), IDX_DTYPE).unwrap();
            Ok(Arc::new(new))
        };
        let name = name.to_owned();
        // Pushdowns that reorder/skip rows must be blocked when the count is
        // computed inside the map.
        let opt = if add_row_count_in_map {
            AllowedOptimizations {
                slice_pushdown: false,
                predicate_pushdown: false,
                ..Default::default()
            }
        } else {
            AllowedOptimizations::default()
        };
        self.map(
            move |df: DataFrame| {
                if add_row_count_in_map {
                    df.with_row_count(&name, offset)
                } else {
                    // Scan already added the column; pass through.
                    Ok(df)
                }
            },
            Some(opt),
            Some(Arc::new(udf_schema)),
            Some("WITH ROW COUNT"),
        )
    }
#[cfg(feature = "dtype-struct")]
#[cfg_attr(docsrs, doc(cfg(feature = "dtype-struct")))]
pub fn unnest<I: IntoIterator<Item = S>, S: AsRef<str>>(self, cols: I) -> Self {
self.map_private(FunctionNode::Unnest {
columns: Arc::new(cols.into_iter().map(|s| Arc::from(s.as_ref())).collect()),
})
}
}
/// Intermediate state of a lazy group-by, created by the `groupby*` methods
/// on [`LazyFrame`]; consumed by `agg`/`head`/`tail`/`apply`.
#[derive(Clone)]
pub struct LazyGroupBy {
    pub logical_plan: LogicalPlan,
    // Optimization flags inherited from the source LazyFrame.
    opt_state: OptState,
    // Group key expressions.
    keys: Vec<Expr>,
    // Whether group order follows first occurrence in the data.
    maintain_order: bool,
    #[cfg(feature = "dynamic_groupby")]
    dynamic_options: Option<DynamicGroupOptions>,
    #[cfg(feature = "dynamic_groupby")]
    rolling_options: Option<RollingGroupOptions>,
}
impl LazyGroupBy {
    /// Finish the group-by with the given aggregation expressions.
    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
        // The builder's `groupby` signature differs with "dynamic_groupby".
        #[cfg(feature = "dynamic_groupby")]
        let lp = LogicalPlanBuilder::from(self.logical_plan)
            .groupby(
                self.keys,
                aggs,
                None,
                self.maintain_order,
                self.dynamic_options,
                self.rolling_options,
            )
            .build();
        #[cfg(not(feature = "dynamic_groupby"))]
        let lp = LogicalPlanBuilder::from(self.logical_plan)
            .groupby(self.keys, aggs, None, self.maintain_order)
            .build();
        LazyFrame::from_logical_plan(lp, self.opt_state)
    }
    /// Return the first `n` rows of each group.
    pub fn head(self, n: Option<usize>) -> LazyFrame {
        // Leaf column names of the keys: excluded from both the aggregation
        // and the subsequent explode.
        let keys = self
            .keys
            .iter()
            .flat_map(|k| expr_to_leaf_column_names(k).into_iter())
            .collect::<Vec<_>>();
        self.agg([col("*").exclude(&keys).head(n).list().keep_name()])
            .explode([col("*").exclude(&keys)])
    }
    /// Return the last `n` rows of each group.
    pub fn tail(self, n: Option<usize>) -> LazyFrame {
        let keys = self
            .keys
            .iter()
            .flat_map(|k| expr_to_leaf_column_names(k).into_iter())
            .collect::<Vec<_>>();
        // NOTE(review): `head` wraps its aggregation in `.list()` but `tail`
        // does not — confirm this asymmetry is intentional.
        self.agg([col("*").exclude(&keys).tail(n).keep_name()])
            .explode([col("*").exclude(&keys)])
    }
    /// Apply a user-defined function per group; `schema` declares the output
    /// schema of `f`.
    pub fn apply<F>(self, f: F, schema: SchemaRef) -> LazyFrame
    where
        F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
    {
        // GroupbyOptions has extra fields under "dynamic_groupby".
        #[cfg(feature = "dynamic_groupby")]
        let options = GroupbyOptions {
            dynamic: None,
            rolling: None,
            slice: None,
        };
        #[cfg(not(feature = "dynamic_groupby"))]
        let options = GroupbyOptions { slice: None };
        let lp = LogicalPlan::Aggregate {
            input: Box::new(self.logical_plan),
            keys: Arc::new(self.keys),
            // No expression aggregations: everything goes through `apply`.
            aggs: vec![],
            schema,
            apply: Some(Arc::new(f)),
            maintain_order: self.maintain_order,
            options,
        };
        LazyFrame::from_logical_plan(lp, self.opt_state)
    }
}
/// Builder for join operations, created via `LazyFrame::join_builder`.
#[must_use]
pub struct JoinBuilder {
    // Left-hand side of the join.
    lf: LazyFrame,
    how: JoinType,
    // Right-hand side; must be set via `with` before `finish`.
    other: Option<LazyFrame>,
    left_on: Vec<Expr>,
    right_on: Vec<Expr>,
    allow_parallel: bool,
    force_parallel: bool,
    // Suffix for colliding right-hand column names; defaults to "_right".
    suffix: Option<String>,
}
impl JoinBuilder {
    /// Start a builder with `lf` as the left-hand side. Defaults: inner join,
    /// no keys, parallel evaluation allowed but not forced.
    pub fn new(lf: LazyFrame) -> Self {
        Self {
            lf,
            other: None,
            how: JoinType::Inner,
            left_on: vec![],
            right_on: vec![],
            allow_parallel: true,
            force_parallel: false,
            suffix: None,
        }
    }
    /// Set the right-hand side of the join.
    pub fn with(mut self, other: LazyFrame) -> Self {
        self.other = Some(other);
        self
    }
    /// Set the join type.
    pub fn how(mut self, how: JoinType) -> Self {
        self.how = how;
        self
    }
    /// Join on the same key expressions for both sides.
    pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
        let on = on.as_ref().to_vec();
        self.left_on = on.clone();
        self.right_on = on;
        self
    }
    /// Set the left-hand key expressions.
    pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
        self.left_on = on.as_ref().to_vec();
        self
    }
    /// Set the right-hand key expressions.
    pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
        self.right_on = on.as_ref().to_vec();
        self
    }
    /// Allow (or forbid) evaluating both sides of the join in parallel.
    pub fn allow_parallel(mut self, allow: bool) -> Self {
        self.allow_parallel = allow;
        self
    }
    /// Force evaluating both sides of the join in parallel.
    pub fn force_parallel(mut self, force: bool) -> Self {
        // Bug fix: this previously assigned `allow_parallel`, so the
        // `force_parallel` flag could never be set through the builder.
        self.force_parallel = force;
        self
    }
    /// Set the suffix appended to right-hand column names on collisions.
    pub fn suffix<S: AsRef<str>>(mut self, suffix: S) -> Self {
        self.suffix = Some(suffix.as_ref().to_string());
        self
    }
    /// Finish the builder and produce the joined `LazyFrame`.
    ///
    /// # Panics
    /// Panics if the right-hand side was never set via [`JoinBuilder::with`].
    pub fn finish(self) -> LazyFrame {
        let mut opt_state = self.lf.opt_state;
        let other = self.other.expect("with not set");
        // If either side caches file reads, the joined plan must as well.
        opt_state.file_caching |= other.opt_state.file_caching;
        let suffix = match self.suffix {
            None => Cow::Borrowed("_right"),
            Some(suffix) => Cow::Owned(suffix),
        };
        let lp = self
            .lf
            .get_plan_builder()
            .join(
                other.logical_plan,
                self.left_on,
                self.right_on,
                JoinOptions {
                    allow_parallel: self.allow_parallel,
                    force_parallel: self.force_parallel,
                    how: self.how,
                    suffix,
                    ..Default::default()
                },
            )
            .build();
        LazyFrame::from_logical_plan(lp, opt_state)
    }
}