vegafusion_runtime/expression/compiler/
utils.rs1use crate::datafusion::context::make_datafusion_context;
2use datafusion::physical_expr::PhysicalExpr;
3use datafusion::physical_plan::ColumnarValue;
4use datafusion_common::{ExprSchema, ScalarValue};
5use datafusion_expr::utils::expr_to_columns;
6use datafusion_expr::{Expr, ExprSchemable, TryCast};
7use datafusion_optimizer::simplify_expressions::SimplifyInfo;
8use datafusion_physical_expr::execution_props::ExecutionProps;
9use std::collections::HashSet;
10use std::convert::TryFrom;
11use std::sync::Arc;
12use vegafusion_common::arrow::array::{ArrayRef, BooleanArray};
13use vegafusion_common::arrow::datatypes::DataType;
14use vegafusion_common::arrow::record_batch::RecordBatch;
15use vegafusion_common::datafusion_common::{Column, DFSchema, DataFusionError};
16use vegafusion_core::error::{Result, ResultWithContext, VegaFusionError};
17
18lazy_static! {
19 pub static ref UNIT_RECORD_BATCH: RecordBatch = RecordBatch::try_from_iter(vec![(
20 "__unit__",
21 Arc::new(BooleanArray::from(vec![true])) as ArrayRef
22 )])
23 .unwrap();
24 pub static ref UNIT_SCHEMA: DFSchema =
25 DFSchema::try_from(UNIT_RECORD_BATCH.schema().as_ref().clone()).unwrap();
26}
27
28pub trait ExprHelpers {
29 fn columns(&self) -> Result<HashSet<Column>>;
30 fn to_phys_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
31 fn eval_to_scalar(&self) -> Result<ScalarValue>;
32 fn try_cast_to(
33 self,
34 cast_to_type: &DataType,
35 schema: &dyn ExprSchema,
36 ) -> datafusion_common::Result<Expr>;
37}
38
39impl ExprHelpers for Expr {
40 fn columns(&self) -> Result<HashSet<Column>> {
41 let mut columns: HashSet<Column> = HashSet::new();
42 expr_to_columns(self, &mut columns)
43 .with_context(|| format!("Failed to collect columns from expression: {self:?}"))?;
44 Ok(columns)
45 }
46
47 fn to_phys_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
48 let ctx = make_datafusion_context();
49 let phys_expr = ctx.create_physical_expr(self.clone(), &UNIT_SCHEMA)?;
50 Ok(phys_expr)
51 }
52
53 fn eval_to_scalar(&self) -> Result<ScalarValue> {
54 if !self.columns()?.is_empty() {
55 return Err(VegaFusionError::compilation(format!(
56 "Cannot eval_to_scalar for Expr with column references: {self:?}"
57 )));
58 }
59
60 let phys_expr = self.to_phys_expr()?;
61 let col_result = phys_expr.evaluate(&UNIT_RECORD_BATCH)?;
62 match col_result {
63 ColumnarValue::Scalar(scalar) => Ok(scalar),
64 ColumnarValue::Array(array) => {
65 if array.len() != 1 {
66 return Err(VegaFusionError::compilation(format!(
67 "Unexpected non-scalar array result when evaluate expr: {self:?}"
68 )));
69 }
70 ScalarValue::try_from_array(&array, 0).with_context(|| {
71 format!(
72 "Failed to convert scalar array result to ScalarValue in expr: {self:?}"
73 )
74 })
75 }
76 }
77 }
78
79 fn try_cast_to(
80 self,
81 cast_to_type: &DataType,
82 schema: &dyn ExprSchema,
83 ) -> datafusion_common::Result<Expr> {
84 let this_type = self.get_type(schema)?;
86 if this_type == *cast_to_type {
87 return Ok(self);
88 }
89 Ok(Expr::TryCast(TryCast::new(
90 Box::new(self),
91 cast_to_type.clone(),
92 )))
93 }
94}
95
96pub struct VfSimplifyInfo {
102 schema: DFSchema,
104
105 execution_props: ExecutionProps,
108}
109
110impl SimplifyInfo for VfSimplifyInfo {
111 fn is_boolean_type(&self, expr: &Expr) -> std::result::Result<bool, DataFusionError> {
112 Ok(matches!(expr.get_type(&self.schema)?, DataType::Boolean))
113 }
114
115 fn nullable(&self, expr: &Expr) -> std::result::Result<bool, DataFusionError> {
116 expr.nullable(&self.schema)
117 }
118
119 fn execution_props(&self) -> &ExecutionProps {
120 &self.execution_props
121 }
122
123 fn get_data_type(&self, expr: &Expr) -> std::result::Result<DataType, DataFusionError> {
124 expr.get_type(&self.schema)
125 }
126}
127
128impl From<DFSchema> for VfSimplifyInfo {
129 fn from(schema: DFSchema) -> Self {
130 Self {
131 schema,
132 execution_props: ExecutionProps::new(),
133 }
134 }
135}