datafusion_physical_expr/statistics/
stats_solver.rs1#![allow(deprecated)]
24
25use std::sync::Arc;
26
27use crate::expressions::Literal;
28use crate::intervals::cp_solver::PropagationResult;
29use crate::physical_expr::PhysicalExpr;
30use crate::utils::{ExprTreeNode, build_dag};
31
32use arrow::datatypes::{DataType, Schema};
33use datafusion_common::{Result, ScalarValue};
34use datafusion_expr::statistics::Distribution;
35use datafusion_expr_common::interval_arithmetic::Interval;
36
37use petgraph::Outgoing;
38use petgraph::adj::DefaultIx;
39use petgraph::prelude::Bfs;
40use petgraph::stable_graph::{NodeIndex, StableGraph};
41use petgraph::visit::DfsPostOrder;
42
43#[deprecated(
46 since = "54.0.0",
47 note = "Part of the unused Statistics V2 framework; see https://github.com/apache/datafusion/pull/22071"
48)]
49#[derive(Clone, Debug)]
50pub struct ExprStatisticsGraph {
51 graph: StableGraph<ExprStatisticsGraphNode, usize>,
52 root: NodeIndex,
53}
54
55#[deprecated(
58 since = "54.0.0",
59 note = "Part of the unused Statistics V2 framework; see https://github.com/apache/datafusion/pull/22071"
60)]
61#[derive(Clone, Debug)]
62pub struct ExprStatisticsGraphNode {
63 expr: Arc<dyn PhysicalExpr>,
64 dist: Distribution,
65}
66
67impl ExprStatisticsGraphNode {
68 fn new_uniform(expr: Arc<dyn PhysicalExpr>, interval: Interval) -> Result<Self> {
71 Distribution::new_uniform(interval)
72 .map(|dist| ExprStatisticsGraphNode { expr, dist })
73 }
74
75 fn new_bernoulli(expr: Arc<dyn PhysicalExpr>) -> Result<Self> {
78 Distribution::new_bernoulli(ScalarValue::Float64(None))
79 .map(|dist| ExprStatisticsGraphNode { expr, dist })
80 }
81
82 fn new_generic(expr: Arc<dyn PhysicalExpr>, dt: &DataType) -> Result<Self> {
85 let interval = Interval::make_unbounded(dt)?;
86 let dist = Distribution::new_from_interval(interval)?;
87 Ok(ExprStatisticsGraphNode { expr, dist })
88 }
89
90 pub fn distribution(&self) -> &Distribution {
93 &self.dist
94 }
95
96 pub fn make_node(node: &ExprTreeNode<NodeIndex>, schema: &Schema) -> Result<Self> {
103 let expr = Arc::clone(&node.expr);
104 if let Some(literal) = expr.downcast_ref::<Literal>() {
105 let value = literal.value();
106 Interval::try_new(value.clone(), value.clone())
107 .and_then(|interval| Self::new_uniform(expr, interval))
108 } else {
109 expr.data_type(schema).and_then(|dt| {
110 if dt.eq(&DataType::Boolean) {
111 Self::new_bernoulli(expr)
112 } else {
113 Self::new_generic(expr, &dt)
114 }
115 })
116 }
117 }
118}
119
120impl ExprStatisticsGraph {
121 pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &Schema) -> Result<Self> {
122 let (root, graph) = build_dag(expr, &|node| {
124 ExprStatisticsGraphNode::make_node(node, schema)
125 })?;
126 Ok(Self { graph, root })
127 }
128
129 pub fn assign_statistics(&mut self, assignments: &[(usize, Distribution)]) {
133 for (index, stats) in assignments {
134 let node_index = NodeIndex::from(*index as DefaultIx);
135 self.graph[node_index].dist = stats.clone();
136 }
137 }
138
139 pub fn evaluate_statistics(&mut self) -> Result<&Distribution> {
142 let mut dfs = DfsPostOrder::new(&self.graph, self.root);
143 while let Some(idx) = dfs.next(&self.graph) {
144 let neighbors = self.graph.neighbors_directed(idx, Outgoing);
145 let mut children_statistics = neighbors
146 .map(|child| self.graph[child].distribution())
147 .collect::<Vec<_>>();
148 if !children_statistics.is_empty() {
150 children_statistics.reverse();
152 self.graph[idx].dist = self.graph[idx]
153 .expr
154 .evaluate_statistics(&children_statistics)?;
155 }
156 }
157 Ok(self.graph[self.root].distribution())
158 }
159
160 pub fn propagate_statistics(
163 &mut self,
164 given_stats: Distribution,
165 ) -> Result<PropagationResult> {
166 let root_range = self.graph[self.root].dist.range()?;
168 let given_range = given_stats.range()?;
169 if let Some(interval) = root_range.intersect(&given_range)? {
170 if interval != root_range {
171 let subset = root_range.contains(given_range)?;
174 self.graph[self.root].dist = if subset == Interval::TRUE {
175 given_stats
177 } else {
178 Distribution::new_from_interval(interval)?
180 };
181 }
182 } else {
183 return Ok(PropagationResult::Infeasible);
184 }
185
186 let mut bfs = Bfs::new(&self.graph, self.root);
187
188 while let Some(node) = bfs.next(&self.graph) {
189 let neighbors = self.graph.neighbors_directed(node, Outgoing);
190 let mut children = neighbors.collect::<Vec<_>>();
191 if children.is_empty() {
194 continue;
195 }
196 children.reverse();
198 let children_stats = children
199 .iter()
200 .map(|child| self.graph[*child].distribution())
201 .collect::<Vec<_>>();
202 let node_statistics = self.graph[node].distribution();
203 let propagated_statistics = self.graph[node]
204 .expr
205 .propagate_statistics(node_statistics, &children_stats)?;
206 if let Some(propagated_stats) = propagated_statistics {
207 for (child_idx, stats) in children.into_iter().zip(propagated_stats) {
208 self.graph[child_idx].dist = stats;
209 }
210 } else {
211 return Ok(PropagationResult::Infeasible);
213 }
214 }
215 Ok(PropagationResult::Success)
216 }
217}
218
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::expressions::{Column, binary, try_cast};
    use crate::intervals::cp_solver::PropagationResult;
    use crate::statistics::stats_solver::ExprStatisticsGraph;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr_common::interval_arithmetic::Interval;
    use datafusion_expr_common::operator::Operator;
    use datafusion_expr_common::statistics::Distribution;
    use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

    /// Builds `left <op> right`, first casting both operands to the input
    /// types chosen by the binary type coercer for `op`.
    pub fn binary_expr(
        left: Arc<dyn PhysicalExpr>,
        op: Operator,
        right: Arc<dyn PhysicalExpr>,
        schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let (lhs_type, rhs_type) = BinaryTypeCoercer::new(
            &left.data_type(schema)?,
            &op,
            &right.data_type(schema)?,
        )
        .get_input_types()?;
        binary(
            try_cast(left, schema, lhs_type)?,
            op,
            try_cast(right, schema, rhs_type)?,
            schema,
        )
    }

    /// Evaluates and then propagates statistics through `(a + b) = (c - d)`
    /// after seeding the four columns with uniform distributions.
    #[test]
    fn test_stats_integration() -> Result<()> {
        let names = ["a", "b", "c", "d"];
        let schema = &Schema::new(
            names
                .iter()
                .map(|name| Field::new(*name, DataType::Float64, false))
                .collect::<Vec<_>>(),
        );
        let column = |idx: usize| -> Arc<dyn PhysicalExpr> {
            Arc::new(Column::new(names[idx], idx))
        };
        let uniform = |lo: f64, hi: f64| -> Result<Distribution> {
            Distribution::new_uniform(Interval::make(Some(lo), Some(hi))?)
        };

        let sum = binary_expr(column(0), Operator::Plus, column(1), schema)?;
        let diff = binary_expr(column(2), Operator::Minus, column(3), schema)?;
        let expr = binary_expr(sum, Operator::Eq, diff, schema)?;

        let mut graph = ExprStatisticsGraph::try_new(expr, schema)?;
        // Seed the column leaves. The indices presumably follow the order in
        // which `build_dag` inserts nodes (a=0, b=1, a+b=2, c=3, d=4); confirm
        // if the DAG construction order changes.
        graph.assign_statistics(&[
            (0, uniform(0., 1.)?),
            (1, uniform(0., 2.)?),
            (3, uniform(1., 3.)?),
            (4, uniform(1., 5.)?),
        ]);

        // The evaluated root is expected to be a Bernoulli with an unknown
        // (null) probability.
        let evaluated = graph.evaluate_statistics()?;
        assert_eq!(
            evaluated,
            &Distribution::new_bernoulli(ScalarValue::Float64(None))?
        );

        // Constrain the equality to hold with probability one and push it down.
        let certain =
            Distribution::new_bernoulli(ScalarValue::new_one(&DataType::Float64)?)?;
        assert_eq!(
            graph.propagate_statistics(certain)?,
            PropagationResult::Success
        );
        Ok(())
    }
}