datafusion_functions_nested/
range.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for range and gen_series functions.
19
20use crate::utils::make_scalar_function;
21use arrow::buffer::OffsetBuffer;
22use arrow::datatypes::TimeUnit;
23use arrow::datatypes::{DataType, Field, IntervalUnit::MonthDayNano};
24use arrow::{
25    array::{
26        builder::{Date32Builder, TimestampNanosecondBuilder},
27        temporal_conversions::as_datetime_with_timezone,
28        timezone::Tz,
29        types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType},
30        Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullBufferBuilder,
31    },
32    compute::cast,
33};
34use datafusion_common::internal_err;
35use datafusion_common::{
36    cast::{
37        as_date32_array, as_int64_array, as_interval_mdn_array,
38        as_timestamp_nanosecond_array,
39    },
40    types::{
41        logical_date, logical_int64, logical_interval_mdn, logical_string, NativeType,
42    },
43    ScalarValue,
44};
45use datafusion_common::{
46    exec_datafusion_err, exec_err, not_impl_datafusion_err, utils::take_function_args,
47    Result,
48};
49use datafusion_expr::{
50    Coercion, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature,
51    TypeSignatureClass, Volatility,
52};
53use datafusion_macros::user_doc;
54use std::any::Any;
55use std::cmp::Ordering;
56use std::iter::from_fn;
57use std::str::FromStr;
58use std::sync::Arc;
59
// `range`: the variant constructed by `Range::new`, i.e. the one that excludes
// the upper bound.
// NOTE(review): `make_udf_expr_and_func!` is defined outside this file; it
// presumably generates the `range(start, stop, step)` expression helper and the
// `range_udf()` accessor from these arguments — confirm against the macro.
make_udf_expr_and_func!(
    Range,
    range,
    start stop step,
    "create a list of values in the range between start and stop",
    range_udf,
    Range::new
);

// `generate_series`: backed by the same `Range` implementation, but constructed
// through `Range::generate_series`, which sets `include_upper_bound` to `true`.
// NOTE(review): `GenSeries` is only a name handed to the macro here; how the
// macro uses it is not visible in this file.
make_udf_expr_and_func!(
    GenSeries,
    gen_series,
    start stop step,
    "create a list of values in the range between start and stop, include upper bound",
    gen_series_udf,
    Range::generate_series
);
77
// User-facing documentation for `range`. `Range::documentation` returns
// `RangeDoc {}.doc()` when the upper bound is excluded; the text itself is
// supplied through the `user_doc` attribute below.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
    syntax_example = "range(stop)
range(start, stop[, step])",
    sql_example = r#"```sql
> select range(2, 10, 3);
+-----------------------------------+
| range(Int64(2),Int64(10),Int64(3))|
+-----------------------------------+
| [2, 5, 8]                         |
+-----------------------------------+

> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
+--------------------------------------------------------------------------+
| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH)          |
+--------------------------------------------------------------------------+
| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
+--------------------------------------------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the range (not included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct RangeDoc {}
112
// User-facing documentation for `generate_series`. `Range::documentation`
// returns `GenerateSeriesDoc {}.doc()` when the upper bound is included; the
// text itself is supplied through the `user_doc` attribute below.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Similar to the range function, but it includes the upper bound.",
    syntax_example = "generate_series(stop)
generate_series(start, stop[, step])",
    sql_example = r#"```sql
> select generate_series(1,3);
+------------------------------------+
| generate_series(Int64(1),Int64(3)) |
+------------------------------------+
| [1, 2, 3]                          |
+------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the series (included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct GenerateSeriesDoc {}
140
/// Scalar UDF implementation shared by `range` and `generate_series`.
///
/// Both functions use the same signature and the same generation code; the
/// `include_upper_bound` flag selects which of the two an instance represents
/// (it drives the reported name, the documentation and the bound handling).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Range {
    /// Accepted argument type combinations, as built by `Range::defined_signature`.
    signature: Signature,
    /// `false` for range, `true` for generate_series
    include_upper_bound: bool,
}
147
148impl Default for Range {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154impl Range {
155    fn defined_signature() -> Signature {
156        // We natively only support i64 in our implementation; so ensure we cast other integer
157        // types to it.
158        let integer = Coercion::new_implicit(
159            TypeSignatureClass::Native(logical_int64()),
160            vec![TypeSignatureClass::Integer],
161            NativeType::Int64,
162        );
163        // We natively only support mdn in our implementation; so ensure we cast other interval
164        // types to it.
165        let interval = Coercion::new_implicit(
166            TypeSignatureClass::Native(logical_interval_mdn()),
167            vec![TypeSignatureClass::Interval],
168            NativeType::Interval(MonthDayNano),
169        );
170        // Ideally we'd limit to only Date32 & Timestamp(Nanoseconds) as those are the implementations
171        // we have but that is difficult to do with this current API; we'll cast later on to
172        // handle such types.
173        let date = Coercion::new_implicit(
174            TypeSignatureClass::Native(logical_date()),
175            vec![TypeSignatureClass::Native(logical_string())],
176            NativeType::Date,
177        );
178        let timestamp = Coercion::new_exact(TypeSignatureClass::Timestamp);
179        Signature::one_of(
180            vec![
181                // Integer ranges
182                // Stop
183                TypeSignature::Coercible(vec![integer.clone()]),
184                // Start & stop
185                TypeSignature::Coercible(vec![integer.clone(), integer.clone()]),
186                // Start, stop & step
187                TypeSignature::Coercible(vec![integer.clone(), integer.clone(), integer]),
188                // Date range
189                TypeSignature::Coercible(vec![date.clone(), date, interval.clone()]),
190                // Timestamp range
191                TypeSignature::Coercible(vec![timestamp.clone(), timestamp, interval]),
192            ],
193            Volatility::Immutable,
194        )
195    }
196
197    /// Generate `range()` function which excludes upper bound.
198    pub fn new() -> Self {
199        Self {
200            signature: Self::defined_signature(),
201            include_upper_bound: false,
202        }
203    }
204
205    /// Generate `generate_series()` function which includes upper bound.
206    fn generate_series() -> Self {
207        Self {
208            signature: Self::defined_signature(),
209            include_upper_bound: true,
210        }
211    }
212}
213
214impl ScalarUDFImpl for Range {
215    fn as_any(&self) -> &dyn Any {
216        self
217    }
218
219    fn name(&self) -> &str {
220        if self.include_upper_bound {
221            "generate_series"
222        } else {
223            "range"
224        }
225    }
226
227    fn signature(&self) -> &Signature {
228        &self.signature
229    }
230
231    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
232        if arg_types.iter().any(|t| t.is_null()) {
233            return Ok(DataType::Null);
234        }
235
236        match (&arg_types[0], arg_types.get(1)) {
237            // In implementation we downcast to Date32 so ensure reflect that here
238            (_, Some(DataType::Date64)) | (DataType::Date64, _) => Ok(DataType::List(
239                Arc::new(Field::new_list_field(DataType::Date32, true)),
240            )),
241            // Ensure we preserve timezone
242            (DataType::Timestamp(_, tz), _) => {
243                Ok(DataType::List(Arc::new(Field::new_list_field(
244                    DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
245                    true,
246                ))))
247            }
248            _ => Ok(DataType::List(Arc::new(Field::new_list_field(
249                arg_types[0].clone(),
250                true,
251            )))),
252        }
253    }
254
255    fn invoke_with_args(
256        &self,
257        args: datafusion_expr::ScalarFunctionArgs,
258    ) -> Result<ColumnarValue> {
259        let args = &args.args;
260
261        if args.iter().any(|arg| arg.data_type().is_null()) {
262            return Ok(ColumnarValue::Scalar(ScalarValue::Null));
263        }
264        match args[0].data_type() {
265            DataType::Int64 => {
266                make_scalar_function(|args| self.gen_range_inner(args))(args)
267            }
268            DataType::Date32 | DataType::Date64 => {
269                make_scalar_function(|args| self.gen_range_date(args))(args)
270            }
271            DataType::Timestamp(_, _) => {
272                make_scalar_function(|args| self.gen_range_timestamp(args))(args)
273            }
274            dt => {
275                internal_err!(
276                    "Signature failed to guard unknown input type for {}: {dt}",
277                    self.name()
278                )
279            }
280        }
281    }
282
283    fn documentation(&self) -> Option<&Documentation> {
284        if self.include_upper_bound {
285            GenerateSeriesDoc {}.doc()
286        } else {
287            RangeDoc {}.doc()
288        }
289    }
290}
291
292impl Range {
293    /// Generates an array of integers from start to stop with a given step.
294    ///
295    /// This function takes 1 to 3 ArrayRefs as arguments, representing start, stop, and step values.
296    /// It returns a `Result<ArrayRef>` representing the resulting ListArray after the operation.
297    ///
298    /// # Arguments
299    ///
300    /// * `args` - An array of 1 to 3 ArrayRefs representing start, stop, and step(step value can not be zero.) values.
301    ///
302    /// # Examples
303    ///
304    /// gen_range(3) => [0, 1, 2]
305    /// gen_range(1, 4) => [1, 2, 3]
306    /// gen_range(1, 7, 2) => [1, 3, 5]
307    fn gen_range_inner(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
308        let (start_array, stop_array, step_array) = match args {
309            [stop_array] => (None, as_int64_array(stop_array)?, None),
310            [start_array, stop_array] => (
311                Some(as_int64_array(start_array)?),
312                as_int64_array(stop_array)?,
313                None,
314            ),
315            [start_array, stop_array, step_array] => (
316                Some(as_int64_array(start_array)?),
317                as_int64_array(stop_array)?,
318                Some(as_int64_array(step_array)?),
319            ),
320            _ => return internal_err!("{} expects 1 to 3 arguments", self.name()),
321        };
322
323        let mut values = vec![];
324        let mut offsets = vec![0];
325        let mut valid = NullBufferBuilder::new(stop_array.len());
326        for (idx, stop) in stop_array.iter().enumerate() {
327            match retrieve_range_args(start_array, stop, step_array, idx) {
328                Some((_, _, 0)) => {
329                    return exec_err!(
330                        "step can't be 0 for function {}(start [, stop, step])",
331                        self.name()
332                    );
333                }
334                Some((start, stop, step)) => {
335                    // Below, we utilize `usize` to represent steps.
336                    // On 32-bit targets, the absolute value of `i64` may fail to fit into `usize`.
337                    let step_abs =
338                        usize::try_from(step.unsigned_abs()).map_err(|_| {
339                            not_impl_datafusion_err!("step {} can't fit into usize", step)
340                        })?;
341                    values.extend(
342                        gen_range_iter(start, stop, step < 0, self.include_upper_bound)
343                            .step_by(step_abs),
344                    );
345                    offsets.push(values.len() as i32);
346                    valid.append_non_null();
347                }
348                // If any of the arguments is NULL, append a NULL value to the result.
349                None => {
350                    offsets.push(values.len() as i32);
351                    valid.append_null();
352                }
353            };
354        }
355        let arr = Arc::new(ListArray::try_new(
356            Arc::new(Field::new_list_field(DataType::Int64, true)),
357            OffsetBuffer::new(offsets.into()),
358            Arc::new(Int64Array::from(values)),
359            valid.finish(),
360        )?);
361        Ok(arr)
362    }
363
364    fn gen_range_date(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
365        let [start, stop, step] = take_function_args(self.name(), args)?;
366        let step = as_interval_mdn_array(step)?;
367
368        // Signature can only guarantee we get a date type, not specifically
369        // date32 so handle potential cast from date64 here.
370        let start = cast(start, &DataType::Date32)?;
371        let start = as_date32_array(&start)?;
372        let stop = cast(stop, &DataType::Date32)?;
373        let stop = as_date32_array(&stop)?;
374
375        // values are date32s
376        let values_builder = Date32Builder::new();
377        let mut list_builder = ListBuilder::new(values_builder);
378
379        for idx in 0..stop.len() {
380            if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
381                list_builder.append_null();
382                continue;
383            }
384
385            let start = start.value(idx);
386            let stop = stop.value(idx);
387            let step = step.value(idx);
388
389            let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
390            if months == 0 && days == 0 {
391                return exec_err!("Cannot generate date range less than 1 day.");
392            }
393
394            let stop = if !self.include_upper_bound {
395                Date32Type::subtract_month_day_nano(stop, step)
396            } else {
397                stop
398            };
399
400            let neg = months < 0 || days < 0;
401            let mut new_date = start;
402
403            let values = from_fn(|| {
404                if (neg && new_date < stop) || (!neg && new_date > stop) {
405                    None
406                } else {
407                    let current_date = new_date;
408                    new_date = Date32Type::add_month_day_nano(new_date, step);
409                    Some(Some(current_date))
410                }
411            });
412
413            list_builder.append_value(values);
414        }
415
416        let arr = Arc::new(list_builder.finish());
417
418        Ok(arr)
419    }
420
421    fn gen_range_timestamp(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
422        let [start, stop, step] = take_function_args(self.name(), args)?;
423        let step = as_interval_mdn_array(step)?;
424
425        // Signature can only guarantee we get a timestamp type, not specifically
426        // timestamp(ns) so handle potential cast from other timestamps here.
427        fn cast_to_ns(arr: &ArrayRef) -> Result<ArrayRef> {
428            match arr.data_type() {
429                DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok(Arc::clone(arr)),
430                DataType::Timestamp(_, tz) => Ok(cast(
431                    arr,
432                    &DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
433                )?),
434                _ => unreachable!(),
435            }
436        }
437        let start = cast_to_ns(start)?;
438        let start = as_timestamp_nanosecond_array(&start)?;
439        let stop = cast_to_ns(stop)?;
440        let stop = as_timestamp_nanosecond_array(&stop)?;
441
442        let start_tz = parse_tz(&start.timezone())?;
443        let stop_tz = parse_tz(&stop.timezone())?;
444
445        // values are timestamps
446        let values_builder = start
447            .timezone()
448            .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
449                TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
450            });
451        let mut list_builder = ListBuilder::new(values_builder);
452
453        for idx in 0..start.len() {
454            if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
455                list_builder.append_null();
456                continue;
457            }
458
459            let start = start.value(idx);
460            let stop = stop.value(idx);
461            let step = step.value(idx);
462
463            let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
464            if months == 0 && days == 0 && ns == 0 {
465                return exec_err!("Interval argument to {} must not be 0", self.name());
466            }
467
468            let neg = TimestampNanosecondType::add_month_day_nano(start, step, start_tz)
469                .ok_or(exec_datafusion_err!(
470                    "Cannot generate timestamp range where start + step overflows"
471                ))?
472                .cmp(&start)
473                == Ordering::Less;
474
475            let stop_dt =
476                as_datetime_with_timezone::<TimestampNanosecondType>(stop, stop_tz)
477                    .ok_or(exec_datafusion_err!(
478                        "Cannot generate timestamp for stop: {}: {:?}",
479                        stop,
480                        stop_tz
481                    ))?;
482
483            let mut current = start;
484            let mut current_dt =
485                as_datetime_with_timezone::<TimestampNanosecondType>(current, start_tz)
486                    .ok_or(exec_datafusion_err!(
487                    "Cannot generate timestamp for start: {}: {:?}",
488                    current,
489                    start_tz
490                ))?;
491
492            let values = from_fn(|| {
493                let generate_series_should_end = self.include_upper_bound
494                    && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt));
495                let range_should_end = !self.include_upper_bound
496                    && ((neg && current_dt <= stop_dt)
497                        || (!neg && current_dt >= stop_dt));
498                if generate_series_should_end || range_should_end {
499                    return None;
500                }
501
502                let prev_current = current;
503
504                if let Some(ts) =
505                    TimestampNanosecondType::add_month_day_nano(current, step, start_tz)
506                {
507                    current = ts;
508                    current_dt = as_datetime_with_timezone::<TimestampNanosecondType>(
509                        current, start_tz,
510                    )?;
511
512                    Some(Some(prev_current))
513                } else {
514                    // we failed to parse the timestamp here so terminate the series
515                    None
516                }
517            });
518
519            list_builder.append_value(values);
520        }
521
522        let arr = Arc::new(list_builder.finish());
523
524        Ok(arr)
525    }
526}
527
528/// Get the (start, stop, step) args for the range and generate_series function.
529/// If any of the arguments is NULL, returns None.
530fn retrieve_range_args(
531    start_array: Option<&Int64Array>,
532    stop: Option<i64>,
533    step_array: Option<&Int64Array>,
534    idx: usize,
535) -> Option<(i64, i64, i64)> {
536    // Default start value is 0 if not provided
537    let start =
538        start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
539    let stop = stop?;
540    // Default step value is 1 if not provided
541    let step =
542        step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
543    Some((start, stop, step))
544}
545
/// Builds a unit-step iterator over the `i64` values between `start` and `stop`.
///
/// * `decreasing` - walk downwards from `start` instead of upwards.
/// * `include_upper` - whether `stop` itself is part of the sequence.
///
/// The iterator is empty when `stop` lies on the wrong side of `start` for
/// the requested direction.
fn gen_range_iter(
    start: i64,
    stop: i64,
    decreasing: bool,
    include_upper: bool,
) -> Box<dyn Iterator<Item = i64>> {
    if !decreasing {
        // Ascending: a closed or half-open range, depending on inclusivity.
        if include_upper {
            return Box::new(start..=stop);
        }
        return Box::new(start..stop);
    }

    if include_upper {
        // Descending and inclusive: walk `stop..=start` backwards.
        return Box::new((stop..=start).rev());
    }

    // Descending and exclusive: the lowest value produced is `stop + 1`.
    match stop.checked_add(1) {
        Some(lowest) => Box::new((lowest..=start).rev()),
        // `stop` is `i64::MAX`; nothing is strictly greater, so there is
        // nothing to produce.
        None => Box::new(std::iter::empty()),
    }
}
574
575fn parse_tz(tz: &Option<&str>) -> Result<Tz> {
576    let tz = tz.as_ref().map_or_else(|| "+00", |s| s);
577
578    Tz::from_str(tz)
579        .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
580}