1use std::any::Any;
19use std::str::FromStr;
20use std::sync::Arc;
21
22use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
23use arrow::compute::kernels::cast_utils::IntervalUnit;
24use arrow::compute::{binary, date_part, DatePart};
25use arrow::datatypes::DataType::{
26 Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
27};
28use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
29use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
30use datafusion_common::types::{logical_date, NativeType};
31
32use datafusion_common::{
33 cast::{
34 as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array,
35 as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
36 as_timestamp_microsecond_array, as_timestamp_millisecond_array,
37 as_timestamp_nanosecond_array, as_timestamp_second_array,
38 },
39 exec_err, internal_err, not_impl_err,
40 types::logical_string,
41 utils::take_function_args,
42 Result, ScalarValue,
43};
44use datafusion_expr::{
45 ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
46 TypeSignature, Volatility,
47};
48use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
49use datafusion_macros::user_doc;
50
// Scalar UDF implementing `date_part` (alias `datepart`, SQL `extract`).
// The user-facing documentation below is consumed by the `user_doc` macro;
// keep the list of parts in sync with the names matched in `invoke_with_args`.
// NOTE(review): the description says the result is "an integer", but `epoch`
// produces Float64 (see `return_field_from_args`), and the `qtr` alias that
// `invoke_with_args` accepts is not listed — confirm whether the docs should
// mention these.
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Returns the specified part of the date as an integer.",
    syntax_example = "date_part(part, expression)",
    alternative_syntax = "extract(field FROM source)",
    argument(
        name = "part",
        description = r#"Part of the date to return. The following date parts are supported:

    - year
    - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
    - month
    - week (week of the year)
    - day (day of the month)
    - hour
    - minute
    - second
    - millisecond
    - microsecond
    - nanosecond
    - dow (day of the week where Sunday is 0)
    - doy (day of the year)
    - epoch (seconds since Unix epoch)
    - isodow (day of the week where Monday is 0)
"#
    ),
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DatePartFunc {
    /// Accepted argument types: a string `part` followed by a timestamp,
    /// date, time, interval or duration value (built in `DatePartFunc::new`).
    signature: Signature,
    /// Alternative names this function can be invoked by.
    aliases: Vec<String>,
}
87
88impl Default for DatePartFunc {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl DatePartFunc {
95 pub fn new() -> Self {
96 Self {
97 signature: Signature::one_of(
98 vec![
99 TypeSignature::Coercible(vec![
100 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
101 Coercion::new_implicit(
102 TypeSignatureClass::Timestamp,
103 vec![TypeSignatureClass::Native(logical_string())],
105 NativeType::Timestamp(Nanosecond, None),
106 ),
107 ]),
108 TypeSignature::Coercible(vec![
109 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
110 Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
111 ]),
112 TypeSignature::Coercible(vec![
113 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
114 Coercion::new_exact(TypeSignatureClass::Time),
115 ]),
116 TypeSignature::Coercible(vec![
117 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
118 Coercion::new_exact(TypeSignatureClass::Interval),
119 ]),
120 TypeSignature::Coercible(vec![
121 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
122 Coercion::new_exact(TypeSignatureClass::Duration),
123 ]),
124 ],
125 Volatility::Immutable,
126 ),
127 aliases: vec![String::from("datepart")],
128 }
129 }
130}
131
132impl ScalarUDFImpl for DatePartFunc {
133 fn as_any(&self) -> &dyn Any {
134 self
135 }
136
137 fn name(&self) -> &str {
138 "date_part"
139 }
140
141 fn signature(&self) -> &Signature {
142 &self.signature
143 }
144
145 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
146 internal_err!("return_field_from_args should be called instead")
147 }
148
149 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
150 let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
151
152 field
153 .and_then(|sv| {
154 sv.try_as_str()
155 .flatten()
156 .filter(|s| !s.is_empty())
157 .map(|part| {
158 if is_epoch(part) {
159 Field::new(self.name(), DataType::Float64, true)
160 } else {
161 Field::new(self.name(), DataType::Int32, true)
162 }
163 })
164 })
165 .map(Arc::new)
166 .map_or_else(
167 || exec_err!("{} requires non-empty constant string", self.name()),
168 Ok,
169 )
170 }
171
172 fn invoke_with_args(
173 &self,
174 args: datafusion_expr::ScalarFunctionArgs,
175 ) -> Result<ColumnarValue> {
176 let args = args.args;
177 let [part, array] = take_function_args(self.name(), args)?;
178
179 let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
180 v
181 } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
182 v
183 } else {
184 return exec_err!(
185 "First argument of `DATE_PART` must be non-null scalar Utf8"
186 );
187 };
188
189 let is_scalar = matches!(array, ColumnarValue::Scalar(_));
190
191 let array = match array {
192 ColumnarValue::Array(array) => Arc::clone(&array),
193 ColumnarValue::Scalar(scalar) => scalar.to_array()?,
194 };
195
196 let part_trim = part_normalization(&part);
197
198 let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
201 match interval_unit {
202 IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
203 IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
204 IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
205 IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
206 IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
207 IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
208 IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
209 IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
210 IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
211 IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
212 _ => return exec_err!("Date part '{part}' not supported"),
214 }
215 } else {
216 match part_trim.to_lowercase().as_str() {
218 "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
219 "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
220 "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
221 "isodow" => date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?,
222 "epoch" => epoch(array.as_ref())?,
223 _ => return exec_err!("Date part '{part}' not supported"),
224 }
225 };
226
227 Ok(if is_scalar {
228 ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
229 } else {
230 ColumnarValue::Array(arr)
231 })
232 }
233
234 fn aliases(&self) -> &[String] {
235 &self.aliases
236 }
237
238 fn documentation(&self) -> Option<&Documentation> {
239 self.doc()
240 }
241}
242
243fn is_epoch(part: &str) -> bool {
244 let part = part_normalization(part);
245 matches!(part.to_lowercase().as_str(), "epoch")
246}
247
/// Strips one leading and one trailing quote character (`'` or `"`) from `part`.
///
/// Both ends must carry a quote for anything to be removed, but they need not
/// be the same quote character. Otherwise `part` is returned unchanged.
fn part_normalization(part: &str) -> &str {
    const QUOTES: &[char] = &['\'', '"'];
    match part
        .strip_prefix(QUOTES)
        .and_then(|rest| rest.strip_suffix(QUOTES))
    {
        Some(unquoted) => unquoted,
        None => part,
    }
}
254
255fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
259 if unit == Nanosecond {
262 return not_impl_err!("Date part {unit:?} not supported");
263 }
264
265 let conversion_factor = match unit {
266 Second => 1_000_000_000,
267 Millisecond => 1_000_000,
268 Microsecond => 1_000,
269 Nanosecond => 1,
270 };
271
272 let second_factor = match unit {
273 Second => 1,
274 Millisecond => 1_000,
275 Microsecond => 1_000_000,
276 Nanosecond => 1_000_000_000,
277 };
278
279 let secs = date_part(array, DatePart::Second)?;
280 let secs = as_int32_array(secs.as_ref())?;
282 let subsecs = date_part(array, DatePart::Nanosecond)?;
283 let subsecs = as_int32_array(subsecs.as_ref())?;
284
285 if subsecs.null_count() == 0 {
287 let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
288 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
289 })?;
290 Ok(Arc::new(r))
291 } else {
292 let r: Int32Array = secs
295 .iter()
296 .zip(subsecs)
297 .map(|(secs, subsecs)| {
298 secs.map(|secs| {
299 let subsecs = subsecs.unwrap_or(0);
300 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
301 })
302 })
303 .collect();
304 Ok(Arc::new(r))
305 }
306}
307
308fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
314 let sf = match unit {
315 Second => 1_f64,
316 Millisecond => 1_000_f64,
317 Microsecond => 1_000_000_f64,
318 Nanosecond => 1_000_000_000_f64,
319 };
320 let secs = date_part(array, DatePart::Second)?;
321 let secs = as_int32_array(secs.as_ref())?;
323 let subsecs = date_part(array, DatePart::Nanosecond)?;
324 let subsecs = as_int32_array(subsecs.as_ref())?;
325
326 if subsecs.null_count() == 0 {
328 let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
329 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
330 })?;
331 Ok(Arc::new(r))
332 } else {
333 let r: Float64Array = secs
336 .iter()
337 .zip(subsecs)
338 .map(|(secs, subsecs)| {
339 secs.map(|secs| {
340 let subsecs = subsecs.unwrap_or(0);
341 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
342 * sf
343 })
344 })
345 .collect();
346 Ok(Arc::new(r))
347 }
348}
349
350fn epoch(array: &dyn Array) -> Result<ArrayRef> {
351 const SECONDS_IN_A_DAY: f64 = 86400_f64;
352
353 let f: Float64Array = match array.data_type() {
354 Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
355 Timestamp(Millisecond, _) => {
356 as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
357 }
358 Timestamp(Microsecond, _) => {
359 as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
360 }
361 Timestamp(Nanosecond, _) => {
362 as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
363 }
364 Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
365 Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
366 Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
367 Time32(Millisecond) => {
368 as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
369 }
370 Time64(Microsecond) => {
371 as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
372 }
373 Time64(Nanosecond) => {
374 as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
375 }
376 Interval(_) | Duration(_) => return seconds(array, Second),
377 d => return exec_err!("Cannot convert {d:?} to epoch"),
378 };
379 Ok(Arc::new(f))
380}