Skip to main content

datafusion_physical_expr/statistics/
stats_solver.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DAG-based statistics propagation for the Statistics V2 framework.
19//!
20//! All public items in this module are **deprecated** as of `54.0.0`.
21//! See <https://github.com/apache/datafusion/pull/22071> for details.
22
23#![allow(deprecated)]
24
25use std::sync::Arc;
26
27use crate::expressions::Literal;
28use crate::intervals::cp_solver::PropagationResult;
29use crate::physical_expr::PhysicalExpr;
30use crate::utils::{ExprTreeNode, build_dag};
31
32use arrow::datatypes::{DataType, Schema};
33use datafusion_common::{Result, ScalarValue};
34use datafusion_expr::statistics::Distribution;
35use datafusion_expr_common::interval_arithmetic::Interval;
36
37use petgraph::Outgoing;
38use petgraph::adj::DefaultIx;
39use petgraph::prelude::Bfs;
40use petgraph::stable_graph::{NodeIndex, StableGraph};
41use petgraph::visit::DfsPostOrder;
42
43/// This object implements a directed acyclic expression graph (DAEG) that
44/// is used to compute statistics/distributions for expressions hierarchically.
45#[deprecated(
46    since = "54.0.0",
47    note = "Part of the unused Statistics V2 framework; see https://github.com/apache/datafusion/pull/22071"
48)]
49#[derive(Clone, Debug)]
50pub struct ExprStatisticsGraph {
51    graph: StableGraph<ExprStatisticsGraphNode, usize>,
52    root: NodeIndex,
53}
54
55/// This is a node in the DAEG; it encapsulates a reference to the actual
56/// [`PhysicalExpr`] as well as its statistics/distribution.
57#[deprecated(
58    since = "54.0.0",
59    note = "Part of the unused Statistics V2 framework; see https://github.com/apache/datafusion/pull/22071"
60)]
61#[derive(Clone, Debug)]
62pub struct ExprStatisticsGraphNode {
63    expr: Arc<dyn PhysicalExpr>,
64    dist: Distribution,
65}
66
67impl ExprStatisticsGraphNode {
68    /// Constructs a new DAEG node based on the given interval with a
69    /// `Uniform` distribution.
70    fn new_uniform(expr: Arc<dyn PhysicalExpr>, interval: Interval) -> Result<Self> {
71        Distribution::new_uniform(interval)
72            .map(|dist| ExprStatisticsGraphNode { expr, dist })
73    }
74
75    /// Constructs a new DAEG node with a `Bernoulli` distribution having an
76    /// unknown success probability.
77    fn new_bernoulli(expr: Arc<dyn PhysicalExpr>) -> Result<Self> {
78        Distribution::new_bernoulli(ScalarValue::Float64(None))
79            .map(|dist| ExprStatisticsGraphNode { expr, dist })
80    }
81
82    /// Constructs a new DAEG node with a `Generic` distribution having no
83    /// definite summary statistics.
84    fn new_generic(expr: Arc<dyn PhysicalExpr>, dt: &DataType) -> Result<Self> {
85        let interval = Interval::make_unbounded(dt)?;
86        let dist = Distribution::new_from_interval(interval)?;
87        Ok(ExprStatisticsGraphNode { expr, dist })
88    }
89
90    /// Get the [`Distribution`] object representing the statistics of the
91    /// expression.
92    pub fn distribution(&self) -> &Distribution {
93        &self.dist
94    }
95
96    /// This function creates a DAEG node from DataFusion's [`ExprTreeNode`]
97    /// object. Literals are created with `Uniform` distributions with a
98    /// definite, singleton interval. Expressions with a `Boolean` data type
99    /// result in a`Bernoulli` distribution with an unknown success probability.
100    /// Any other expression starts with an `Unknown` distribution with an
101    /// indefinite range (i.e. `[-∞, ∞]`).
102    pub fn make_node(node: &ExprTreeNode<NodeIndex>, schema: &Schema) -> Result<Self> {
103        let expr = Arc::clone(&node.expr);
104        if let Some(literal) = expr.downcast_ref::<Literal>() {
105            let value = literal.value();
106            Interval::try_new(value.clone(), value.clone())
107                .and_then(|interval| Self::new_uniform(expr, interval))
108        } else {
109            expr.data_type(schema).and_then(|dt| {
110                if dt.eq(&DataType::Boolean) {
111                    Self::new_bernoulli(expr)
112                } else {
113                    Self::new_generic(expr, &dt)
114                }
115            })
116        }
117    }
118}
119
120impl ExprStatisticsGraph {
121    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &Schema) -> Result<Self> {
122        // Build the full graph:
123        let (root, graph) = build_dag(expr, &|node| {
124            ExprStatisticsGraphNode::make_node(node, schema)
125        })?;
126        Ok(Self { graph, root })
127    }
128
129    /// This function assigns given distributions to expressions in the DAEG.
130    /// The argument `assignments` associates indices of sought expressions
131    /// with their corresponding new distributions.
132    pub fn assign_statistics(&mut self, assignments: &[(usize, Distribution)]) {
133        for (index, stats) in assignments {
134            let node_index = NodeIndex::from(*index as DefaultIx);
135            self.graph[node_index].dist = stats.clone();
136        }
137    }
138
139    /// Computes statistics/distributions for an expression via a bottom-up
140    /// traversal.
141    pub fn evaluate_statistics(&mut self) -> Result<&Distribution> {
142        let mut dfs = DfsPostOrder::new(&self.graph, self.root);
143        while let Some(idx) = dfs.next(&self.graph) {
144            let neighbors = self.graph.neighbors_directed(idx, Outgoing);
145            let mut children_statistics = neighbors
146                .map(|child| self.graph[child].distribution())
147                .collect::<Vec<_>>();
148            // Note that all distributions are assumed to be independent.
149            if !children_statistics.is_empty() {
150                // Reverse to align with `PhysicalExpr`'s children:
151                children_statistics.reverse();
152                self.graph[idx].dist = self.graph[idx]
153                    .expr
154                    .evaluate_statistics(&children_statistics)?;
155            }
156        }
157        Ok(self.graph[self.root].distribution())
158    }
159
160    /// Runs a propagation mechanism in a top-down manner to update statistics
161    /// of leaf nodes.
162    pub fn propagate_statistics(
163        &mut self,
164        given_stats: Distribution,
165    ) -> Result<PropagationResult> {
166        // Adjust the root node with the given statistics:
167        let root_range = self.graph[self.root].dist.range()?;
168        let given_range = given_stats.range()?;
169        if let Some(interval) = root_range.intersect(&given_range)? {
170            if interval != root_range {
171                // If the given statistics enable us to obtain a more precise
172                // range for the root, update it:
173                let subset = root_range.contains(given_range)?;
174                self.graph[self.root].dist = if subset == Interval::TRUE {
175                    // Given statistics is strictly more informative, use it as is:
176                    given_stats
177                } else {
178                    // Intersecting ranges gives us a more precise range:
179                    Distribution::new_from_interval(interval)?
180                };
181            }
182        } else {
183            return Ok(PropagationResult::Infeasible);
184        }
185
186        let mut bfs = Bfs::new(&self.graph, self.root);
187
188        while let Some(node) = bfs.next(&self.graph) {
189            let neighbors = self.graph.neighbors_directed(node, Outgoing);
190            let mut children = neighbors.collect::<Vec<_>>();
191            // If the current expression is a leaf, its statistics is now final.
192            // So, just continue with the propagation procedure:
193            if children.is_empty() {
194                continue;
195            }
196            // Reverse to align with `PhysicalExpr`'s children:
197            children.reverse();
198            let children_stats = children
199                .iter()
200                .map(|child| self.graph[*child].distribution())
201                .collect::<Vec<_>>();
202            let node_statistics = self.graph[node].distribution();
203            let propagated_statistics = self.graph[node]
204                .expr
205                .propagate_statistics(node_statistics, &children_stats)?;
206            if let Some(propagated_stats) = propagated_statistics {
207                for (child_idx, stats) in children.into_iter().zip(propagated_stats) {
208                    self.graph[child_idx].dist = stats;
209                }
210            } else {
211                // The constraint is infeasible, report:
212                return Ok(PropagationResult::Infeasible);
213            }
214        }
215        Ok(PropagationResult::Success)
216    }
217}
218
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::expressions::{Column, binary, try_cast};
    use crate::intervals::cp_solver::PropagationResult;
    use crate::statistics::stats_solver::ExprStatisticsGraph;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr_common::interval_arithmetic::Interval;
    use datafusion_expr_common::operator::Operator;
    use datafusion_expr_common::statistics::Distribution;
    use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

    /// Creates the binary expression `left op right`, casting both operands
    /// to the input types that the type coercer reports for `op`.
    pub fn binary_expr(
        left: Arc<dyn PhysicalExpr>,
        op: Operator,
        right: Arc<dyn PhysicalExpr>,
        schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let (lhs_type, rhs_type) = (left.data_type(schema)?, right.data_type(schema)?);
        let (lhs_target, rhs_target) =
            BinaryTypeCoercer::new(&lhs_type, &op, &rhs_type).get_input_types()?;

        binary(
            try_cast(left, schema, lhs_target)?,
            op,
            try_cast(right, schema, rhs_target)?,
            schema,
        )
    }

    /// Evaluates and then propagates statistics through `(a + b) = (c - d)`.
    #[test]
    fn test_stats_integration() -> Result<()> {
        let schema = Schema::new(
            ["a", "b", "c", "d"]
                .into_iter()
                .map(|name| Field::new(name, DataType::Float64, false))
                .collect::<Vec<_>>(),
        );

        // Shorthands for the column expressions and uniform distributions:
        let column = |name: &str, index: usize| -> Arc<dyn PhysicalExpr> {
            Arc::new(Column::new(name, index))
        };
        let uniform = |low: f64, high: f64| -> Result<Distribution> {
            Distribution::new_uniform(Interval::make(Some(low), Some(high))?)
        };

        let sum = binary_expr(column("a", 0), Operator::Plus, column("b", 1), &schema)?;
        let difference =
            binary_expr(column("c", 2), Operator::Minus, column("d", 3), &schema)?;
        let predicate = binary_expr(sum, Operator::Eq, difference, &schema)?;

        let mut graph = ExprStatisticsGraph::try_new(predicate, &schema)?;
        // 2, 5 and 6 are BinaryExpr
        graph.assign_statistics(&[
            (0usize, uniform(0., 1.)?),
            (1usize, uniform(0., 2.)?),
            (3usize, uniform(1., 3.)?),
            (4usize, uniform(1., 5.)?),
        ]);

        let evaluated = graph.evaluate_statistics()?;
        let unknown_bernoulli = Distribution::new_bernoulli(ScalarValue::Float64(None))?;
        assert_eq!(evaluated, &unknown_bernoulli);

        let certainly_true =
            Distribution::new_bernoulli(ScalarValue::new_one(&DataType::Float64)?)?;
        assert_eq!(
            graph.propagate_statistics(certainly_true)?,
            PropagationResult::Success
        );
        Ok(())
    }
}