vegafusion_runtime/transform/
utils.rs

1use crate::expression::compiler::builtin_functions::date_time::date_format::d3_to_chrono_format;
2use crate::expression::compiler::utils::ExprHelpers;
3use datafusion::arrow::datatypes::{DataType, TimeUnit};
4use datafusion_common::DFSchema;
5use datafusion_expr::{lit, when, Expr, ExprSchemable};
6use datafusion_functions::expr_fn::{make_date, regexp_like, to_timestamp_millis};
7use vegafusion_common::arrow::record_batch::RecordBatch;
8use vegafusion_common::datatypes::is_numeric_datatype;
9use vegafusion_common::error::{Result, VegaFusionError};
10
11pub trait RecordBatchUtils {
12    fn equals(&self, other: &RecordBatch) -> bool;
13}
14
15impl RecordBatchUtils for RecordBatch {
16    fn equals(&self, other: &RecordBatch) -> bool {
17        if self.schema() != other.schema() {
18            // Schema's are not equal
19            return false;
20        }
21
22        // Schema's equal, check columns
23        let schema = self.schema();
24
25        for (i, _field) in schema.fields().iter().enumerate() {
26            let self_array = self.column(i);
27            let other_array = other.column(i);
28            if self_array != other_array {
29                return false;
30            }
31        }
32
33        true
34    }
35}
36
37pub fn make_timestamp_parse_formats() -> Vec<Expr> {
38    vec![
39        // ISO 8601 with and without time and 'T' separator
40        "%Y-%m-%d",
41        "%Y-%m-%dT%H:%M:%S",
42        "%Y-%m-%dT%H:%M:%S%.3f",
43        "%Y-%m-%dT%H:%M",
44        "%Y-%m-%d %H:%M:%S",
45        "%Y-%m-%d %H:%M:%S%.3f",
46        "%Y-%m-%d %H:%M",
47        // With UTC timezone offset
48        "%Y-%m-%dT%H:%M:%S%:z",
49        "%Y-%m-%dT%H:%M:%SZ",
50        "%Y-%m-%dT%H:%M:%S%.3f%:z",
51        "%Y-%m-%dT%H:%M:%S%.3fZ",
52        "%Y-%m-%dT%H:%M%:z",
53        "%Y-%m-%d %H:%M:%S%:z",
54        "%Y-%m-%d %H:%M:%SZ",
55        "%Y-%m-%d %H:%M:%S%.3f%:z",
56        "%Y-%m-%d %H:%M:%S%.3fZ",
57        "%Y-%m-%d %H:%M%:z",
58        // ISO 8601 with forward slashes
59        "%Y/%m/%d",
60        "%Y/%m/%d %H:%M:%S",
61        "%Y/%m/%d %H:%M",
62        // month/day/year
63        "%m/%d/%Y",
64        "%m/%d/%Y %H:%M:%S",
65        "%m/%d/%Y %H:%M",
66        // e.g. May 1 2003
67        "%b %-d %Y",
68        "%b %-d %Y %H:%M:%S",
69        "%b %-d %Y %H:%M",
70        // ctime format (e.g. Sun Jul 8 00:34:60 2001)
71        "%a %b %-d %H:%M:%S %Y",
72        "%a %b %-d %H:%M %Y",
73        // e.g. 01 Jan 2012 00:00:00
74        "%d %b %Y",
75        "%d %b %Y %H:%M:%S",
76        "%d %b %Y %H:%M",
77        // e.g. Sun, 01 Jan 2012 00:00:00
78        "%a, %d %b %Y",
79        "%a, %d %b %Y %H:%M:%S",
80        "%a, %d %b %Y %H:%M",
81        // e.g. December 17, 1995 03:00:00
82        "%B %d, %Y",
83        "%B %d, %Y %H:%M:%S",
84        "%B %d, %Y %H:%M",
85    ]
86    .into_iter()
87    .map(lit)
88    .collect()
89}
90
91/// Build an expression that converts string to timestamps, following the browser's unfortunate
92/// convention where ISO8601 dates (not timestamps) are always interpreted as UTC,
93/// but all other formats are interpreted as the local timezone.
94pub fn str_to_timestamp(
95    s: Expr,
96    default_input_tz: &str,
97    schema: &DFSchema,
98    fmt: Option<&str>,
99) -> Result<Expr> {
100    if let Some(fmt) = fmt {
101        // Parse with single explicit format, in the specified timezone
102        let chrono_fmt = d3_to_chrono_format(fmt);
103
104        if chrono_fmt == "%Y" {
105            // Chrono won't parse this as years by itself, since it's not technically enough info
106            // to make a timestamp, so instead we'll make date on the first of the year
107            Ok(make_date(
108                s.try_cast_to(&DataType::Int64, schema)?,
109                lit(1),
110                lit(1),
111            ))
112        } else {
113            // Interpret as utc if the input has a timezone offset
114            let is_utc_condition = regexp_like(s.clone(), lit(r"[+-]\d{2}:\d{2}$"), None)
115                .or(regexp_like(s.clone(), lit(r"Z$"), None));
116
117            let naive_timestamp = to_timestamp_millis(vec![s, lit(chrono_fmt)]);
118
119            // Interpret as UTC then convert to default_iput
120            let if_true = naive_timestamp
121                .clone()
122                .try_cast_to(
123                    &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
124                    schema,
125                )?
126                .try_cast_to(
127                    &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
128                    schema,
129                )?;
130
131            // Interpret as default input
132            let if_false = naive_timestamp.try_cast_to(
133                &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
134                schema,
135            )?;
136
137            let expr = when(is_utc_condition, if_true).otherwise(if_false)?;
138            Ok(expr)
139        }
140    } else {
141        // Auto formatting;
142        // Create condition for whether the parsed timestamp (which always starts as naive) should
143        // be interpreted as UTC, or as the default_input_tz.
144        // There are two cases where we always use UTC:
145        //   1. To follow the browser, timestamps of the form 2020-01-01 are always interpreted as UTC
146        //   2. Timestamps that have an offset suffix (e.g. '+05:00', '-09:00', or 'Z') are parsed by
147        //      datafusion as UTC
148        let is_utc_condition = regexp_like(s.clone(), lit(r"^\d{4}-\d{2}-\d{2}$"), None)
149            .or(regexp_like(s.clone(), lit(r"[+-]\d{2}:\d{2}$"), None))
150            .or(regexp_like(s.clone(), lit(r"Z$"), None));
151
152        // Note: it's important for the express to always return values in the same timezone,
153        // so we cast the UTC case back to the local timezone
154        let if_true = to_timestamp_millis(vec![s.clone()])
155            .try_cast_to(
156                &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
157                schema,
158            )?
159            .try_cast_to(
160                &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
161                schema,
162            )?;
163
164        let if_false = to_timestamp_millis([vec![s], make_timestamp_parse_formats()].concat())
165            .try_cast_to(
166                &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
167                schema,
168            )?;
169
170        let expr = when(is_utc_condition, if_true).otherwise(if_false)?;
171        Ok(expr)
172    }
173}
174
175pub fn from_epoch_millis(expr: Expr, schema: &DFSchema) -> Result<Expr> {
176    Ok(expr.try_cast_to(&DataType::Int64, schema)?.try_cast_to(
177        &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
178        schema,
179    )?)
180}
181
182pub fn to_epoch_millis(expr: Expr, default_input_tz: &str, schema: &DFSchema) -> Result<Expr> {
183    // Dispatch handling on data type
184    Ok(match expr.get_type(schema)? {
185        DataType::Timestamp(TimeUnit::Millisecond, None) | DataType::Date64 => {
186            expr.cast_to(&DataType::Int64, schema)?
187        }
188        DataType::Date32 | DataType::Timestamp(_, None) => expr
189            .try_cast_to(&DataType::Timestamp(TimeUnit::Millisecond, None), schema)?
190            .cast_to(&DataType::Int64, schema)?,
191        DataType::Timestamp(_, Some(_)) => {
192            // Convert to UTC, then drop timezone
193            expr.try_cast_to(
194                &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
195                schema,
196            )?
197            .cast_to(&DataType::Int64, schema)?
198        }
199        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
200            str_to_timestamp(expr.clone(), default_input_tz, schema, None)?
201                .try_cast_to(
202                    &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
203                    schema,
204                )?
205                .cast_to(&DataType::Int64, schema)?
206        }
207        DataType::Int64 => {
208            // Keep int argument as-is
209            expr.clone()
210        }
211        dtype if is_numeric_datatype(&dtype) || matches!(dtype, DataType::Boolean) => {
212            // Cast other numeric types to Int64
213            expr.clone().try_cast_to(&DataType::Int64, schema)?
214        }
215        dtype => {
216            return Err(VegaFusionError::internal(format!(
217                "Invalid argument type to time function: {dtype:?}"
218            )))
219        }
220    })
221}