polars_plan/plans/
iterator.rs

1use std::sync::Arc;
2
3use polars_core::error::PolarsResult;
4use polars_utils::idx_vec::UnitVec;
5use polars_utils::unitvec;
6use visitor::{RewritingVisitor, TreeWalker};
7
8use crate::prelude::*;
9
10macro_rules! push_expr {
11    ($current_expr:expr, $c:ident, $push:ident, $push_owned:ident, $iter:ident) => {{
12        use Expr::*;
13        match $current_expr {
14            DataTypeFunction(_) | Column(_) | Literal(_) | Len => {},
15            #[cfg(feature = "dtype-struct")]
16            Field(_) => {},
17            Alias(e, _) => $push($c, e),
18            BinaryExpr { left, op: _, right } => {
19                // reverse order so that left is popped first
20                $push($c, right);
21                $push($c, left);
22            },
23            Cast { expr, .. } => $push($c, expr),
24            Sort { expr, .. } => $push($c, expr),
25            Gather { expr, idx, .. } => {
26                $push($c, idx);
27                $push($c, expr);
28            },
29            Filter { input, by } => {
30                $push($c, by);
31                // latest, so that it is popped first
32                $push($c, input);
33            },
34            SortBy { expr, by, .. } => {
35                for e in by {
36                    $push_owned($c, e)
37                }
38                // latest, so that it is popped first
39                $push($c, expr);
40            },
41            Agg(agg_e) => {
42                use AggExpr::*;
43                match agg_e {
44                    Max { input, .. } => $push($c, input),
45                    Min { input, .. } => $push($c, input),
46                    Mean(e) => $push($c, e),
47                    Median(e) => $push($c, e),
48                    NUnique(e) => $push($c, e),
49                    First(e) => $push($c, e),
50                    Last(e) => $push($c, e),
51                    Implode(e) => $push($c, e),
52                    Count(e, _) => $push($c, e),
53                    Quantile { expr, .. } => $push($c, expr),
54                    Sum(e) => $push($c, e),
55                    AggGroups(e) => $push($c, e),
56                    Std(e, _) => $push($c, e),
57                    Var(e, _) => $push($c, e),
58                }
59            },
60            Ternary {
61                truthy,
62                falsy,
63                predicate,
64            } => {
65                $push($c, predicate);
66                $push($c, falsy);
67                // latest, so that it is popped first
68                $push($c, truthy);
69            },
70            // we iterate in reverse order, so that the lhs is popped first and will be found
71            // as the root columns/ input columns by `_suffix` and `_keep_name` etc.
72            AnonymousFunction { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),
73            Eval {
74                expr, evaluation, ..
75            } => {
76                $push($c, evaluation);
77                $push($c, expr);
78            },
79            Function { input, .. } => input.$iter().rev().for_each(|e| $push_owned($c, e)),
80            Explode { input, .. } => $push($c, input),
81            Window {
82                function,
83                partition_by,
84                ..
85            } => {
86                for e in partition_by.into_iter().rev() {
87                    $push_owned($c, e)
88                }
89                // latest so that it is popped first
90                $push($c, function);
91            },
92            Slice {
93                input,
94                offset,
95                length,
96            } => {
97                $push($c, length);
98                $push($c, offset);
99                // latest, so that it is popped first
100                $push($c, input);
101            },
102            KeepName(e) => $push($c, e),
103            RenameAlias { expr, .. } => $push($c, expr),
104            SubPlan { .. } => {},
105            // pass
106            Selector(_) => {},
107        }
108    }};
109}
110
111pub struct ExprIter<'a> {
112    stack: UnitVec<&'a Expr>,
113}
114
115impl<'a> Iterator for ExprIter<'a> {
116    type Item = &'a Expr;
117
118    fn next(&mut self) -> Option<Self::Item> {
119        self.stack
120            .pop()
121            .inspect(|current_expr| current_expr.nodes(&mut self.stack))
122    }
123}
124
125pub struct ExprMapper<F> {
126    f: F,
127}
128
129impl<F: FnMut(Expr) -> PolarsResult<Expr>> RewritingVisitor for ExprMapper<F> {
130    type Node = Expr;
131    type Arena = ();
132
133    fn mutate(&mut self, node: Self::Node, _arena: &mut Self::Arena) -> PolarsResult<Self::Node> {
134        (self.f)(node)
135    }
136}
137
138impl Expr {
139    pub fn nodes<'a>(&'a self, container: &mut UnitVec<&'a Expr>) {
140        let push = |c: &mut UnitVec<&'a Expr>, e: &'a Expr| c.push(e);
141        push_expr!(self, container, push, push, iter);
142    }
143
144    pub fn nodes_owned(self, container: &mut Vec<Expr>) {
145        let push_arc = |c: &mut Vec<Expr>, e: Arc<Expr>| c.push(Arc::unwrap_or_clone(e));
146        let push_owned = |c: &mut Vec<Expr>, e: Expr| c.push(e);
147        push_expr!(self, container, push_arc, push_owned, into_iter);
148    }
149
150    pub fn map_expr<F: FnMut(Self) -> Self>(self, mut f: F) -> Self {
151        self.rewrite(&mut ExprMapper { f: |e| Ok(f(e)) }, &mut ())
152            .unwrap()
153    }
154
155    pub fn try_map_expr<F: FnMut(Self) -> PolarsResult<Self>>(self, f: F) -> PolarsResult<Self> {
156        self.rewrite(&mut ExprMapper { f }, &mut ())
157    }
158}
159
160impl<'a> IntoIterator for &'a Expr {
161    type Item = &'a Expr;
162    type IntoIter = ExprIter<'a>;
163
164    fn into_iter(self) -> Self::IntoIter {
165        let stack = unitvec!(self);
166        ExprIter { stack }
167    }
168}
169
170pub struct AExprIter<'a> {
171    stack: UnitVec<Node>,
172    arena: Option<&'a Arena<AExpr>>,
173}
174
175impl<'a> Iterator for AExprIter<'a> {
176    type Item = (Node, &'a AExpr);
177
178    fn next(&mut self) -> Option<Self::Item> {
179        self.stack.pop().map(|node| {
180            // take the arena because the bchk doesn't allow a mutable borrow to the field.
181            let arena = self.arena.unwrap();
182            let current_expr = arena.get(node);
183            current_expr.inputs_rev(&mut self.stack);
184
185            self.arena = Some(arena);
186            (node, current_expr)
187        })
188    }
189}
190
191pub trait ArenaExprIter<'a> {
192    fn iter(&'a self, root: Node) -> AExprIter<'a>;
193}
194
195impl<'a> ArenaExprIter<'a> for Arena<AExpr> {
196    fn iter(&'a self, root: Node) -> AExprIter<'a> {
197        let stack = unitvec![root];
198        AExprIter {
199            stack,
200            arena: Some(self),
201        }
202    }
203}
204
205pub struct AlpIter<'a> {
206    stack: UnitVec<Node>,
207    arena: &'a Arena<IR>,
208}
209
210pub trait ArenaLpIter<'a> {
211    fn iter(&'a self, root: Node) -> AlpIter<'a>;
212}
213
214impl<'a> ArenaLpIter<'a> for Arena<IR> {
215    fn iter(&'a self, root: Node) -> AlpIter<'a> {
216        let stack = unitvec![root];
217        AlpIter { stack, arena: self }
218    }
219}
220
221impl<'a> Iterator for AlpIter<'a> {
222    type Item = (Node, &'a IR);
223
224    fn next(&mut self) -> Option<Self::Item> {
225        self.stack.pop().map(|node| {
226            let lp = self.arena.get(node);
227            lp.copy_inputs(&mut self.stack);
228            (node, lp)
229        })
230    }
231}