vegafusion_runtime/expression/compiler/
utils.rs

1use crate::datafusion::context::make_datafusion_context;
2use datafusion::physical_expr::PhysicalExpr;
3use datafusion::physical_plan::ColumnarValue;
4use datafusion_common::{ExprSchema, ScalarValue};
5use datafusion_expr::utils::expr_to_columns;
6use datafusion_expr::{Expr, ExprSchemable, TryCast};
7use datafusion_optimizer::simplify_expressions::SimplifyInfo;
8use datafusion_physical_expr::execution_props::ExecutionProps;
9use std::collections::HashSet;
10use std::convert::TryFrom;
11use std::sync::Arc;
12use vegafusion_common::arrow::array::{ArrayRef, BooleanArray};
13use vegafusion_common::arrow::datatypes::DataType;
14use vegafusion_common::arrow::record_batch::RecordBatch;
15use vegafusion_common::datafusion_common::{Column, DFSchema, DataFusionError};
16use vegafusion_core::error::{Result, ResultWithContext, VegaFusionError};
17
18lazy_static! {
19    pub static ref UNIT_RECORD_BATCH: RecordBatch = RecordBatch::try_from_iter(vec![(
20        "__unit__",
21        Arc::new(BooleanArray::from(vec![true])) as ArrayRef
22    )])
23    .unwrap();
24    pub static ref UNIT_SCHEMA: DFSchema =
25        DFSchema::try_from(UNIT_RECORD_BATCH.schema().as_ref().clone()).unwrap();
26}
27
28pub trait ExprHelpers {
29    fn columns(&self) -> Result<HashSet<Column>>;
30    fn to_phys_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
31    fn eval_to_scalar(&self) -> Result<ScalarValue>;
32    fn try_cast_to(
33        self,
34        cast_to_type: &DataType,
35        schema: &dyn ExprSchema,
36    ) -> datafusion_common::Result<Expr>;
37}
38
39impl ExprHelpers for Expr {
40    fn columns(&self) -> Result<HashSet<Column>> {
41        let mut columns: HashSet<Column> = HashSet::new();
42        expr_to_columns(self, &mut columns)
43            .with_context(|| format!("Failed to collect columns from expression: {self:?}"))?;
44        Ok(columns)
45    }
46
47    fn to_phys_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
48        let ctx = make_datafusion_context();
49        let phys_expr = ctx.create_physical_expr(self.clone(), &UNIT_SCHEMA)?;
50        Ok(phys_expr)
51    }
52
53    fn eval_to_scalar(&self) -> Result<ScalarValue> {
54        if !self.columns()?.is_empty() {
55            return Err(VegaFusionError::compilation(format!(
56                "Cannot eval_to_scalar for Expr with column references: {self:?}"
57            )));
58        }
59
60        let phys_expr = self.to_phys_expr()?;
61        let col_result = phys_expr.evaluate(&UNIT_RECORD_BATCH)?;
62        match col_result {
63            ColumnarValue::Scalar(scalar) => Ok(scalar),
64            ColumnarValue::Array(array) => {
65                if array.len() != 1 {
66                    return Err(VegaFusionError::compilation(format!(
67                        "Unexpected non-scalar array result when evaluate expr: {self:?}"
68                    )));
69                }
70                ScalarValue::try_from_array(&array, 0).with_context(|| {
71                    format!(
72                        "Failed to convert scalar array result to ScalarValue in expr: {self:?}"
73                    )
74                })
75            }
76        }
77    }
78
79    fn try_cast_to(
80        self,
81        cast_to_type: &DataType,
82        schema: &dyn ExprSchema,
83    ) -> datafusion_common::Result<Expr> {
84        // Based on cast_to, using TryCast instead of Cast
85        let this_type = self.get_type(schema)?;
86        if this_type == *cast_to_type {
87            return Ok(self);
88        }
89        Ok(Expr::TryCast(TryCast::new(
90            Box::new(self),
91            cast_to_type.clone(),
92        )))
93    }
94}
95
96/// In order to simplify expressions, DataFusion must have information
97/// about the expressions.
98///
99/// You can provide that information using DataFusion [DFSchema]
100/// objects or from some other implemention
101pub struct VfSimplifyInfo {
102    /// The input schema
103    schema: DFSchema,
104
105    /// Execution specific details needed for constant evaluation such
106    /// as the current time for `now()` and [VariableProviders]
107    execution_props: ExecutionProps,
108}
109
110impl SimplifyInfo for VfSimplifyInfo {
111    fn is_boolean_type(&self, expr: &Expr) -> std::result::Result<bool, DataFusionError> {
112        Ok(matches!(expr.get_type(&self.schema)?, DataType::Boolean))
113    }
114
115    fn nullable(&self, expr: &Expr) -> std::result::Result<bool, DataFusionError> {
116        expr.nullable(&self.schema)
117    }
118
119    fn execution_props(&self) -> &ExecutionProps {
120        &self.execution_props
121    }
122
123    fn get_data_type(&self, expr: &Expr) -> std::result::Result<DataType, DataFusionError> {
124        expr.get_type(&self.schema)
125    }
126}
127
128impl From<DFSchema> for VfSimplifyInfo {
129    fn from(schema: DFSchema) -> Self {
130        Self {
131            schema,
132            execution_props: ExecutionProps::new(),
133        }
134    }
135}