vegafusion_runtime/transform/
utils.rs1use crate::expression::compiler::builtin_functions::date_time::date_format::d3_to_chrono_format;
2use crate::expression::compiler::utils::ExprHelpers;
3use datafusion::arrow::datatypes::{DataType, TimeUnit};
4use datafusion_common::DFSchema;
5use datafusion_expr::{lit, when, Expr, ExprSchemable};
6use datafusion_functions::expr_fn::{make_date, regexp_like, to_timestamp_millis};
7use vegafusion_common::arrow::record_batch::RecordBatch;
8use vegafusion_common::datatypes::is_numeric_datatype;
9use vegafusion_common::error::{Result, VegaFusionError};
10
11pub trait RecordBatchUtils {
12 fn equals(&self, other: &RecordBatch) -> bool;
13}
14
15impl RecordBatchUtils for RecordBatch {
16 fn equals(&self, other: &RecordBatch) -> bool {
17 if self.schema() != other.schema() {
18 return false;
20 }
21
22 let schema = self.schema();
24
25 for (i, _field) in schema.fields().iter().enumerate() {
26 let self_array = self.column(i);
27 let other_array = other.column(i);
28 if self_array != other_array {
29 return false;
30 }
31 }
32
33 true
34 }
35}
36
37pub fn make_timestamp_parse_formats() -> Vec<Expr> {
38 vec![
39 "%Y-%m-%d",
41 "%Y-%m-%dT%H:%M:%S",
42 "%Y-%m-%dT%H:%M:%S%.3f",
43 "%Y-%m-%dT%H:%M",
44 "%Y-%m-%d %H:%M:%S",
45 "%Y-%m-%d %H:%M:%S%.3f",
46 "%Y-%m-%d %H:%M",
47 "%Y-%m-%dT%H:%M:%S%:z",
49 "%Y-%m-%dT%H:%M:%SZ",
50 "%Y-%m-%dT%H:%M:%S%.3f%:z",
51 "%Y-%m-%dT%H:%M:%S%.3fZ",
52 "%Y-%m-%dT%H:%M%:z",
53 "%Y-%m-%d %H:%M:%S%:z",
54 "%Y-%m-%d %H:%M:%SZ",
55 "%Y-%m-%d %H:%M:%S%.3f%:z",
56 "%Y-%m-%d %H:%M:%S%.3fZ",
57 "%Y-%m-%d %H:%M%:z",
58 "%Y/%m/%d",
60 "%Y/%m/%d %H:%M:%S",
61 "%Y/%m/%d %H:%M",
62 "%m/%d/%Y",
64 "%m/%d/%Y %H:%M:%S",
65 "%m/%d/%Y %H:%M",
66 "%b %-d %Y",
68 "%b %-d %Y %H:%M:%S",
69 "%b %-d %Y %H:%M",
70 "%a %b %-d %H:%M:%S %Y",
72 "%a %b %-d %H:%M %Y",
73 "%d %b %Y",
75 "%d %b %Y %H:%M:%S",
76 "%d %b %Y %H:%M",
77 "%a, %d %b %Y",
79 "%a, %d %b %Y %H:%M:%S",
80 "%a, %d %b %Y %H:%M",
81 "%B %d, %Y",
83 "%B %d, %Y %H:%M:%S",
84 "%B %d, %Y %H:%M",
85 ]
86 .into_iter()
87 .map(lit)
88 .collect()
89}
90
91pub fn str_to_timestamp(
95 s: Expr,
96 default_input_tz: &str,
97 schema: &DFSchema,
98 fmt: Option<&str>,
99) -> Result<Expr> {
100 if let Some(fmt) = fmt {
101 let chrono_fmt = d3_to_chrono_format(fmt);
103
104 if chrono_fmt == "%Y" {
105 Ok(make_date(
108 s.try_cast_to(&DataType::Int64, schema)?,
109 lit(1),
110 lit(1),
111 ))
112 } else {
113 let is_utc_condition = regexp_like(s.clone(), lit(r"[+-]\d{2}:\d{2}$"), None)
115 .or(regexp_like(s.clone(), lit(r"Z$"), None));
116
117 let naive_timestamp = to_timestamp_millis(vec![s, lit(chrono_fmt)]);
118
119 let if_true = naive_timestamp
121 .clone()
122 .try_cast_to(
123 &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
124 schema,
125 )?
126 .try_cast_to(
127 &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
128 schema,
129 )?;
130
131 let if_false = naive_timestamp.try_cast_to(
133 &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
134 schema,
135 )?;
136
137 let expr = when(is_utc_condition, if_true).otherwise(if_false)?;
138 Ok(expr)
139 }
140 } else {
141 let is_utc_condition = regexp_like(s.clone(), lit(r"^\d{4}-\d{2}-\d{2}$"), None)
149 .or(regexp_like(s.clone(), lit(r"[+-]\d{2}:\d{2}$"), None))
150 .or(regexp_like(s.clone(), lit(r"Z$"), None));
151
152 let if_true = to_timestamp_millis(vec![s.clone()])
155 .try_cast_to(
156 &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
157 schema,
158 )?
159 .try_cast_to(
160 &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
161 schema,
162 )?;
163
164 let if_false = to_timestamp_millis([vec![s], make_timestamp_parse_formats()].concat())
165 .try_cast_to(
166 &DataType::Timestamp(TimeUnit::Millisecond, Some(default_input_tz.into())),
167 schema,
168 )?;
169
170 let expr = when(is_utc_condition, if_true).otherwise(if_false)?;
171 Ok(expr)
172 }
173}
174
175pub fn from_epoch_millis(expr: Expr, schema: &DFSchema) -> Result<Expr> {
176 Ok(expr.try_cast_to(&DataType::Int64, schema)?.try_cast_to(
177 &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
178 schema,
179 )?)
180}
181
182pub fn to_epoch_millis(expr: Expr, default_input_tz: &str, schema: &DFSchema) -> Result<Expr> {
183 Ok(match expr.get_type(schema)? {
185 DataType::Timestamp(TimeUnit::Millisecond, None) | DataType::Date64 => {
186 expr.cast_to(&DataType::Int64, schema)?
187 }
188 DataType::Date32 | DataType::Timestamp(_, None) => expr
189 .try_cast_to(&DataType::Timestamp(TimeUnit::Millisecond, None), schema)?
190 .cast_to(&DataType::Int64, schema)?,
191 DataType::Timestamp(_, Some(_)) => {
192 expr.try_cast_to(
194 &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
195 schema,
196 )?
197 .cast_to(&DataType::Int64, schema)?
198 }
199 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
200 str_to_timestamp(expr.clone(), default_input_tz, schema, None)?
201 .try_cast_to(
202 &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
203 schema,
204 )?
205 .cast_to(&DataType::Int64, schema)?
206 }
207 DataType::Int64 => {
208 expr.clone()
210 }
211 dtype if is_numeric_datatype(&dtype) || matches!(dtype, DataType::Boolean) => {
212 expr.clone().try_cast_to(&DataType::Int64, schema)?
214 }
215 dtype => {
216 return Err(VegaFusionError::internal(format!(
217 "Invalid argument type to time function: {dtype:?}"
218 )))
219 }
220 })
221}