1use std::any::Any;
19use std::sync::Arc;
20
21use crate::datetime::common::*;
22use arrow::array::timezone::Tz;
23use arrow::array::{
24 Array, Decimal128Array, Float16Array, Float32Array, Float64Array,
25 TimestampNanosecondArray,
26};
27use arrow::datatypes::DataType::*;
28use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
29use arrow::datatypes::{
30 ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
31 TimestampNanosecondType, TimestampSecondType,
32};
33use datafusion_common::config::ConfigOptions;
34use datafusion_common::{Result, ScalarType, ScalarValue, exec_err};
35use datafusion_expr::{
36 ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
37};
38use datafusion_macros::user_doc;
39
40#[user_doc(
41 doc_section(label = "Time and Date Functions"),
42 description = r#"
43Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000<TZ>`) in the session time zone. Supports strings,
44integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00')
45if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided.
46Strings that parse without a time zone are treated as if they are in the
47session time zone, or UTC if no session time zone is set.
48Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`).
49
50Note: `to_timestamp` returns `Timestamp(ns, TimeZone)` where the time zone is the session time zone. The supported range
51for integer input is between`-9223372037` and `9223372036`. Supported range for string input is between
52`1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds`
53for the input outside of supported bounds.
54
55The session time zone can be set using the statement `SET TIMEZONE = 'desired time zone'`.
56The time zone can be a value like +00:00, 'Europe/London' etc.
57"#,
58 syntax_example = "to_timestamp(expression[, ..., format_n])",
59 sql_example = r#"```sql
60> select to_timestamp('2023-01-31T09:26:56.123456789-05:00');
61+-----------------------------------------------------------+
62| to_timestamp(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
63+-----------------------------------------------------------+
64| 2023-01-31T14:26:56.123456789 |
65+-----------------------------------------------------------+
66> select to_timestamp('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
67+--------------------------------------------------------------------------------------------------------+
68| to_timestamp(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
69+--------------------------------------------------------------------------------------------------------+
70| 2023-05-17T03:59:00.123456789 |
71+--------------------------------------------------------------------------------------------------------+
72```
73Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/builtin_functions/date_time.rs)
74"#,
75 argument(
76 name = "expression",
77 description = "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators."
78 ),
79 argument(
80 name = "format_n",
81 description = r#"
82Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression.
83Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully
84parse the expression an error will be returned. Note: parsing of named timezones (e.g. 'America/New_York') using %Z is
85only supported at the end of the string preceded by a space.
86"#
87 )
88)]
89#[derive(Debug, PartialEq, Eq, Hash)]
90pub struct ToTimestampFunc {
91 signature: Signature,
92 timezone: Option<Arc<str>>,
93}
94
95#[user_doc(
96 doc_section(label = "Time and Date Functions"),
97 description = r#"
98Converts a value to a timestamp (`YYYY-MM-DDT00:00:00<TZ>`) in the session time zone. Supports strings,
99integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00')
100if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided.
101Strings that parse without a time zone are treated as if they are in the
102session time zone, or UTC if no session time zone is set.
103Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`).
104
105The session time zone can be set using the statement `SET TIMEZONE = 'desired time zone'`.
106The time zone can be a value like +00:00, 'Europe/London' etc.
107"#,
108 syntax_example = "to_timestamp_seconds(expression[, ..., format_n])",
109 sql_example = r#"```sql
110> select to_timestamp_seconds('2023-01-31T09:26:56.123456789-05:00');
111+-------------------------------------------------------------------+
112| to_timestamp_seconds(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
113+-------------------------------------------------------------------+
114| 2023-01-31T14:26:56 |
115+-------------------------------------------------------------------+
116> select to_timestamp_seconds('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
117+----------------------------------------------------------------------------------------------------------------+
118| to_timestamp_seconds(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
119+----------------------------------------------------------------------------------------------------------------+
120| 2023-05-17T03:59:00 |
121+----------------------------------------------------------------------------------------------------------------+
122```
123Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/builtin_functions/date_time.rs)
124"#,
125 argument(
126 name = "expression",
127 description = "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators."
128 ),
129 argument(
130 name = "format_n",
131 description = r#"
132Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression.
133Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully
134parse the expression an error will be returned. Note: parsing of named timezones (e.g. 'America/New_York') using %Z is
135only supported at the end of the string preceded by a space.
136"#
137 )
138)]
139#[derive(Debug, PartialEq, Eq, Hash)]
140pub struct ToTimestampSecondsFunc {
141 signature: Signature,
142 timezone: Option<Arc<str>>,
143}
144
145#[user_doc(
146 doc_section(label = "Time and Date Functions"),
147 description = r#"
148Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000<TZ>`) in the session time zone. Supports strings,
149integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00')
150if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided.
151Strings that parse without a time zone are treated as if they are in the
152session time zone, or UTC if no session time zone is set.
153Integers, unsigned integers, and doubles are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`).
154
155The session time zone can be set using the statement `SET TIMEZONE = 'desired time zone'`.
156The time zone can be a value like +00:00, 'Europe/London' etc.
157"#,
158 syntax_example = "to_timestamp_millis(expression[, ..., format_n])",
159 sql_example = r#"```sql
160> select to_timestamp_millis('2023-01-31T09:26:56.123456789-05:00');
161+------------------------------------------------------------------+
162| to_timestamp_millis(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
163+------------------------------------------------------------------+
164| 2023-01-31T14:26:56.123 |
165+------------------------------------------------------------------+
166> select to_timestamp_millis('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
167+---------------------------------------------------------------------------------------------------------------+
168| to_timestamp_millis(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
169+---------------------------------------------------------------------------------------------------------------+
170| 2023-05-17T03:59:00.123 |
171+---------------------------------------------------------------------------------------------------------------+
172```
173Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/builtin_functions/date_time.rs)
174"#,
175 argument(
176 name = "expression",
177 description = "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators."
178 ),
179 argument(
180 name = "format_n",
181 description = r#"
182Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression.
183Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully
184parse the expression an error will be returned. Note: parsing of named timezones (e.g. 'America/New_York') using %Z is
185only supported at the end of the string preceded by a space.
186"#
187 )
188)]
189#[derive(Debug, PartialEq, Eq, Hash)]
190pub struct ToTimestampMillisFunc {
191 signature: Signature,
192 timezone: Option<Arc<str>>,
193}
194
195#[user_doc(
196 doc_section(label = "Time and Date Functions"),
197 description = r#"
198Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000<TZ>`) in the session time zone. Supports strings,
199integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00')
200if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided.
201Strings that parse without a time zone are treated as if they are in the
202session time zone, or UTC if no session time zone is set.
203Integers, unsigned integers, and doubles are interpreted as microseconds since the unix epoch (`1970-01-01T00:00:00Z`).
204
205The session time zone can be set using the statement `SET TIMEZONE = 'desired time zone'`.
206The time zone can be a value like +00:00, 'Europe/London' etc.
207"#,
208 syntax_example = "to_timestamp_micros(expression[, ..., format_n])",
209 sql_example = r#"```sql
210> select to_timestamp_micros('2023-01-31T09:26:56.123456789-05:00');
211+------------------------------------------------------------------+
212| to_timestamp_micros(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
213+------------------------------------------------------------------+
214| 2023-01-31T14:26:56.123456 |
215+------------------------------------------------------------------+
216> select to_timestamp_micros('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
217+---------------------------------------------------------------------------------------------------------------+
218| to_timestamp_micros(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
219+---------------------------------------------------------------------------------------------------------------+
220| 2023-05-17T03:59:00.123456 |
221+---------------------------------------------------------------------------------------------------------------+
222```
223Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/builtin_functions/date_time.rs)
224"#,
225 argument(
226 name = "expression",
227 description = "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators."
228 ),
229 argument(
230 name = "format_n",
231 description = r#"
232Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression.
233Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully
234parse the expression an error will be returned. Note: parsing of named timezones (e.g. 'America/New_York') using %Z is
235only supported at the end of the string preceded by a space.
236"#
237 )
238)]
239#[derive(Debug, PartialEq, Eq, Hash)]
240pub struct ToTimestampMicrosFunc {
241 signature: Signature,
242 timezone: Option<Arc<str>>,
243}
244
245#[user_doc(
246 doc_section(label = "Time and Date Functions"),
247 description = r#"
248Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000000<TZ>`) in the session time zone. Supports strings,
249integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00')
250if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided.
251Strings that parse without a time zone are treated as if they are in the
252session time zone. Integers, unsigned integers, and doubles are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`).
253
254The session time zone can be set using the statement `SET TIMEZONE = 'desired time zone'`.
255The time zone can be a value like +00:00, 'Europe/London' etc.
256"#,
257 syntax_example = "to_timestamp_nanos(expression[, ..., format_n])",
258 sql_example = r#"```sql
259> select to_timestamp_nanos('2023-01-31T09:26:56.123456789-05:00');
260+-----------------------------------------------------------------+
261| to_timestamp_nanos(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
262+-----------------------------------------------------------------+
263| 2023-01-31T14:26:56.123456789 |
264+-----------------------------------------------------------------+
265> select to_timestamp_nanos('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
266+--------------------------------------------------------------------------------------------------------------+
267| to_timestamp_nanos(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
268+--------------------------------------------------------------------------------------------------------------+
269| 2023-05-17T03:59:00.123456789 |
270+---------------------------------------------------------------------------------------------------------------+
271```
272Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/builtin_functions/date_time.rs)
273"#,
274 argument(
275 name = "expression",
276 description = "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators."
277 ),
278 argument(
279 name = "format_n",
280 description = r#"
281Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression.
282Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully
283parse the expression an error will be returned. Note: parsing of named timezones (e.g. 'America/New_York') using %Z is
284only supported at the end of the string preceded by a space.
285"#
286 )
287)]
288#[derive(Debug, PartialEq, Eq, Hash)]
289pub struct ToTimestampNanosFunc {
290 signature: Signature,
291 timezone: Option<Arc<str>>,
292}
293
/// Emits the constructor boilerplate shared by the `to_timestamp*` function
/// structs: a `Default` impl, the deprecated `new`, and `new_with_config`.
///
/// `$func` must be a struct with exactly the `signature` and `timezone` fields.
macro_rules! impl_to_timestamp_constructors {
    ($func:ty) => {
        impl Default for $func {
            /// Builds the function from a default `ConfigOptions`.
            fn default() -> Self {
                Self::new_with_config(&ConfigOptions::default())
            }
        }

        impl $func {
            /// Builds the function from a default `ConfigOptions`.
            #[deprecated(since = "52.0.0", note = "use `new_with_config` instead")]
            pub fn new() -> Self {
                Self::new_with_config(&ConfigOptions::default())
            }

            /// Builds the function, capturing `config.execution.time_zone`
            /// (if any) as the time zone of the timestamps it produces.
            pub fn new_with_config(config: &ConfigOptions) -> Self {
                let timezone = config
                    .execution
                    .time_zone
                    .as_deref()
                    .map(Arc::<str>::from);
                Self {
                    signature: Signature::variadic_any(Volatility::Immutable),
                    timezone,
                }
            }
        }
    };
}
328
// Generate `Default`, `new` and `new_with_config` for every `to_timestamp*`
// function struct declared above.
impl_to_timestamp_constructors!(ToTimestampFunc);
impl_to_timestamp_constructors!(ToTimestampSecondsFunc);
impl_to_timestamp_constructors!(ToTimestampMillisFunc);
impl_to_timestamp_constructors!(ToTimestampMicrosFunc);
impl_to_timestamp_constructors!(ToTimestampNanosFunc);
334
/// Converts the unscaled integer of a decimal number of seconds into whole
/// nanoseconds.
///
/// `value` is the raw decimal representation, i.e. the number it stands for is
/// `value * 10^(-scale)`. The result is `value * 10^(9 - scale)`, truncated
/// toward zero when `scale > 9`.
///
/// Results that do not fit in an `i64` saturate to `i64::MIN` / `i64::MAX`
/// (matching the sign of `value`) instead of wrapping or panicking.
fn decimal_to_nanoseconds(value: i128, scale: i8) -> i64 {
    // Zero stays zero whatever the scale; handling it first keeps the
    // saturation logic below from misreporting an overflowing power of ten.
    if value == 0 {
        return 0;
    }

    let nanos_exponent = 9_i32 - i32::from(scale);
    // `None` when 10^|exponent| itself does not fit in an i128.
    let power_of_ten = 10_i128.checked_pow(nanos_exponent.unsigned_abs());

    let timestamp_nanos = if nanos_exponent >= 0 {
        power_of_ten.and_then(|factor| value.checked_mul(factor))
    } else {
        // A divisor beyond the i128 range exceeds every possible |value|, so
        // the truncated quotient is 0.
        Some(power_of_ten.map_or(0, |divisor| value / divisor))
    };

    let saturated = if value < 0 { i64::MIN } else { i64::MAX };
    timestamp_nanos
        .and_then(|nanos| i64::try_from(nanos).ok())
        .unwrap_or(saturated)
}
344
345fn decimal128_to_timestamp_nanos(
346 arg: &ColumnarValue,
347 tz: Option<Arc<str>>,
348) -> Result<ColumnarValue> {
349 match arg {
350 ColumnarValue::Scalar(ScalarValue::Decimal128(Some(value), _, scale)) => {
351 let timestamp_nanos = decimal_to_nanoseconds(*value, *scale);
352 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
353 Some(timestamp_nanos),
354 tz,
355 )))
356 }
357 ColumnarValue::Scalar(ScalarValue::Decimal128(None, _, _)) => Ok(
358 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, tz)),
359 ),
360 ColumnarValue::Array(arr) => {
361 let decimal_arr = downcast_arg!(arr, Decimal128Array);
362 let scale = decimal_arr.scale();
363 let result: TimestampNanosecondArray = decimal_arr
364 .iter()
365 .map(|v| v.map(|val| decimal_to_nanoseconds(val, scale)))
366 .collect();
367 let result = result.with_timezone_opt(tz);
368 Ok(ColumnarValue::Array(Arc::new(result)))
369 }
370 _ => exec_err!("Invalid Decimal128 value for to_timestamp"),
371 }
372}
373
/// Expands, inside a `ScalarUDFImpl` impl, to a `with_updated_config` method
/// that rebuilds the function from the supplied configuration via
/// `Self::new_with_config` and wraps it in a `ScalarUDF`.
macro_rules! impl_with_updated_config {
    () => {
        fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> {
            let rebuilt = Self::new_with_config(config);
            Some(ScalarUDF::from(rebuilt))
        }
    };
}
388
389impl ScalarUDFImpl for ToTimestampFunc {
390 fn as_any(&self) -> &dyn Any {
391 self
392 }
393
394 fn name(&self) -> &str {
395 "to_timestamp"
396 }
397
398 fn signature(&self) -> &Signature {
399 &self.signature
400 }
401
402 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
403 Ok(Timestamp(Nanosecond, self.timezone.clone()))
404 }
405
406 impl_with_updated_config!();
407
408 fn invoke_with_args(
409 &self,
410 args: datafusion_expr::ScalarFunctionArgs,
411 ) -> Result<ColumnarValue> {
412 let datafusion_expr::ScalarFunctionArgs { args, .. } = args;
413
414 if args.is_empty() {
415 return exec_err!(
416 "to_timestamp function requires 1 or more arguments, got {}",
417 args.len()
418 );
419 }
420
421 if args.len() > 1 {
423 validate_data_types(&args, "to_timestamp")?;
424 }
425
426 let tz = self.timezone.clone();
427
428 match args[0].data_type() {
429 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => args[0]
430 .cast_to(&Timestamp(Second, None), None)?
431 .cast_to(&Timestamp(Nanosecond, tz), None),
432 Null | Timestamp(_, _) => args[0].cast_to(&Timestamp(Nanosecond, tz), None),
433 Float16 => match &args[0] {
434 ColumnarValue::Scalar(ScalarValue::Float16(value)) => {
435 let timestamp_nanos =
436 value.map(|v| (v.to_f64() * 1_000_000_000.0) as i64);
437 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
438 timestamp_nanos,
439 tz,
440 )))
441 }
442 ColumnarValue::Array(arr) => {
443 let f16_arr = downcast_arg!(arr, Float16Array);
444 let result: TimestampNanosecondArray =
445 f16_arr.unary(|x| (x.to_f64() * 1_000_000_000.0) as i64);
446 Ok(ColumnarValue::Array(Arc::new(result.with_timezone_opt(tz))))
447 }
448 _ => exec_err!("Invalid Float16 value for to_timestamp"),
449 },
450 Float32 => match &args[0] {
451 ColumnarValue::Scalar(ScalarValue::Float32(value)) => {
452 let timestamp_nanos =
453 value.map(|v| (v as f64 * 1_000_000_000.0) as i64);
454 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
455 timestamp_nanos,
456 tz,
457 )))
458 }
459 ColumnarValue::Array(arr) => {
460 let f32_arr = downcast_arg!(arr, Float32Array);
461 let result: TimestampNanosecondArray =
462 f32_arr.unary(|x| (x as f64 * 1_000_000_000.0) as i64);
463 Ok(ColumnarValue::Array(Arc::new(result.with_timezone_opt(tz))))
464 }
465 _ => exec_err!("Invalid Float32 value for to_timestamp"),
466 },
467 Float64 => match &args[0] {
468 ColumnarValue::Scalar(ScalarValue::Float64(value)) => {
469 let timestamp_nanos = value.map(|v| (v * 1_000_000_000.0) as i64);
470 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
471 timestamp_nanos,
472 tz,
473 )))
474 }
475 ColumnarValue::Array(arr) => {
476 let f64_arr = downcast_arg!(arr, Float64Array);
477 let result: TimestampNanosecondArray =
478 f64_arr.unary(|x| (x * 1_000_000_000.0) as i64);
479 Ok(ColumnarValue::Array(Arc::new(result.with_timezone_opt(tz))))
480 }
481 _ => exec_err!("Invalid Float64 value for to_timestamp"),
482 },
483 Decimal32(_, _) | Decimal64(_, _) | Decimal256(_, _) => {
484 let arg = args[0].cast_to(&Decimal128(38, 9), None)?;
485 decimal128_to_timestamp_nanos(&arg, tz)
486 }
487 Decimal128(_, _) => decimal128_to_timestamp_nanos(&args[0], tz),
488 Utf8View | LargeUtf8 | Utf8 => {
489 to_timestamp_impl::<TimestampNanosecondType>(&args, "to_timestamp", &tz)
490 }
491 other => {
492 exec_err!("Unsupported data type {other} for function to_timestamp")
493 }
494 }
495 }
496
497 fn documentation(&self) -> Option<&Documentation> {
498 self.doc()
499 }
500}
501
502impl ScalarUDFImpl for ToTimestampSecondsFunc {
503 fn as_any(&self) -> &dyn Any {
504 self
505 }
506
507 fn name(&self) -> &str {
508 "to_timestamp_seconds"
509 }
510
511 fn signature(&self) -> &Signature {
512 &self.signature
513 }
514
515 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
516 Ok(Timestamp(Second, self.timezone.clone()))
517 }
518
519 impl_with_updated_config!();
520
521 fn invoke_with_args(
522 &self,
523 args: datafusion_expr::ScalarFunctionArgs,
524 ) -> Result<ColumnarValue> {
525 let datafusion_expr::ScalarFunctionArgs { args, .. } = args;
526
527 if args.is_empty() {
528 return exec_err!(
529 "to_timestamp_seconds function requires 1 or more arguments, got {}",
530 args.len()
531 );
532 }
533
534 if args.len() > 1 {
536 validate_data_types(&args, "to_timestamp")?;
537 }
538
539 let tz = self.timezone.clone();
540
541 match args[0].data_type() {
542 Null
543 | Int8
544 | Int16
545 | Int32
546 | Int64
547 | UInt8
548 | UInt16
549 | UInt32
550 | UInt64
551 | Timestamp(_, _)
552 | Decimal32(_, _)
553 | Decimal64(_, _)
554 | Decimal128(_, _)
555 | Decimal256(_, _) => args[0].cast_to(&Timestamp(Second, tz), None),
556 Float16 | Float32 | Float64 => args[0]
557 .cast_to(&Int64, None)?
558 .cast_to(&Timestamp(Second, tz), None),
559 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampSecondType>(
560 &args,
561 "to_timestamp_seconds",
562 &self.timezone,
563 ),
564 other => {
565 exec_err!(
566 "Unsupported data type {} for function to_timestamp_seconds",
567 other
568 )
569 }
570 }
571 }
572
573 fn documentation(&self) -> Option<&Documentation> {
574 self.doc()
575 }
576}
577
578impl ScalarUDFImpl for ToTimestampMillisFunc {
579 fn as_any(&self) -> &dyn Any {
580 self
581 }
582
583 fn name(&self) -> &str {
584 "to_timestamp_millis"
585 }
586
587 fn signature(&self) -> &Signature {
588 &self.signature
589 }
590
591 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
592 Ok(Timestamp(Millisecond, self.timezone.clone()))
593 }
594
595 impl_with_updated_config!();
596
597 fn invoke_with_args(
598 &self,
599 args: datafusion_expr::ScalarFunctionArgs,
600 ) -> Result<ColumnarValue> {
601 let datafusion_expr::ScalarFunctionArgs { args, .. } = args;
602
603 if args.is_empty() {
604 return exec_err!(
605 "to_timestamp_millis function requires 1 or more arguments, got {}",
606 args.len()
607 );
608 }
609
610 if args.len() > 1 {
612 validate_data_types(&args, "to_timestamp")?;
613 }
614
615 match args[0].data_type() {
616 Null
617 | Int8
618 | Int16
619 | Int32
620 | Int64
621 | UInt8
622 | UInt16
623 | UInt32
624 | UInt64
625 | Timestamp(_, _)
626 | Decimal32(_, _)
627 | Decimal64(_, _)
628 | Decimal128(_, _)
629 | Decimal256(_, _) => {
630 args[0].cast_to(&Timestamp(Millisecond, self.timezone.clone()), None)
631 }
632 Float16 | Float32 | Float64 => args[0]
633 .cast_to(&Int64, None)?
634 .cast_to(&Timestamp(Millisecond, self.timezone.clone()), None),
635 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampMillisecondType>(
636 &args,
637 "to_timestamp_millis",
638 &self.timezone,
639 ),
640 other => {
641 exec_err!(
642 "Unsupported data type {} for function to_timestamp_millis",
643 other
644 )
645 }
646 }
647 }
648
649 fn documentation(&self) -> Option<&Documentation> {
650 self.doc()
651 }
652}
653
654impl ScalarUDFImpl for ToTimestampMicrosFunc {
655 fn as_any(&self) -> &dyn Any {
656 self
657 }
658
659 fn name(&self) -> &str {
660 "to_timestamp_micros"
661 }
662
663 fn signature(&self) -> &Signature {
664 &self.signature
665 }
666
667 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
668 Ok(Timestamp(Microsecond, self.timezone.clone()))
669 }
670
671 impl_with_updated_config!();
672
673 fn invoke_with_args(
674 &self,
675 args: datafusion_expr::ScalarFunctionArgs,
676 ) -> Result<ColumnarValue> {
677 let datafusion_expr::ScalarFunctionArgs { args, .. } = args;
678
679 if args.is_empty() {
680 return exec_err!(
681 "to_timestamp_micros function requires 1 or more arguments, got {}",
682 args.len()
683 );
684 }
685
686 if args.len() > 1 {
688 validate_data_types(&args, "to_timestamp")?;
689 }
690
691 match args[0].data_type() {
692 Null
693 | Int8
694 | Int16
695 | Int32
696 | Int64
697 | UInt8
698 | UInt16
699 | UInt32
700 | UInt64
701 | Timestamp(_, _)
702 | Decimal32(_, _)
703 | Decimal64(_, _)
704 | Decimal128(_, _)
705 | Decimal256(_, _) => {
706 args[0].cast_to(&Timestamp(Microsecond, self.timezone.clone()), None)
707 }
708 Float16 | Float32 | Float64 => args[0]
709 .cast_to(&Int64, None)?
710 .cast_to(&Timestamp(Microsecond, self.timezone.clone()), None),
711 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampMicrosecondType>(
712 &args,
713 "to_timestamp_micros",
714 &self.timezone,
715 ),
716 other => {
717 exec_err!(
718 "Unsupported data type {} for function to_timestamp_micros",
719 other
720 )
721 }
722 }
723 }
724
725 fn documentation(&self) -> Option<&Documentation> {
726 self.doc()
727 }
728}
729
730impl ScalarUDFImpl for ToTimestampNanosFunc {
731 fn as_any(&self) -> &dyn Any {
732 self
733 }
734
735 fn name(&self) -> &str {
736 "to_timestamp_nanos"
737 }
738
739 fn signature(&self) -> &Signature {
740 &self.signature
741 }
742
743 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
744 Ok(Timestamp(Nanosecond, self.timezone.clone()))
745 }
746
747 impl_with_updated_config!();
748
749 fn invoke_with_args(
750 &self,
751 args: datafusion_expr::ScalarFunctionArgs,
752 ) -> Result<ColumnarValue> {
753 let datafusion_expr::ScalarFunctionArgs { args, .. } = args;
754
755 if args.is_empty() {
756 return exec_err!(
757 "to_timestamp_nanos function requires 1 or more arguments, got {}",
758 args.len()
759 );
760 }
761
762 if args.len() > 1 {
764 validate_data_types(&args, "to_timestamp")?;
765 }
766
767 match args[0].data_type() {
768 Null
769 | Int8
770 | Int16
771 | Int32
772 | Int64
773 | UInt8
774 | UInt16
775 | UInt32
776 | UInt64
777 | Timestamp(_, _)
778 | Decimal32(_, _)
779 | Decimal64(_, _)
780 | Decimal128(_, _)
781 | Decimal256(_, _) => {
782 args[0].cast_to(&Timestamp(Nanosecond, self.timezone.clone()), None)
783 }
784 Float16 | Float32 | Float64 => args[0]
785 .cast_to(&Int64, None)?
786 .cast_to(&Timestamp(Nanosecond, self.timezone.clone()), None),
787 Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampNanosecondType>(
788 &args,
789 "to_timestamp_nanos",
790 &self.timezone,
791 ),
792 other => {
793 exec_err!(
794 "Unsupported data type {} for function to_timestamp_nanos",
795 other
796 )
797 }
798 }
799 }
800
801 fn documentation(&self) -> Option<&Documentation> {
802 self.doc()
803 }
804}
805
806fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
807 args: &[ColumnarValue],
808 name: &str,
809 timezone: &Option<Arc<str>>,
810) -> Result<ColumnarValue> {
811 let factor = match T::UNIT {
812 Second => 1_000_000_000,
813 Millisecond => 1_000_000,
814 Microsecond => 1_000,
815 Nanosecond => 1,
816 };
817
818 let tz = match timezone.clone() {
819 Some(tz) => Some(tz.parse::<Tz>()?),
820 None => None,
821 };
822
823 match args.len() {
824 1 => handle::<T, _>(
825 args,
826 move |s| string_to_timestamp_nanos_with_timezone(&tz, s).map(|n| n / factor),
827 name,
828 &Timestamp(T::UNIT, timezone.clone()),
829 ),
830 n if n >= 2 => handle_multiple::<T, _, _>(
831 args,
832 move |s, format| {
833 string_to_timestamp_nanos_formatted_with_timezone(&tz, s, format)
834 },
835 |n| n / factor,
836 name,
837 &Timestamp(T::UNIT, timezone.clone()),
838 ),
839 _ => exec_err!("Unsupported 0 argument count for function {name}"),
840 }
841}
842
843#[cfg(test)]
844mod tests {
845 use std::sync::Arc;
846
847 use arrow::array::types::Int64Type;
848 use arrow::array::{
849 Array, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
850 TimestampNanosecondArray, TimestampSecondArray,
851 };
852 use arrow::array::{ArrayRef, Int64Array, StringBuilder};
853 use arrow::datatypes::{Field, TimeUnit};
854 use chrono::{DateTime, FixedOffset, Utc};
855 use datafusion_common::config::ConfigOptions;
856 use datafusion_common::{DataFusionError, ScalarValue, assert_contains};
857 use datafusion_expr::{ScalarFunctionArgs, ScalarFunctionImplementation};
858
859 use super::*;
860
861 fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
862 let timezone: Option<Arc<str>> = Some("UTC".into());
863 to_timestamp_impl::<TimestampNanosecondType>(args, "to_timestamp", &timezone)
864 }
865
866 fn to_timestamp_millis(args: &[ColumnarValue]) -> Result<ColumnarValue> {
868 let timezone: Option<Arc<str>> = Some("UTC".into());
869 to_timestamp_impl::<TimestampMillisecondType>(
870 args,
871 "to_timestamp_millis",
872 &timezone,
873 )
874 }
875
876 fn to_timestamp_micros(args: &[ColumnarValue]) -> Result<ColumnarValue> {
878 let timezone: Option<Arc<str>> = Some("UTC".into());
879 to_timestamp_impl::<TimestampMicrosecondType>(
880 args,
881 "to_timestamp_micros",
882 &timezone,
883 )
884 }
885
886 fn to_timestamp_nanos(args: &[ColumnarValue]) -> Result<ColumnarValue> {
888 let timezone: Option<Arc<str>> = Some("UTC".into());
889 to_timestamp_impl::<TimestampNanosecondType>(
890 args,
891 "to_timestamp_nanos",
892 &timezone,
893 )
894 }
895
896 fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
898 let timezone: Option<Arc<str>> = Some("UTC".into());
899 to_timestamp_impl::<TimestampSecondType>(args, "to_timestamp_seconds", &timezone)
900 }
901
902 fn udfs_and_timeunit() -> Vec<(Box<dyn ScalarUDFImpl>, TimeUnit)> {
903 let udfs: Vec<(Box<dyn ScalarUDFImpl>, TimeUnit)> = vec![
904 (
905 Box::new(ToTimestampFunc::new_with_config(&ConfigOptions::default())),
906 Nanosecond,
907 ),
908 (
909 Box::new(ToTimestampSecondsFunc::new_with_config(
910 &ConfigOptions::default(),
911 )),
912 Second,
913 ),
914 (
915 Box::new(ToTimestampMillisFunc::new_with_config(
916 &ConfigOptions::default(),
917 )),
918 Millisecond,
919 ),
920 (
921 Box::new(ToTimestampMicrosFunc::new_with_config(
922 &ConfigOptions::default(),
923 )),
924 Microsecond,
925 ),
926 (
927 Box::new(ToTimestampNanosFunc::new_with_config(
928 &ConfigOptions::default(),
929 )),
930 Nanosecond,
931 ),
932 ];
933 udfs
934 }
935
936 fn validate_expected_error(
937 options: &mut ConfigOptions,
938 args: ScalarFunctionArgs,
939 expected_err: &str,
940 ) {
941 let udfs = udfs_and_timeunit();
942
943 for (udf, _) in udfs {
944 match udf
945 .with_updated_config(options)
946 .unwrap()
947 .invoke_with_args(args.clone())
948 {
949 Ok(_) => panic!("Expected error but got success"),
950 Err(e) => {
951 assert!(
952 e.to_string().contains(expected_err),
953 "Can not find expected error '{expected_err}'. Actual error '{e}'"
954 );
955 }
956 }
957 }
958 }
959
/// A string column holding one parseable value and one null must come back
/// as a nanosecond timestamp column with the same value/null layout.
#[test]
fn to_timestamp_arrays_and_nulls() -> Result<()> {
    let mut inputs = StringBuilder::with_capacity(2, 1024);
    inputs.append_value("2020-09-08T13:42:29.190855");
    inputs.append_null();

    let mut wanted = TimestampNanosecondArray::builder(2);
    wanted.append_value(1599572549190855000);
    wanted.append_null();
    let wanted = wanted.finish();
    let expected: &dyn Array = &wanted;

    let column = ColumnarValue::Array(Arc::new(inputs.finish()) as ArrayRef);
    let parsed = to_timestamp(&[column])
        .expect("that to_timestamp parsed values without error");

    let ColumnarValue::Array(parsed) = parsed else {
        panic!("Expected a columnar array")
    };
    assert_eq!(parsed.len(), 2);
    assert_eq!(expected, parsed.as_ref());
    Ok(())
}
986
/// Same as the single-argument array test, but with three format columns:
/// row 0 is null everywhere, row 1 carries a value plus the candidate
/// formats `%s`, `%c` and `%+`.
#[test]
fn to_timestamp_with_formats_arrays_and_nulls() -> Result<()> {
    // Builds a two-row Utf8 column: a null followed by `second_row`.
    let column = |second_row: &str| {
        let mut builder = StringBuilder::with_capacity(2, 1024);
        builder.append_null();
        builder.append_value(second_row);
        ColumnarValue::Array(Arc::new(builder.finish()) as ArrayRef)
    };

    let args = [
        column("2020-09-08T13:42:29.19085Z"),
        column("%s"),
        column("%c"),
        column("%+"),
    ];

    let mut wanted = TimestampNanosecondArray::builder(2);
    wanted.append_null();
    wanted.append_value(1599572549190850000);
    let wanted = wanted.finish();
    let expected: &dyn Array = &wanted;

    let parsed = to_timestamp(&args)
        .expect("that to_timestamp with format args parsed values without error");
    let ColumnarValue::Array(parsed) = parsed else {
        panic!("Expected a columnar array")
    };
    assert_eq!(parsed.len(), 2);
    assert_eq!(expected, parsed.as_ref());
    Ok(())
}
1027
/// With the execution time zone set to `-05:00`, a zone-less input string
/// must produce a scalar timestamp tagged with that zone which, rendered at
/// `-05:00`, shows the same wall-clock time as the input.
#[test]
fn to_timestamp_respects_execution_timezone() -> Result<()> {
    let mut options = ConfigOptions::default();
    options.execution.time_zone = Some("-05:00".to_string());

    let time_zone: Option<Arc<str>> =
        options.execution.time_zone.as_deref().map(Arc::from);
    let offset = FixedOffset::west_opt(5 * 3600).unwrap();

    for (udf, time_unit) in udfs_and_timeunit() {
        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some(
                "2020-09-08T13:42:29".to_string(),
            )))],
            arg_fields: vec![Field::new("arg", Utf8, true).into()],
            number_rows: 1,
            return_field: Field::new(
                "f",
                Timestamp(time_unit, Some("-05:00".into())),
                true,
            )
            .into(),
            config_options: Arc::new(options.clone()),
        };

        let output = udf
            .with_updated_config(&options)
            .unwrap()
            .invoke_with_args(args)?;

        // The scalar variant must agree with the unit this UDF is paired with.
        let (raw, tz) = match (time_unit, output) {
            (
                Second,
                ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(v), tz)),
            ) => (v, tz),
            (
                Millisecond,
                ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), tz)),
            ) => (v, tz),
            (
                Microsecond,
                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(v), tz)),
            ) => (v, tz),
            (
                Nanosecond,
                ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), tz)),
            ) => (v, tz),
            _ => panic!("expected scalar timestamp"),
        };
        assert_eq!(tz, time_zone);

        // Multiplier that converts the unit's raw value to nanoseconds.
        let nanos_per_unit = match time_unit {
            Second => 1_000_000_000,
            Millisecond => 1_000_000,
            Microsecond => 1_000,
            Nanosecond => 1,
        };

        let rendered = DateTime::<Utc>::from_timestamp_nanos(raw * nanos_per_unit)
            .with_timezone(&offset)
            .to_string();
        assert_eq!(rendered, "2020-09-08 13:42:29 -05:00");
    }

    Ok(())
}
1137
/// With the execution time zone set to `-05:00`, inputs that carry their own
/// zone/offset and are parsed with an explicit format must produce a scalar
/// timestamp tagged `-05:00` whose rendering at `-05:00` equals the expected
/// string in the table below.
#[test]
fn to_timestamp_formats_respects_execution_timezone() -> Result<()> {
    let mut options = ConfigOptions::default();
    options.execution.time_zone = Some("-05:00".to_string());

    let time_zone: Option<Arc<str>> =
        options.execution.time_zone.as_deref().map(Arc::from);

    let expr_field: Arc<Field> = Arc::new(Field::new("arg", Utf8, true));
    let format_field: Arc<Field> = Arc::new(Field::new("fmt", Utf8, true));
    let offset = FixedOffset::west_opt(5 * 3600).unwrap();

    // (input, chrono format, expected rendering at -05:00)
    let cases = [
        (
            "2020-09-08 09:42:29 -05:00",
            "%Y-%m-%d %H:%M:%S %z",
            Some("2020-09-08 09:42:29 -05:00"),
        ),
        (
            "2020-09-08T13:42:29Z",
            "%+",
            Some("2020-09-08 08:42:29 -05:00"),
        ),
        (
            "2020-09-08 13:42:29 UTC",
            "%Y-%m-%d %H:%M:%S %Z",
            Some("2020-09-08 08:42:29 -05:00"),
        ),
        (
            "+0000 2024-01-01 12:00:00",
            "%z %Y-%m-%d %H:%M:%S",
            Some("2024-01-01 07:00:00 -05:00"),
        ),
        (
            "20200908134229+0100",
            "%Y%m%d%H%M%S%z",
            Some("2020-09-08 07:42:29 -05:00"),
        ),
        (
            "2020-09-08+0230 13:42",
            "%Y-%m-%d%z %H:%M",
            Some("2020-09-08 06:12:00 -05:00"),
        ),
    ];

    let utf8 = |s: &str| ColumnarValue::Scalar(ScalarValue::Utf8(Some(s.to_string())));

    for (udf, time_unit) in udfs_and_timeunit() {
        // Multiplier that converts the unit's raw value to nanoseconds.
        let nanos_per_unit = match time_unit {
            Second => 1_000_000_000,
            Millisecond => 1_000_000,
            Microsecond => 1_000,
            Nanosecond => 1,
        };

        for (value, format, expected_str) in cases {
            let args = ScalarFunctionArgs {
                args: vec![utf8(value), utf8(format)],
                arg_fields: vec![Arc::clone(&expr_field), Arc::clone(&format_field)],
                number_rows: 1,
                return_field: Field::new(
                    "f",
                    Timestamp(time_unit, Some("-05:00".into())),
                    true,
                )
                .into(),
                config_options: Arc::new(options.clone()),
            };

            let output = udf
                .with_updated_config(&options)
                .unwrap()
                .invoke_with_args(args)?;

            // The scalar variant must agree with the unit this UDF is paired with.
            let (raw, tz) = match (time_unit, output) {
                (
                    Second,
                    ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(v), tz)),
                ) => (v, tz),
                (
                    Millisecond,
                    ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
                        Some(v),
                        tz,
                    )),
                ) => (v, tz),
                (
                    Microsecond,
                    ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
                        Some(v),
                        tz,
                    )),
                ) => (v, tz),
                (
                    Nanosecond,
                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                        Some(v),
                        tz,
                    )),
                ) => (v, tz),
                _ => panic!("expected scalar timestamp"),
            };
            assert_eq!(tz, time_zone);

            let rendered = DateTime::<Utc>::from_timestamp_nanos(raw * nanos_per_unit)
                .with_timezone(&offset)
                .to_string();
            assert_eq!(Some(rendered.as_str()), expected_str);
        }
    }

    Ok(())
}
1282
/// An unparseable execution time zone must make every `to_timestamp*` UDF
/// fail with an "Invalid timezone" error for a single-argument call.
#[test]
fn to_timestamp_invalid_execution_timezone_behavior() -> Result<()> {
    let mut options = ConfigOptions::default();
    options.execution.time_zone = Some("Invalid/Timezone".to_string());

    let input = ScalarValue::Utf8(Some("2020-09-08T13:42:29Z".to_string()));
    let args = ScalarFunctionArgs {
        args: vec![ColumnarValue::Scalar(input)],
        arg_fields: vec![Arc::new(Field::new("arg", Utf8, true))],
        number_rows: 1,
        return_field: Arc::new(Field::new("f", Timestamp(Nanosecond, None), true)),
        config_options: Arc::new(options.clone()),
    };

    validate_expected_error(
        &mut options,
        args,
        "Invalid timezone \"Invalid/Timezone\": failed to parse timezone",
    );

    Ok(())
}
1309
/// An unparseable execution time zone must make every `to_timestamp*` UDF
/// fail with an "Invalid timezone" error for each (value, format) pair
/// below, including the final pair whose input has no zone of its own.
#[test]
fn to_timestamp_formats_invalid_execution_timezone_behavior() -> Result<()> {
    let mut options = ConfigOptions::default();
    options.execution.time_zone = Some("Invalid/Timezone".to_string());

    let expr_field: Arc<Field> = Arc::new(Field::new("arg", Utf8, true));
    let format_field: Arc<Field> = Arc::new(Field::new("fmt", Utf8, true));
    let return_field: Arc<Field> =
        Arc::new(Field::new("f", Timestamp(Nanosecond, None), true));

    let expected_err =
        "Invalid timezone \"Invalid/Timezone\": failed to parse timezone";

    let utf8 = |s: &str| ColumnarValue::Scalar(ScalarValue::Utf8(Some(s.to_string())));
    let make_args = |value: &str, format: &str| ScalarFunctionArgs {
        args: vec![utf8(value), utf8(format)],
        arg_fields: vec![Arc::clone(&expr_field), Arc::clone(&format_field)],
        number_rows: 1,
        return_field: Arc::clone(&return_field),
        config_options: Arc::new(options.clone()),
    };

    // (input, chrono format); every pair is expected to hit the same error.
    let cases = [
        ("2020-09-08 09:42:29 -05:00", "%Y-%m-%d %H:%M:%S %z"),
        ("2020-09-08T13:42:29Z", "%+"),
        ("2020-09-08 13:42:29 +0000", "%Y-%m-%d %H:%M:%S %z"),
        ("+0000 2024-01-01 12:00:00", "%z %Y-%m-%d %H:%M:%S"),
        ("20200908134229+0100", "%Y%m%d%H%M%S%z"),
        ("2020-09-08+0230 13:42", "%Y-%m-%d%z %H:%M"),
        ("2020-09-08T13:42:29", "%Y-%m-%dT%H:%M:%S"),
    ];

    for (value, format) in cases {
        validate_expected_error(
            &mut options.clone(),
            make_args(value, format),
            expected_err,
        );
    }

    Ok(())
}
1389
/// Passing an Int64 array straight to the `to_timestamp` test shim must be
/// rejected with an "Unsupported data type Int64" execution error.
#[test]
fn to_timestamp_invalid_input_type() -> Result<()> {
    let mut ints = Int64Array::builder(1);
    ints.append_value(1);
    let column = ColumnarValue::Array(Arc::new(ints.finish()));

    let expected_err =
        "Execution error: Unsupported data type Int64 for function to_timestamp";

    let Err(e) = to_timestamp(&[column]) else {
        panic!("Expected error but got success")
    };
    assert!(
        e.to_string().contains(expected_err),
        "Can not find expected error '{expected_err}'. Actual error '{e}'"
    );
    Ok(())
}
1412
/// Passing Int64 arrays as both the value and the format argument to the
/// `to_timestamp` test shim must be rejected with an
/// "Unsupported data type Int64" execution error.
#[test]
fn to_timestamp_with_formats_invalid_input_type() -> Result<()> {
    let mut builder = Int64Array::builder(1);
    builder.append_value(1);

    // Finish the builder exactly once and share the result: `finish()` resets
    // the builder, so a second call would hand back an empty array and the
    // two argument columns would have different lengths.
    let ints: ArrayRef = Arc::new(builder.finish());
    let int64array = [
        ColumnarValue::Array(Arc::clone(&ints)),
        ColumnarValue::Array(ints),
    ];

    let expected_err =
        "Execution error: Unsupported data type Int64 for function to_timestamp";
    match to_timestamp(&int64array) {
        Ok(_) => panic!("Expected error but got success"),
        Err(e) => {
            assert!(
                e.to_string().contains(expected_err),
                "Can not find expected error '{expected_err}'. Actual error '{e}'"
            );
        }
    }
    Ok(())
}
1438
/// A string that is not a valid timestamp must surface the Arrow parser
/// error, even when the column also contains a null.
#[test]
fn to_timestamp_with_unparsable_data() -> Result<()> {
    let mut rows = StringBuilder::with_capacity(2, 1024);
    rows.append_null();
    rows.append_value("2020-09-08 - 13:42:29.19085Z");
    let column = ColumnarValue::Array(Arc::new(rows.finish()) as ArrayRef);

    let expected_err = "Arrow error: Parser error: Error parsing timestamp from '2020-09-08 - 13:42:29.19085Z': error parsing time";

    let Err(e) = to_timestamp(&[column]) else {
        panic!("Expected error but got success")
    };
    assert!(
        e.to_string().contains(expected_err),
        "Can not find expected error '{expected_err}'. Actual error '{e}'"
    );
    Ok(())
}
1462
/// A timestamp string with a bogus zone suffix (`ZZ`) must surface the Arrow
/// "Invalid timezone" parser error.
#[test]
fn to_timestamp_with_invalid_tz() -> Result<()> {
    let mut rows = StringBuilder::with_capacity(2, 1024);
    rows.append_null();
    rows.append_value("2020-09-08T13:42:29ZZ");
    let column = ColumnarValue::Array(Arc::new(rows.finish()) as ArrayRef);

    let expected_err = "Arrow error: Parser error: Invalid timezone \"ZZ\": failed to parse timezone";

    let Err(e) = to_timestamp(&[column]) else {
        panic!("Expected error but got success")
    };
    assert!(
        e.to_string().contains(expected_err),
        "Can not find expected error '{expected_err}'. Actual error '{e}'"
    );
    Ok(())
}
1486
/// When the value cannot be parsed with the supplied formats, the error must
/// name the value and the `%H:%M:%S` format (the last one supplied).
#[test]
fn to_timestamp_with_no_matching_formats() -> Result<()> {
    // Builds a two-row Utf8 column: a null followed by `second_row`.
    let column = |second_row: &str| {
        let mut builder = StringBuilder::with_capacity(2, 1024);
        builder.append_null();
        builder.append_value(second_row);
        ColumnarValue::Array(Arc::new(builder.finish()) as ArrayRef)
    };

    let args = [
        column("2020-09-08T13:42:29.19085Z"),
        column("%s"),
        column("%c"),
        column("%H:%M:%S"),
    ];

    let expected_err = "Execution error: Error parsing timestamp from '2020-09-08T13:42:29.19085Z' using format '%H:%M:%S': input contains invalid characters";

    let Err(e) = to_timestamp(&args) else {
        panic!("Expected error but got success")
    };
    assert!(
        e.to_string().contains(expected_err),
        "Can not find expected error '{expected_err}'. Actual error '{e}'"
    );
    Ok(())
}
1523
/// Checks `parse_timestamp_formatted` against known
/// (expected nanoseconds, input, chrono format) triples.
#[test]
fn string_to_timestamp_formatted() {
    let cases: [(i64, &str, &str); 7] = [
        (1599572549190855000, "2020-09-08T13:42:29.190855+00:00", "%+"),
        (1599572549190855000, "2020-09-08T13:42:29.190855Z", "%+"),
        (1599572549000000000, "2020-09-08T13:42:29Z", "%+"),
        (1599590549190855000, "2020-09-08T13:42:29.190855-05:00", "%+"),
        (1599590549000000000, "1599590549", "%s"),
        (1599572549000000000, "09-08-2020 13/42/29", "%m-%d-%Y %H/%M/%S"),
        (1642896000000000000, "2022-01-23", "%Y-%m-%d"),
    ];

    for (expected_nanos, input, format) in cases {
        assert_eq!(
            expected_nanos,
            parse_timestamp_formatted(input, format).unwrap()
        );
    }
}
1557
1558 fn parse_timestamp_formatted(s: &str, format: &str) -> Result<i64, DataFusionError> {
1559 let result = string_to_timestamp_nanos_formatted_with_timezone(
1560 &Some("UTC".parse()?),
1561 s,
1562 format,
1563 );
1564 if let Err(e) = &result {
1565 eprintln!("Error parsing timestamp '{s}' using format '{format}': {e:?}");
1566 }
1567 result
1568 }
1569
/// Each (input, format) pair below must be rejected by
/// `string_to_datetime_formatted` with exactly the expected error text.
#[test]
fn string_to_timestamp_formatted_invalid() {
    const RFC2822_SAMPLE: &str = "Wed, 18 Feb 2015 23:16:09 GMT";

    // (input, chrono format, parser failure reason)
    let cases = [
        ("", "%Y%m%d %H%M%S", "premature end of input"),
        ("SS", "%c", "premature end of input"),
        (RFC2822_SAMPLE, "", "trailing input"),
        (RFC2822_SAMPLE, "%XX", "input contains invalid characters"),
        (
            RFC2822_SAMPLE,
            "%Y%m%d %H%M%S",
            "input contains invalid characters",
        ),
    ];

    cases.into_iter().for_each(|(s, f, ctx)| {
        let actual = string_to_datetime_formatted(&Utc, s, f)
            .unwrap_err()
            .strip_backtrace();
        let expected = format!(
            "Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}"
        );
        assert_eq!(actual, expected)
    });
}
1599
/// Each (input, format) pair below must be rejected by
/// `string_to_datetime_formatted` with exactly the expected error text.
// NOTE(review): this currently exercises the same cases through the same
// function as `string_to_timestamp_formatted_invalid` — confirm whether a
// different entry point was intended here.
#[test]
fn string_to_timestamp_invalid_arguments() {
    const RFC2822_SAMPLE: &str = "Wed, 18 Feb 2015 23:16:09 GMT";

    // (input, chrono format, parser failure reason)
    let cases = [
        ("", "%Y%m%d %H%M%S", "premature end of input"),
        ("SS", "%c", "premature end of input"),
        (RFC2822_SAMPLE, "", "trailing input"),
        (RFC2822_SAMPLE, "%XX", "input contains invalid characters"),
        (
            RFC2822_SAMPLE,
            "%Y%m%d %H%M%S",
            "input contains invalid characters",
        ),
    ];

    cases.into_iter().for_each(|(s, f, ctx)| {
        let actual = string_to_datetime_formatted(&Utc, s, f)
            .unwrap_err()
            .strip_backtrace();
        let expected = format!(
            "Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}"
        );
        assert_eq!(actual, expected)
    });
}
1629
/// With a default configuration, every `to_timestamp*` UDF must declare and
/// produce `Timestamp(_, None)`, both for inputs that are already tagged
/// with a time zone and for untagged timestamp / Int64 inputs.
#[test]
fn test_no_tz() {
    /// Runs every UDF over every input column and asserts that the declared
    /// return type and the produced array are both `Timestamp(_, None)`.
    fn assert_no_tz(
        udfs: &[Box<dyn ScalarUDFImpl>],
        inputs: &[ColumnarValue],
        number_rows: usize,
    ) {
        for udf in udfs {
            for input in inputs {
                let input_type = input.data_type();
                let rt = udf.return_type(&[input_type.clone()]).unwrap();
                assert!(matches!(rt, Timestamp(_, None)));

                let args = ScalarFunctionArgs {
                    args: vec![input.clone()],
                    arg_fields: vec![Field::new("arg", input_type, true).into()],
                    number_rows,
                    return_field: Field::new("f", rt, true).into(),
                    config_options: Arc::new(ConfigOptions::default()),
                };
                let res = udf
                    .invoke_with_args(args)
                    .expect("that to_timestamp parsed values without error");
                let ColumnarValue::Array(array) = res else {
                    panic!("Expected a columnar array")
                };
                assert!(matches!(array.data_type(), Timestamp(_, None)));
            }
        }
    }

    // One UDF per variant. The list previously contained the seconds variant
    // twice and never exercised the microseconds variant.
    let config = ConfigOptions::default();
    let udfs: Vec<Box<dyn ScalarUDFImpl>> = vec![
        Box::new(ToTimestampFunc::new_with_config(&config)),
        Box::new(ToTimestampSecondsFunc::new_with_config(&config)),
        Box::new(ToTimestampMillisFunc::new_with_config(&config)),
        Box::new(ToTimestampMicrosFunc::new_with_config(&config)),
        Box::new(ToTimestampNanosFunc::new_with_config(&config)),
    ];

    // Inputs that already carry a "UTC" time zone.
    let nanos_timestamps = Arc::new(
        TimestampNanosecondArray::from(vec![1599572549190850000]).with_timezone("UTC"),
    ) as ArrayRef;
    let millis_timestamps = Arc::new(
        TimestampMillisecondArray::from(vec![1599572549190]).with_timezone("UTC"),
    ) as ArrayRef;
    let micros_timestamps = Arc::new(
        TimestampMicrosecondArray::from(vec![1599572549190850]).with_timezone("UTC"),
    ) as ArrayRef;
    let sec_timestamps =
        Arc::new(TimestampSecondArray::from(vec![1599572549]).with_timezone("UTC"))
            as ArrayRef;

    let tagged_inputs = [
        ColumnarValue::Array(nanos_timestamps),
        ColumnarValue::Array(millis_timestamps),
        ColumnarValue::Array(micros_timestamps),
        ColumnarValue::Array(sec_timestamps),
    ];
    // NOTE(review): `number_rows` (4 here, 5 below) is carried over unchanged
    // from the previous version of this test; every input column built here
    // holds a single row — confirm whether 1 was intended.
    assert_no_tz(&udfs, &tagged_inputs, 4);

    // Inputs without a time zone, plus a plain Int64 column.
    let nanos_timestamps =
        Arc::new(TimestampNanosecondArray::from(vec![1599572549190850000])) as ArrayRef;
    let millis_timestamps =
        Arc::new(TimestampMillisecondArray::from(vec![1599572549190])) as ArrayRef;
    let micros_timestamps =
        Arc::new(TimestampMicrosecondArray::from(vec![1599572549190850])) as ArrayRef;
    let sec_timestamps =
        Arc::new(TimestampSecondArray::from(vec![1599572549])) as ArrayRef;
    let i64_timestamps = Arc::new(Int64Array::from(vec![1599572549])) as ArrayRef;

    let untagged_inputs = [
        ColumnarValue::Array(nanos_timestamps),
        ColumnarValue::Array(millis_timestamps),
        ColumnarValue::Array(micros_timestamps),
        ColumnarValue::Array(sec_timestamps),
        ColumnarValue::Array(i64_timestamps),
    ];
    assert_no_tz(&udfs, &untagged_inputs, 5);
}
1748
/// For each `to_timestamp*` test shim: format arguments given as `Utf8` or
/// `LargeUtf8` scalars must parse the sample value into the expected
/// single-row, time-zone-less timestamp array, while `Int32` scalar formats
/// or an `Int64` array format must be rejected as unsupported data types.
#[test]
fn test_to_timestamp_arg_validation() {
    let mut date_string_builder = StringBuilder::with_capacity(2, 1024);
    date_string_builder.append_value("2020-09-08T13:42:29.19085Z");
    let data = date_string_builder.finish();

    let funcs: Vec<(ScalarFunctionImplementation, TimeUnit)> = vec![
        (Arc::new(to_timestamp), Nanosecond),
        (Arc::new(to_timestamp_micros), Microsecond),
        (Arc::new(to_timestamp_millis), Millisecond),
        (Arc::new(to_timestamp_nanos), Nanosecond),
        (Arc::new(to_timestamp_seconds), Second),
    ];

    // The sample value expressed in each supported unit.
    let nanos_expected = TimestampNanosecondArray::from(vec![1599572549190850000]);
    let millis_expected = TimestampMillisecondArray::from(vec![1599572549190]);
    let micros_expected = TimestampMicrosecondArray::from(vec![1599572549190850]);
    let sec_expected = TimestampSecondArray::from(vec![1599572549]);

    // Shared check for the two successful cases: a one-row array without a
    // time zone that equals the expected array for `time_unit`.
    let assert_parsed = |parsed: ColumnarValue, time_unit: TimeUnit| {
        let ColumnarValue::Array(parsed_array) = parsed else {
            panic!("Expected a columnar array")
        };
        assert_eq!(parsed_array.len(), 1);
        assert!(matches!(parsed_array.data_type(), Timestamp(_, None)));

        let expected: &dyn Array = match time_unit {
            Nanosecond => &nanos_expected,
            Millisecond => &millis_expected,
            Microsecond => &micros_expected,
            Second => &sec_expected,
        };
        assert_eq!(expected, parsed_array.as_ref());
    };

    let value_column = || ColumnarValue::Array(Arc::new(data.clone()) as ArrayRef);
    let utf8 = |s: &str| ColumnarValue::Scalar(ScalarValue::Utf8(Some(s.to_string())));
    let large_utf8 =
        |s: &str| ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s.to_string())));
    let int32 = |v: i32| ColumnarValue::Scalar(ScalarValue::Int32(Some(v)));

    for (func, time_unit) in funcs {
        // Utf8 scalar formats.
        let string_array = [value_column(), utf8("%s"), utf8("%c"), utf8("%+")];
        let parsed_timestamps = func(&string_array)
            .expect("that to_timestamp with format args parsed values without error");
        assert_parsed(parsed_timestamps, time_unit);

        // LargeUtf8 scalar formats.
        let string_array = [
            value_column(),
            large_utf8("%s"),
            large_utf8("%c"),
            large_utf8("%+"),
        ];
        let parsed_timestamps = func(&string_array)
            .expect("that to_timestamp with format args parsed values without error");
        assert_parsed(parsed_timestamps, time_unit);

        // Int32 scalar formats are rejected.
        let string_array = [value_column(), int32(1), int32(2), int32(3)];
        let expected = "Unsupported data type Int32 for function".to_string();
        let actual = func(&string_array).unwrap_err().to_string();
        assert_contains!(actual, expected);

        // An Int64 array as the format column is rejected.
        let string_array = [
            value_column(),
            ColumnarValue::Array(Arc::new(PrimitiveArray::<Int64Type>::new(
                vec![1i64].into(),
                None,
            )) as ArrayRef),
        ];
        let expected = "Unsupported data type".to_string();
        let actual = func(&string_array).unwrap_err().to_string();
        assert_contains!(actual, expected);
    }
}
1866
/// Checks `decimal_to_nanoseconds` against known
/// (value, scale, expected nanoseconds) triples covering negative, zero and
/// positive scales.
#[test]
fn test_decimal_to_nanoseconds_negative_scale() {
    let cases = [
        (5, -2, 500_000_000_000),
        (10, -1, 100_000_000_000),
        (5, 0, 5_000_000_000),
        (1500, 3, 1_500_000_000),
    ];

    for (value, scale, expected) in cases {
        let nanos = decimal_to_nanoseconds(value, scale);
        assert_eq!(nanos, expected);
    }
}
1885}