use std::borrow::Cow;
use polars_core::prelude::*;
use polars_plan::constants::CSE_REPLACED;
use super::*;
use crate::expressions::{AggregationContext, PhysicalExpr};
/// Physical expression that resolves a single column, identified by name, from a `DataFrame`.
#[derive(Debug)]
pub struct ColumnExpr {
/// Name of the column that is looked up during evaluation.
name: PlSmallStr,
/// Logical expression handed out by `as_expression`.
expr: Expr,
/// Schema consulted first by `evaluate` to find the positional index of `name`.
schema: SchemaRef,
}
impl ColumnExpr {
pub fn new(name: PlSmallStr, expr: Expr, schema: SchemaRef) -> Self {
Self { name, expr, schema }
}
}
impl ColumnExpr {
fn check_external_context(
&self,
out: PolarsResult<Column>,
state: &ExecutionState,
) -> PolarsResult<Column> {
match out {
Ok(col) => Ok(col),
Err(e) => {
if state.ext_contexts.is_empty() {
Err(e)
} else {
for df in state.ext_contexts.as_ref() {
let out = df.column(&self.name);
if out.is_ok() {
return out.cloned();
}
}
Err(e)
}
},
}
}
fn process_by_idx(
&self,
out: &Column,
_state: &ExecutionState,
_schema: &Schema,
df: &DataFrame,
check_state_schema: bool,
) -> PolarsResult<Column> {
if out.name() != &*self.name {
if check_state_schema {
if let Some(schema) = _state.get_schema() {
return self.process_from_state_schema(df, _state, &schema);
}
}
df.column(&self.name).cloned()
} else {
Ok(out.clone())
}
}
fn process_by_linear_search(
&self,
df: &DataFrame,
_state: &ExecutionState,
_panic_during_test: bool,
) -> PolarsResult<Column> {
df.column(&self.name).cloned()
}
fn process_from_state_schema(
&self,
df: &DataFrame,
state: &ExecutionState,
schema: &Schema,
) -> PolarsResult<Column> {
match schema.get_full(&self.name) {
None => self.process_by_linear_search(df, state, true),
Some((idx, _, _)) => match df.columns().get(idx) {
Some(out) => self.process_by_idx(out, state, schema, df, false),
None => self.process_by_linear_search(df, state, true),
},
}
}
fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult<Column> {
let offset = schema.len();
let columns = &df.columns()[offset..];
Ok(columns
.iter()
.find(|s| s.name() == &self.name)
.unwrap()
.clone())
}
}
impl PhysicalExpr for ColumnExpr {
    /// Returns the logical expression this physical expression wraps.
    fn as_expression(&self) -> Option<&Expr> {
        Some(&self.expr)
    }

    /// Resolves the column named `self.name` from `df`.
    ///
    /// The expression's own schema is tried first to get a positional index. If `df` has no
    /// column at that index, the schema stored on `state` is tried, and finally a lookup by
    /// name. Names that are absent from the schema and start with `CSE_REPLACED` are resolved
    /// by `process_cse` and returned directly. Every other result passes through
    /// `check_external_context` before being returned.
    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let resolved = match self.schema.get_full(&self.name) {
            // CSE columns are returned immediately, without the external-context fallback.
            None if self.name.starts_with(CSE_REPLACED) => {
                return self.process_cse(df, &self.schema);
            },
            None => self.process_by_linear_search(df, state, true),
            Some((idx, _, _)) => {
                if let Some(candidate) = df.columns().get(idx) {
                    self.process_by_idx(candidate, state, &self.schema, df, true)
                } else if let Some(state_schema) = state.get_schema() {
                    self.process_from_state_schema(df, state, &state_schema)
                } else {
                    self.process_by_linear_search(df, state, true)
                }
            },
        };
        self.check_external_context(resolved, state)
    }

    /// Evaluates the column and wraps it, together with the borrowed `groups`, in an
    /// `AggregationContext`; the trailing flag passed to the constructor is `false`.
    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        self.evaluate(df, state)
            .map(|column| AggregationContext::new(column, Cow::Borrowed(groups), false))
    }

    /// Returns the field registered under `self.name` in `input_schema`.
    ///
    /// # Errors
    ///
    /// Returns `ColumnNotFound` when `input_schema` has no field with that name.
    fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
        match input_schema.get_field(&self.name) {
            Some(field) => Ok(field),
            None => Err(polars_err!(
                ColumnNotFound: "could not find {:?} in schema: {:?}", self.name, &input_schema
            )),
        }
    }

    /// Always reports `false`.
    fn is_scalar(&self) -> bool {
        false
    }

    /// Returns a clone of the column name.
    fn as_column(&self) -> Option<PlSmallStr> {
        let name = self.name.clone();
        Some(name)
    }
}