datafusion_functions_nested/
range.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for range and gen_series functions.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{
22    builder::{Date32Builder, TimestampNanosecondBuilder},
23    temporal_conversions::as_datetime_with_timezone,
24    timezone::Tz,
25    types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType as TSNT},
26    Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullArray, NullBufferBuilder,
27    TimestampNanosecondArray,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{
31    DataType, DataType::*, Field, IntervalUnit::MonthDayNano, TimeUnit::Nanosecond,
32};
33use datafusion_common::cast::{
34    as_date32_array, as_int64_array, as_interval_mdn_array, as_timestamp_nanosecond_array,
35};
36use datafusion_common::{
37    exec_datafusion_err, exec_err, internal_err, not_impl_datafusion_err,
38    utils::take_function_args, Result,
39};
40use datafusion_expr::{
41    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
42};
43use datafusion_macros::user_doc;
44use itertools::Itertools;
45use std::any::Any;
46use std::cmp::Ordering;
47use std::iter::from_fn;
48use std::str::FromStr;
49use std::sync::Arc;
50
51make_udf_expr_and_func!(
52    Range,
53    range,
54    start stop step,
55    "create a list of values in the range between start and stop",
56    range_udf
57);
58
59#[user_doc(
60    doc_section(label = "Array Functions"),
61    description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
62    syntax_example = "range(start, stop, step)",
63    sql_example = r#"```sql
64> select range(2, 10, 3);
65+-----------------------------------+
66| range(Int64(2),Int64(10),Int64(3))|
67+-----------------------------------+
68| [2, 5, 8]                         |
69+-----------------------------------+
70
71> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
72+--------------------------------------------------------------+
73| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH) |
74+--------------------------------------------------------------+
75| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
76+--------------------------------------------------------------+
77```"#,
78    argument(
79        name = "start",
80        description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
81    ),
82    argument(
83        name = "end",
84        description = "End of the range (not included). Type must be the same as start."
85    ),
86    argument(
87        name = "step",
88        description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
89    )
90)]
91#[derive(Debug)]
92pub struct Range {
93    signature: Signature,
94    aliases: Vec<String>,
95}
96
97impl Default for Range {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102impl Range {
103    pub fn new() -> Self {
104        Self {
105            signature: Signature::user_defined(Volatility::Immutable),
106            aliases: vec![],
107        }
108    }
109}
110impl ScalarUDFImpl for Range {
111    fn as_any(&self) -> &dyn Any {
112        self
113    }
114    fn name(&self) -> &str {
115        "range"
116    }
117
118    fn signature(&self) -> &Signature {
119        &self.signature
120    }
121
122    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
123        arg_types
124            .iter()
125            .map(|arg_type| match arg_type {
126                Null => Ok(Null),
127                Int8 => Ok(Int64),
128                Int16 => Ok(Int64),
129                Int32 => Ok(Int64),
130                Int64 => Ok(Int64),
131                UInt8 => Ok(Int64),
132                UInt16 => Ok(Int64),
133                UInt32 => Ok(Int64),
134                UInt64 => Ok(Int64),
135                Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
136                Date32 => Ok(Date32),
137                Date64 => Ok(Date32),
138                Utf8 => Ok(Date32),
139                LargeUtf8 => Ok(Date32),
140                Utf8View => Ok(Date32),
141                Interval(_) => Ok(Interval(MonthDayNano)),
142                _ => exec_err!("Unsupported DataType"),
143            })
144            .try_collect()
145    }
146
147    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
148        if arg_types.iter().any(|t| t.is_null()) {
149            Ok(Null)
150        } else {
151            Ok(List(Arc::new(Field::new_list_field(
152                arg_types[0].clone(),
153                true,
154            ))))
155        }
156    }
157
158    fn invoke_with_args(
159        &self,
160        args: datafusion_expr::ScalarFunctionArgs,
161    ) -> Result<ColumnarValue> {
162        let args = &args.args;
163
164        if args.iter().any(|arg| arg.data_type().is_null()) {
165            return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
166        }
167        match args[0].data_type() {
168            Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args),
169            Date32 => make_scalar_function(|args| gen_range_date(args, false))(args),
170            Timestamp(_, _) => {
171                make_scalar_function(|args| gen_range_timestamp(args, false))(args)
172            }
173            dt => {
174                exec_err!("unsupported type for RANGE. Expected Int64, Date32 or Timestamp, got: {dt}")
175            }
176        }
177    }
178
179    fn aliases(&self) -> &[String] {
180        &self.aliases
181    }
182
183    fn documentation(&self) -> Option<&Documentation> {
184        self.doc()
185    }
186}
187
188make_udf_expr_and_func!(
189    GenSeries,
190    gen_series,
191    start stop step,
192    "create a list of values in the range between start and stop, include upper bound",
193    gen_series_udf
194);
195
196#[user_doc(
197    doc_section(label = "Array Functions"),
198    description = "Similar to the range function, but it includes the upper bound.",
199    syntax_example = "generate_series(start, stop, step)",
200    sql_example = r#"```sql
201> select generate_series(1,3);
202+------------------------------------+
203| generate_series(Int64(1),Int64(3)) |
204+------------------------------------+
205| [1, 2, 3]                          |
206+------------------------------------+
207```"#,
208    argument(
209        name = "start",
210        description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
211    ),
212    argument(
213        name = "end",
214        description = "End of the series (included). Type must be the same as start."
215    ),
216    argument(
217        name = "step",
218        description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
219    )
220)]
221#[derive(Debug)]
222pub(super) struct GenSeries {
223    signature: Signature,
224    aliases: Vec<String>,
225}
226impl GenSeries {
227    pub fn new() -> Self {
228        Self {
229            signature: Signature::user_defined(Volatility::Immutable),
230            aliases: vec![],
231        }
232    }
233}
234impl ScalarUDFImpl for GenSeries {
235    fn as_any(&self) -> &dyn Any {
236        self
237    }
238    fn name(&self) -> &str {
239        "generate_series"
240    }
241
242    fn signature(&self) -> &Signature {
243        &self.signature
244    }
245
246    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
247        arg_types
248            .iter()
249            .map(|arg_type| match arg_type {
250                Null => Ok(Null),
251                Int8 => Ok(Int64),
252                Int16 => Ok(Int64),
253                Int32 => Ok(Int64),
254                Int64 => Ok(Int64),
255                UInt8 => Ok(Int64),
256                UInt16 => Ok(Int64),
257                UInt32 => Ok(Int64),
258                UInt64 => Ok(Int64),
259                Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
260                Date32 => Ok(Date32),
261                Date64 => Ok(Date32),
262                Utf8 => Ok(Date32),
263                LargeUtf8 => Ok(Date32),
264                Utf8View => Ok(Date32),
265                Interval(_) => Ok(Interval(MonthDayNano)),
266                _ => exec_err!("Unsupported DataType"),
267            })
268            .try_collect()
269    }
270
271    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
272        if arg_types.iter().any(|t| t.is_null()) {
273            Ok(Null)
274        } else {
275            Ok(List(Arc::new(Field::new_list_field(
276                arg_types[0].clone(),
277                true,
278            ))))
279        }
280    }
281
282    fn invoke_with_args(
283        &self,
284        args: datafusion_expr::ScalarFunctionArgs,
285    ) -> Result<ColumnarValue> {
286        let args = &args.args;
287
288        if args.iter().any(|arg| arg.data_type().is_null()) {
289            return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
290        }
291        match args[0].data_type() {
292            Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args),
293            Date32 => make_scalar_function(|args| gen_range_date(args, true))(args),
294            Timestamp(_, _) => {
295                make_scalar_function(|args| gen_range_timestamp(args, true))(args)
296            }
297            dt => {
298                exec_err!(
299                    "unsupported type for GENERATE_SERIES. Expected Int64, Date32 or Timestamp, got: {}",
300                    dt
301                )
302            }
303        }
304    }
305
306    fn aliases(&self) -> &[String] {
307        &self.aliases
308    }
309
310    fn documentation(&self) -> Option<&Documentation> {
311        self.doc()
312    }
313}
314
315/// Generates an array of integers from start to stop with a given step.
316///
317/// This function takes 1 to 3 ArrayRefs as arguments, representing start, stop, and step values.
318/// It returns a `Result<ArrayRef>` representing the resulting ListArray after the operation.
319///
320/// # Arguments
321///
322/// * `args` - An array of 1 to 3 ArrayRefs representing start, stop, and step(step value can not be zero.) values.
323///
324/// # Examples
325///
326/// gen_range(3) => [0, 1, 2]
327/// gen_range(1, 4) => [1, 2, 3]
328/// gen_range(1, 7, 2) => [1, 3, 5]
329pub(super) fn gen_range_inner(
330    args: &[ArrayRef],
331    include_upper: bool,
332) -> Result<ArrayRef> {
333    let (start_array, stop_array, step_array) = match args.len() {
334        1 => (None, as_int64_array(&args[0])?, None),
335        2 => (
336            Some(as_int64_array(&args[0])?),
337            as_int64_array(&args[1])?,
338            None,
339        ),
340        3 => (
341            Some(as_int64_array(&args[0])?),
342            as_int64_array(&args[1])?,
343            Some(as_int64_array(&args[2])?),
344        ),
345        _ => return exec_err!("gen_range expects 1 to 3 arguments"),
346    };
347
348    let mut values = vec![];
349    let mut offsets = vec![0];
350    let mut valid = NullBufferBuilder::new(stop_array.len());
351    for (idx, stop) in stop_array.iter().enumerate() {
352        match retrieve_range_args(start_array, stop, step_array, idx) {
353            Some((_, _, 0)) => {
354                return exec_err!(
355                    "step can't be 0 for function {}(start [, stop, step])",
356                    if include_upper {
357                        "generate_series"
358                    } else {
359                        "range"
360                    }
361                );
362            }
363            Some((start, stop, step)) => {
364                // Below, we utilize `usize` to represent steps.
365                // On 32-bit targets, the absolute value of `i64` may fail to fit into `usize`.
366                let step_abs = usize::try_from(step.unsigned_abs()).map_err(|_| {
367                    not_impl_datafusion_err!("step {} can't fit into usize", step)
368                })?;
369                values.extend(
370                    gen_range_iter(start, stop, step < 0, include_upper)
371                        .step_by(step_abs),
372                );
373                offsets.push(values.len() as i32);
374                valid.append_non_null();
375            }
376            // If any of the arguments is NULL, append a NULL value to the result.
377            None => {
378                offsets.push(values.len() as i32);
379                valid.append_null();
380            }
381        };
382    }
383    let arr = Arc::new(ListArray::try_new(
384        Arc::new(Field::new_list_field(Int64, true)),
385        OffsetBuffer::new(offsets.into()),
386        Arc::new(Int64Array::from(values)),
387        valid.finish(),
388    )?);
389    Ok(arr)
390}
391
392/// Get the (start, stop, step) args for the range and generate_series function.
393/// If any of the arguments is NULL, returns None.
394fn retrieve_range_args(
395    start_array: Option<&Int64Array>,
396    stop: Option<i64>,
397    step_array: Option<&Int64Array>,
398    idx: usize,
399) -> Option<(i64, i64, i64)> {
400    // Default start value is 0 if not provided
401    let start =
402        start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
403    let stop = stop?;
404    // Default step value is 1 if not provided
405    let step =
406        step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
407    Some((start, stop, step))
408}
409
/// Yields the i64 values between `start` and `stop` one unit apart.
///
/// The walk goes downwards from `start` when `decreasing` is set, upwards
/// otherwise; `stop` itself is yielded only when `include_upper` is true.
/// Callers thin the sequence with `step_by` to apply the real step.
fn gen_range_iter(
    start: i64,
    stop: i64,
    decreasing: bool,
    include_upper: bool,
) -> Box<dyn Iterator<Item = i64>> {
    if !decreasing {
        return if include_upper {
            Box::new(start..=stop)
        } else {
            Box::new(start..stop)
        };
    }
    if include_upper {
        return Box::new((stop..=start).rev());
    }
    // Exclusive lower bound: begin one above `stop`. When `stop` is i64::MAX no
    // value can lie strictly above it, so the sequence is empty.
    match stop.checked_add(1) {
        Some(lowest) => Box::new((lowest..=start).rev()),
        None => Box::new(std::iter::empty()),
    }
}
438
439fn gen_range_date(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
440    let [start, stop, step] = take_function_args("range", args)?;
441
442    let (start_array, stop_array, step_array) = (
443        Some(as_date32_array(start)?),
444        as_date32_array(stop)?,
445        Some(as_interval_mdn_array(step)?),
446    );
447
448    // values are date32s
449    let values_builder = Date32Builder::new();
450    let mut list_builder = ListBuilder::new(values_builder);
451
452    for idx in 0..stop_array.len() {
453        if stop_array.is_null(idx) {
454            list_builder.append_null();
455            continue;
456        }
457        let mut stop = stop_array.value(idx);
458
459        let start = if let Some(start_array_values) = start_array {
460            if start_array_values.is_null(idx) {
461                list_builder.append_null();
462                continue;
463            }
464            start_array_values.value(idx)
465        } else {
466            list_builder.append_null();
467            continue;
468        };
469
470        let step = if let Some(step) = step_array {
471            if step.is_null(idx) {
472                list_builder.append_null();
473                continue;
474            }
475            step.value(idx)
476        } else {
477            list_builder.append_null();
478            continue;
479        };
480
481        let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
482
483        if months == 0 && days == 0 {
484            return exec_err!("Cannot generate date range less than 1 day.");
485        }
486
487        let neg = months < 0 || days < 0;
488        if !include_upper_bound {
489            stop = Date32Type::subtract_month_day_nano(stop, step);
490        }
491        let mut new_date = start;
492
493        let values = from_fn(|| {
494            if (neg && new_date < stop) || (!neg && new_date > stop) {
495                None
496            } else {
497                let current_date = new_date;
498                new_date = Date32Type::add_month_day_nano(new_date, step);
499                Some(Some(current_date))
500            }
501        });
502
503        list_builder.append_value(values);
504    }
505
506    let arr = Arc::new(list_builder.finish());
507
508    Ok(arr)
509}
510
511fn gen_range_timestamp(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
512    let func_name = if include_upper_bound {
513        "GENERATE_SERIES"
514    } else {
515        "RANGE"
516    };
517    let [start, stop, step] = take_function_args(func_name, args)?;
518
519    // coerce_types fn should coerce all types to Timestamp(Nanosecond, tz)
520    let (start_arr, start_tz_opt) = cast_timestamp_arg(start, include_upper_bound)?;
521    let (stop_arr, stop_tz_opt) = cast_timestamp_arg(stop, include_upper_bound)?;
522    let step_arr = as_interval_mdn_array(step)?;
523    let start_tz = parse_tz(start_tz_opt)?;
524    let stop_tz = parse_tz(stop_tz_opt)?;
525
526    // values are timestamps
527    let values_builder = start_tz_opt
528        .clone()
529        .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
530            TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
531        });
532    let mut list_builder = ListBuilder::new(values_builder);
533
534    for idx in 0..start_arr.len() {
535        if start_arr.is_null(idx) || stop_arr.is_null(idx) || step_arr.is_null(idx) {
536            list_builder.append_null();
537            continue;
538        }
539
540        let start = start_arr.value(idx);
541        let stop = stop_arr.value(idx);
542        let step = step_arr.value(idx);
543
544        let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
545        if months == 0 && days == 0 && ns == 0 {
546            return exec_err!(
547                "Interval argument to {} must not be 0",
548                if include_upper_bound {
549                    "GENERATE_SERIES"
550                } else {
551                    "RANGE"
552                }
553            );
554        }
555
556        let neg = TSNT::add_month_day_nano(start, step, start_tz)
557            .ok_or(exec_datafusion_err!(
558                "Cannot generate timestamp range where start + step overflows"
559            ))?
560            .cmp(&start)
561            == Ordering::Less;
562
563        let stop_dt = as_datetime_with_timezone::<TSNT>(stop, stop_tz).ok_or(
564            exec_datafusion_err!(
565                "Cannot generate timestamp for stop: {}: {:?}",
566                stop,
567                stop_tz
568            ),
569        )?;
570
571        let mut current = start;
572        let mut current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz).ok_or(
573            exec_datafusion_err!(
574                "Cannot generate timestamp for start: {}: {:?}",
575                current,
576                start_tz
577            ),
578        )?;
579
580        let values = from_fn(|| {
581            if (include_upper_bound
582                && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt)))
583                || (!include_upper_bound
584                    && ((neg && current_dt <= stop_dt)
585                        || (!neg && current_dt >= stop_dt)))
586            {
587                return None;
588            }
589
590            let prev_current = current;
591
592            if let Some(ts) = TSNT::add_month_day_nano(current, step, start_tz) {
593                current = ts;
594                current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz)?;
595
596                Some(Some(prev_current))
597            } else {
598                // we failed to parse the timestamp here so terminate the series
599                None
600            }
601        });
602
603        list_builder.append_value(values);
604    }
605
606    let arr = Arc::new(list_builder.finish());
607
608    Ok(arr)
609}
610
611fn cast_timestamp_arg(
612    arg: &ArrayRef,
613    include_upper: bool,
614) -> Result<(&TimestampNanosecondArray, &Option<Arc<str>>)> {
615    match arg.data_type() {
616        Timestamp(Nanosecond, tz_opt) => {
617            Ok((as_timestamp_nanosecond_array(arg)?, tz_opt))
618        }
619        _ => {
620            internal_err!(
621                "Unexpected argument type for {} : {}",
622                if include_upper {
623                    "GENERATE_SERIES"
624                } else {
625                    "RANGE"
626                },
627                arg.data_type()
628            )
629        }
630    }
631}
632
633fn parse_tz(tz: &Option<Arc<str>>) -> Result<Tz> {
634    let tz = tz.as_ref().map_or_else(|| "+00", |s| s);
635
636    Tz::from_str(tz)
637        .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
638}