use std::fmt::Formatter;
use std::iter::FlatMap;
use std::sync::Arc;
use polars_core::prelude::*;
use crate::logical_plan::iterator::ArenaExprIter;
use crate::logical_plan::Context;
use crate::prelude::names::COUNT;
use crate::prelude::*;
/// Appends a parenthesised, comma-separated rendering of `items` to `s` and returns it.
///
/// `s = "f"` with `items = ["a", "b"]` yields `"f(a, b)"`; an empty `items` slice yields
/// `"f()"`.
pub fn column_delimited(mut s: String, items: &[String]) -> String {
    s.push('(');
    for (idx, item) in items.iter().enumerate() {
        // Write the separator *between* items. Appending it after every item and trimming
        // two characters afterwards would eat the '(' and one character of the caller's
        // prefix whenever `items` is empty.
        if idx > 0 {
            s.push_str(", ");
        }
        s.push_str(item);
    }
    s.push(')');
    s
}
/// Writes `items` to `f` as a `", "`-separated list wrapped in `container_start` and
/// `container_end`.
///
/// With an empty `items` slice only the two container strings are written.
///
/// # Errors
/// Propagates the first error reported by the underlying formatter.
pub(crate) fn fmt_column_delimited<S: AsRef<str>>(
    f: &mut Formatter<'_>,
    items: &[S],
    container_start: &str,
    container_end: &str,
) -> std::fmt::Result {
    write!(f, "{container_start}")?;
    let mut remaining = items.iter();
    if let Some(first) = remaining.next() {
        // The first item carries no separator; every later one is prefixed with it.
        write!(f, "{}", first.as_ref())?;
        for item in remaining {
            write!(f, ", {}", item.as_ref())?;
        }
    }
    write!(f, "{container_end}")
}
/// Abstraction over containers that can accept one more [`Node`].
///
/// This file implements it for `Vec<Node>`, for the fixed-size arrays
/// `[Option<Node>; 1]` and `[Option<Node>; 2]`, and for `&mut [Option<Node>]`.
pub trait PushNode {
/// Stores `value` in the container; what happens when the container is full is
/// decided by each implementation.
fn push_node(&mut self, value: Node);
}
/// Growable implementation: every pushed node is appended to the vector.
impl PushNode for Vec<Node> {
    fn push_node(&mut self, value: Node) {
        Vec::push(self, value);
    }
}
/// Two-slot implementation that fills the slots in order.
impl PushNode for [Option<Node>; 2] {
    /// Stores `value` in slot 0 when both slots are empty, or in slot 1 when only
    /// slot 0 is occupied.
    ///
    /// # Panics
    /// Panics in every other state, i.e. when both slots are occupied or when only
    /// slot 1 is occupied.
    fn push_node(&mut self, value: Node) {
        let slot = match (self[0].is_some(), self[1].is_some()) {
            (false, false) => 0,
            (true, false) => 1,
            _ => panic!("cannot push more than 2 nodes"),
        };
        self[slot] = Some(value);
    }
}
/// Single-slot implementation.
impl PushNode for [Option<Node>; 1] {
    /// Stores `value` in the only slot.
    ///
    /// # Panics
    /// Panics when the slot is already occupied.
    fn push_node(&mut self, value: Node) {
        if self[0].is_some() {
            panic!("cannot push more than 1 node");
        }
        self[0] = Some(value);
    }
}
/// Returns `true` when `plan` is a `DataFrameScan`, a `CsvScan` (only with the
/// `csv-file` feature) or a `ParquetScan` (only with the `parquet` feature).
pub(crate) fn is_scan(plan: &ALogicalPlan) -> bool {
    // The feature-gated variants are checked inside cfg-guarded blocks so that they
    // disappear entirely when the corresponding feature is disabled.
    #[cfg(feature = "csv-file")]
    {
        if matches!(plan, ALogicalPlan::CsvScan { .. }) {
            return true;
        }
    }
    #[cfg(feature = "parquet")]
    {
        if matches!(plan, ALogicalPlan::ParquetScan { .. }) {
            return true;
        }
    }
    matches!(plan, ALogicalPlan::DataFrameScan { .. })
}
/// Slice-backed implementation that only ever touches the first two slots.
impl PushNode for &mut [Option<Node>] {
    /// Writes `value` to slot 0 when that slot is empty, otherwise to slot 1.
    ///
    /// Unlike the fixed-size array implementations, an occupied slot 1 is overwritten
    /// rather than rejected.
    ///
    /// # Panics
    /// Panics on out-of-bounds indexing: when the slice is empty, or when slot 0 is
    /// occupied and the slice has only one element.
    fn push_node(&mut self, value: Node) {
        // `false` maps to index 0 and `true` to index 1.
        let slot = usize::from(self[0].is_some());
        self[slot] = Some(value);
    }
}
/// Returns `true` when every expression yielded by `arena.iter(current_node)` is an
/// `AExpr::Column` or an `AExpr::Alias`.
pub(crate) fn aexpr_is_simple_projection(current_node: Node, arena: &Arena<AExpr>) -> bool {
    for (_, ae) in arena.iter(current_node) {
        let is_simple = matches!(ae, AExpr::Column(_) | AExpr::Alias(_, _));
        if !is_simple {
            return false;
        }
    }
    true
}
/// Returns `true` as soon as `matches` accepts one of the expressions yielded by
/// `arena.iter(current_node)`, and `false` when none is accepted.
pub fn has_aexpr<F>(current_node: Node, arena: &Arena<AExpr>, matches: F) -> bool
where
    F: Fn(&AExpr) -> bool,
{
    for (_, ae) in arena.iter(current_node) {
        if matches(ae) {
            return true;
        }
    }
    false
}
/// Returns `true` when `has_aexpr` finds an `AExpr::Window` starting from
/// `current_node`.
pub fn has_aexpr_window(current_node: Node, arena: &Arena<AExpr>) -> bool {
    fn is_window(ae: &AExpr) -> bool {
        matches!(ae, AExpr::Window { .. })
    }

    has_aexpr(current_node, arena, is_window)
}
/// Returns `true` as soon as `matches` accepts one of the expressions produced by
/// iterating over `current_expr`, and `false` when none is accepted.
pub(crate) fn has_expr<F>(current_expr: &Expr, matches: F) -> bool
where
    F: Fn(&Expr) -> bool,
{
    for sub_expr in current_expr {
        if matches(sub_expr) {
            return true;
        }
    }
    false
}
/// Returns `true` when `e` itself is an `Expr::Literal`, or when one of the
/// expressions returned by `expr_to_root_column_exprs(e)` is a literal.
#[cfg(feature = "is_in")]
pub(crate) fn has_root_literal_expr(e: &Expr) -> bool {
    if let Expr::Literal(_) = e {
        return true;
    }
    // NOTE(review): `expr_to_root_column_exprs`, as defined in this file, only collects
    // `Column` and `Wildcard` expressions, so this scan presumably never finds a
    // literal. Confirm whether a different traversal was intended.
    expr_to_root_column_exprs(e)
        .iter()
        .any(|root| matches!(root, Expr::Literal(_)))
}
/// Returns `true` when `has_expr` finds an `Expr::Literal(LiteralValue::Null)` while
/// iterating over `current_expr`.
pub fn has_null(current_expr: &Expr) -> bool {
    fn is_null_literal(e: &Expr) -> bool {
        matches!(e, Expr::Literal(LiteralValue::Null))
    }

    has_expr(current_expr, is_null_literal)
}
/// Determines the output column name of `expr` from the first decisive expression
/// met while iterating over it.
///
/// * `Window` defers to its `function`.
/// * `Column` and `Alias` yield their name.
/// * `Count` yields the `COUNT` constant.
///
/// # Errors
/// Returns `PolarsError::ComputeError` when
/// * a `KeepName`, `Wildcard` or `RenameAlias` is met first,
/// * a `Columns` or `DtypeColumn` is met first, or
/// * iteration finishes without meeting any of the expressions listed above.
pub(crate) fn expr_output_name(expr: &Expr) -> PolarsResult<Arc<str>> {
    for node in expr {
        match node {
            Expr::Window { function, .. } => return expr_output_name(function),
            Expr::Column(name) | Expr::Alias(_, name) => return Ok(name.clone()),
            Expr::Count => return Ok(Arc::from(COUNT)),
            Expr::KeepName(_) | Expr::Wildcard | Expr::RenameAlias { .. } => {
                let msg =
                    "Cannot determine an output column without a context for this expression";
                return Err(PolarsError::ComputeError(msg.into()));
            }
            Expr::Columns(_) | Expr::DtypeColumn(_) => {
                let msg = "This expression might produce multiple output names";
                return Err(PolarsError::ComputeError(msg.into()));
            }
            // Any other expression is not decisive: keep looking.
            _ => {}
        }
    }

    let msg = format!(
        "No root column name could be found for expr '{expr:?}' when calling 'output_name'",
    );
    Err(PolarsError::ComputeError(msg.into()))
}
/// Resolves a single leaf column name for `expr` from the first decisive expression
/// met while iterating over it.
///
/// `Filter`, `Take`, `SortBy` and `Window` defer to their input expression; `Column`
/// yields its name; every other expression is skipped.
///
/// # Errors
/// Returns `PolarsError::ComputeError` when iteration finishes without meeting any of
/// the expressions listed above, or when the deferred lookup fails.
pub(crate) fn get_single_leaf(expr: &Expr) -> PolarsResult<Arc<str>> {
    expr.into_iter()
        .find_map(|node| match node {
            Expr::Filter { input, .. } => Some(get_single_leaf(input)),
            Expr::Take { expr, .. } => Some(get_single_leaf(expr)),
            Expr::SortBy { expr, .. } => Some(get_single_leaf(expr)),
            Expr::Window { function, .. } => Some(get_single_leaf(function)),
            Expr::Column(name) => Some(Ok(name.clone())),
            _ => None,
        })
        .unwrap_or_else(|| {
            Err(PolarsError::ComputeError(
                format!("no single leaf column found in {expr:?}").into(),
            ))
        })
}
/// Collects the column name of every expression returned by
/// `expr_to_root_column_exprs(expr)`, in the order they are returned.
///
/// # Panics
/// Panics when `expr_to_leaf_column_name` fails for one of those expressions; as
/// written in this file that happens for a `Wildcard`.
pub fn expr_to_leaf_column_names(expr: &Expr) -> Vec<Arc<str>> {
    let roots = expr_to_root_column_exprs(expr);
    let mut names = Vec::with_capacity(roots.len());
    for root in &roots {
        names.push(expr_to_leaf_column_name(root).unwrap());
    }
    names
}
/// Returns the name of the only column among the expressions returned by
/// `expr_to_root_column_exprs(expr)`.
///
/// # Errors
/// Returns `PolarsError::ComputeError` when there is no such expression, when there
/// is more than one, or when the only one is a `Wildcard`.
pub fn expr_to_leaf_column_name(expr: &Expr) -> PolarsResult<Arc<str>> {
    let mut roots = expr_to_root_column_exprs(expr);
    if roots.len() > 1 {
        return Err(PolarsError::ComputeError(
            "found more than one root column name".into(),
        ));
    }

    match roots.pop() {
        None => Err(PolarsError::ComputeError(
            "no root column name found".into(),
        )),
        Some(Expr::Wildcard) => Err(PolarsError::ComputeError(
            "wildcard has not root column name".into(),
        )),
        Some(Expr::Column(name)) => Ok(name),
        // `expr_to_root_column_exprs` only collects `Column` and `Wildcard`.
        Some(_) => {
            unreachable!();
        }
    }
}
/// Returns `true` when `ae` is an `AExpr::Column` or an `AExpr::Wildcard`, the two
/// variants this file treats as leaves.
fn is_leaf_aexpr(ae: &AExpr) -> bool {
    matches!(*ae, AExpr::Column(_) | AExpr::Wildcard)
}
/// Lazily yields the nodes produced by `arena.iter(root)` whose expression satisfies
/// `is_leaf_aexpr`.
///
/// The concrete `FlatMap` return type forces the closure to be a plain function
/// pointer, so it must not capture anything.
// The fully spelled-out iterator type is what trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub(crate) fn aexpr_to_leaf_nodes_iter<'a>(
    root: Node,
    arena: &'a Arena<AExpr>,
) -> FlatMap<AExprIter<'a>, Option<Node>, fn((Node, &'a AExpr)) -> Option<Node>> {
    arena
        .iter(root)
        .flat_map(|(node, ae)| is_leaf_aexpr(ae).then_some(node))
}
/// Eagerly gathers everything yielded by `aexpr_to_leaf_nodes_iter(root, arena)`
/// into a vector.
pub(crate) fn aexpr_to_leaf_nodes(root: Node, arena: &Arena<AExpr>) -> Vec<Node> {
    let mut leaves = Vec::new();
    leaves.extend(aexpr_to_leaf_nodes_iter(root, arena));
    leaves
}
/// Converts `node` to an `Expr`, sets the name of every `Expr::Column` visited by
/// `mutate().apply` to `new_name`, and converts the result back into `arena`.
///
/// Returns the node produced by `to_aexpr` for the rewritten expression.
pub(crate) fn rename_aexpr_leaf_names(
    node: Node,
    arena: &mut Arena<AExpr>,
    new_name: Arc<str>,
) -> Node {
    let mut renamed = node_to_expr(node, arena);
    renamed.mutate().apply(|sub_expr| {
        if let Expr::Column(col_name) = sub_expr {
            col_name.clone_from(&new_name);
        }
        // NOTE(review): always returning `true` presumably tells `apply` to keep
        // visiting; confirm against its definition.
        true
    });
    to_aexpr(renamed, arena)
}
/// Adds a fresh `AExpr::Column(new_name)` to `arena` and returns its node, provided
/// one of the leaf nodes of `node` is a column named `current`.
///
/// The returned node holds only the new column; the expression at `node` itself is
/// not modified here.
///
/// # Panics
/// Panics when no leaf of `node` is a column named `current`.
pub(crate) fn aexpr_assign_renamed_root(
    node: Node,
    arena: &mut Arena<AExpr>,
    current: &str,
    new_name: &str,
) -> Node {
    let has_matching_leaf = aexpr_to_leaf_nodes(node, arena)
        .into_iter()
        .any(|leaf| matches!(arena.get(leaf), AExpr::Column(name) if &**name == current));

    if !has_matching_leaf {
        panic!("should be a root column that is renamed");
    }
    arena.add(AExpr::Column(Arc::from(new_name)))
}
/// Returns clones of every `Expr::Column` and `Expr::Wildcard` produced by iterating
/// over `expr`, in iteration order.
pub(crate) fn expr_to_root_column_exprs(expr: &Expr) -> Vec<Expr> {
    expr.into_iter()
        .filter(|e| matches!(**e, Expr::Column(_) | Expr::Wildcard))
        .cloned()
        .collect()
}
/// Builds a `Schema` from the field each expression in `expr` resolves to against
/// `schema` in context `ctxt`.
///
/// One scratch arena, pre-sized to four slots per expression, is shared by all
/// `to_field_amortized` calls.
///
/// # Errors
/// Returns whatever error `Schema::try_from_fallible` reports for the per-expression
/// results.
pub fn expressions_to_schema(
    expr: &[Expr],
    schema: &Schema,
    ctxt: Context,
) -> PolarsResult<Schema> {
    let mut scratch_arena = Arena::with_capacity(4 * expr.len());
    Schema::try_from_fallible(
        expr.iter()
            .map(|e| e.to_field_amortized(schema, ctxt, &mut scratch_arena)),
    )
}
/// Lazily yields the column name of every node produced by
/// `aexpr_to_leaf_nodes_iter(node, arena)`.
///
/// # Panics
/// The returned iterator panics when it reaches a leaf that is not an
/// `AExpr::Column`.
pub fn aexpr_to_leaf_names_iter(
    node: Node,
    arena: &Arena<AExpr>,
) -> impl Iterator<Item = Arc<str>> + '_ {
    aexpr_to_leaf_nodes_iter(node, arena).map(|leaf| {
        let ae = arena.get(leaf);
        if let AExpr::Column(name) = ae {
            name.clone()
        } else {
            panic!("{ae:?} not expected")
        }
    })
}
/// Eagerly gathers everything yielded by `aexpr_to_leaf_names_iter(node, arena)`
/// into a vector.
///
/// # Panics
/// Panics under the same condition as the underlying iterator.
pub fn aexpr_to_leaf_names(node: Node, arena: &Arena<AExpr>) -> Vec<Arc<str>> {
    let mut names = Vec::new();
    names.extend(aexpr_to_leaf_names_iter(node, arena));
    names
}
/// Returns `true` when `input_schema.index_of` finds every leaf column name of
/// `node`.
///
/// All leaf names are collected before the first lookup, so a panic from
/// `aexpr_to_leaf_names` takes precedence over returning `false`.
pub(crate) fn check_input_node(
    node: Node,
    input_schema: &Schema,
    expr_arena: &Arena<AExpr>,
) -> bool {
    let leaf_names = aexpr_to_leaf_names(node, expr_arena);
    for name in &leaf_names {
        if input_schema.index_of(name).is_none() {
            return false;
        }
    }
    true
}
/// Builds a `Schema` from the field each node in `expr` resolves to against `schema`
/// in context `ctxt`.
///
/// # Panics
/// Panics when `to_field` fails for any of the nodes.
pub(crate) fn aexprs_to_schema(
    expr: &[Node],
    schema: &Schema,
    ctxt: Context,
    arena: &Arena<AExpr>,
) -> Schema {
    Schema::from(
        expr.iter()
            .map(|node| arena.get(*node).to_field(schema, ctxt, arena).unwrap()),
    )
}
/// Folds all expressions of `iter` into one by chaining them left to right with
/// `Expr::and`; a single expression is returned unchanged.
///
/// # Panics
/// Panics when `iter` yields no expression.
pub fn combine_predicates_expr<I>(iter: I) -> Expr
where
    I: Iterator<Item = Expr>,
{
    iter.reduce(|combined, next| combined.and(next))
        .expect("an empty iterator was passed")
}
/// Returns `true` when the output name of expression `e`, resolved against the schema
/// of the plan node `input`, is contained in `projected_names`.
///
/// `lp_arena` is taken mutably but is only read in this function.
///
/// # Panics
/// Panics when `to_field` fails for `e`.
pub fn expr_is_projected_upstream(
    e: &Node,
    input: Node,
    lp_arena: &mut Arena<ALogicalPlan>,
    expr_arena: &Arena<AExpr>,
    projected_names: &PlHashSet<Arc<str>>,
) -> bool {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let field = expr_arena
        .get(*e)
        .to_field(input_schema.as_ref(), Context::Default, expr_arena)
        .unwrap();
    projected_names.contains(field.name().as_str())
}