Skip to main content

datafusion_functions_nested/
range.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for range and gen_series functions.
19
20use crate::utils::make_scalar_function;
21use arrow::buffer::OffsetBuffer;
22use arrow::datatypes::TimeUnit;
23use arrow::datatypes::{DataType, Field, IntervalUnit::MonthDayNano};
24use arrow::{
25    array::{
26        Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullBufferBuilder,
27        builder::{Date32Builder, TimestampNanosecondBuilder},
28        temporal_conversions::as_datetime_with_timezone,
29        timezone::Tz,
30        types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType},
31    },
32    compute::cast,
33};
34use datafusion_common::internal_err;
35use datafusion_common::{
36    Result, exec_datafusion_err, exec_err, utils::take_function_args,
37};
38use datafusion_common::{
39    ScalarValue,
40    cast::{
41        as_date32_array, as_int64_array, as_interval_mdn_array,
42        as_timestamp_nanosecond_array,
43    },
44    types::{
45        NativeType, logical_date, logical_int64, logical_interval_mdn, logical_string,
46    },
47};
48use datafusion_expr::{
49    Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
50    TypeSignature, TypeSignatureClass, Volatility,
51};
52use datafusion_macros::user_doc;
53use std::cmp::Ordering;
54use std::iter::from_fn;
55use std::str::FromStr;
56use std::sync::Arc;
57
// Declares the `range` entry points through the crate's `make_udf_expr_and_func!`
// macro (defined outside this file). The arguments name the UDF type (`Range`),
// the expression helper (`range`) with its `start stop step` parameters, the help
// text, the UDF accessor (`range_udf`) and the constructor (`Range::new`).
// NOTE(review): exact items generated depend on the macro definition — confirm there.
make_udf_expr_and_func!(
    Range,
    range,
    start stop step,
    "create a list of values in the range between start and stop",
    range_udf,
    Range::new
);
66
// Declares the `gen_series` entry points through the same macro. The constructor
// passed here is `Range::generate_series`, i.e. a `Range` with
// `include_upper_bound` set to `true`.
// NOTE(review): exact items generated depend on the macro definition — confirm there.
make_udf_expr_and_func!(
    GenSeries,
    gen_series,
    start stop step,
    "create a list of values in the range between start and stop, include upper bound",
    gen_series_udf,
    Range::generate_series
);
75
// Documentation carrier for the `range` function. The `user_doc` attribute holds
// the user-facing description, syntax, SQL example and argument docs;
// `Range::documentation` returns `RangeDoc {}.doc()` when the upper bound is
// excluded.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
    syntax_example = "range(stop)
range(start, stop[, step])",
    sql_example = r#"```sql
> select range(2, 10, 3);
+-----------------------------------+
| range(Int64(2),Int64(10),Int64(3))|
+-----------------------------------+
| [2, 5, 8]                         |
+-----------------------------------+

> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
+--------------------------------------------------------------------------+
| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH)          |
+--------------------------------------------------------------------------+
| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
+--------------------------------------------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the range (not included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct RangeDoc {}
110
// Documentation carrier for the `generate_series` function. The `user_doc`
// attribute holds the user-facing description, syntax, SQL example and argument
// docs; `Range::documentation` returns `GenerateSeriesDoc {}.doc()` when the
// upper bound is included.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Similar to the range function, but it includes the upper bound.",
    syntax_example = "generate_series(stop)
generate_series(start, stop[, step])",
    sql_example = r#"```sql
> select generate_series(1,3);
+------------------------------------+
| generate_series(Int64(1),Int64(3)) |
+------------------------------------+
| [1, 2, 3]                          |
+------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the series (included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct GenerateSeriesDoc {}
138
/// Scalar UDF implementation backing both `range` and `generate_series`.
///
/// The two functions share one signature and one implementation; they differ
/// only in whether the upper bound is part of the produced list, which is
/// recorded in `include_upper_bound`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Range {
    /// Accepted argument type combinations (see `Range::defined_signature`).
    signature: Signature,
    /// `false` for range, `true` for generate_series
    include_upper_bound: bool,
}
145
146impl Default for Range {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl Range {
153    fn defined_signature() -> Signature {
154        // We natively only support i64 in our implementation; so ensure we cast other integer
155        // types to it.
156        let integer = Coercion::new_implicit(
157            TypeSignatureClass::Native(logical_int64()),
158            vec![TypeSignatureClass::Integer],
159            NativeType::Int64,
160        );
161        // We natively only support mdn in our implementation; so ensure we cast other interval
162        // types to it.
163        let interval = Coercion::new_implicit(
164            TypeSignatureClass::Native(logical_interval_mdn()),
165            vec![TypeSignatureClass::Interval],
166            NativeType::Interval(MonthDayNano),
167        );
168        // Ideally we'd limit to only Date32 & Timestamp(Nanoseconds) as those are the implementations
169        // we have but that is difficult to do with this current API; we'll cast later on to
170        // handle such types.
171        let date = Coercion::new_implicit(
172            TypeSignatureClass::Native(logical_date()),
173            vec![TypeSignatureClass::Native(logical_string())],
174            NativeType::Date,
175        );
176        let timestamp = Coercion::new_exact(TypeSignatureClass::Timestamp);
177        Signature::one_of(
178            vec![
179                // Integer ranges
180                // Stop
181                TypeSignature::Coercible(vec![integer.clone()]),
182                // Start & stop
183                TypeSignature::Coercible(vec![integer.clone(), integer.clone()]),
184                // Start, stop & step
185                TypeSignature::Coercible(vec![integer.clone(), integer.clone(), integer]),
186                // Date range
187                TypeSignature::Coercible(vec![date.clone(), date, interval.clone()]),
188                // Timestamp range
189                TypeSignature::Coercible(vec![timestamp.clone(), timestamp, interval]),
190            ],
191            Volatility::Immutable,
192        )
193    }
194
195    /// Generate `range()` function which excludes upper bound.
196    pub fn new() -> Self {
197        Self {
198            signature: Self::defined_signature(),
199            include_upper_bound: false,
200        }
201    }
202
203    /// Generate `generate_series()` function which includes upper bound.
204    fn generate_series() -> Self {
205        Self {
206            signature: Self::defined_signature(),
207            include_upper_bound: true,
208        }
209    }
210}
211
212impl ScalarUDFImpl for Range {
213    fn name(&self) -> &str {
214        if self.include_upper_bound {
215            "generate_series"
216        } else {
217            "range"
218        }
219    }
220
221    fn signature(&self) -> &Signature {
222        &self.signature
223    }
224
225    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
226        if arg_types.iter().any(|t| t.is_null()) {
227            return Ok(DataType::Null);
228        }
229
230        match (&arg_types[0], arg_types.get(1)) {
231            // In implementation we downcast to Date32 so ensure reflect that here
232            (_, Some(DataType::Date64)) | (DataType::Date64, _) => Ok(DataType::List(
233                Arc::new(Field::new_list_field(DataType::Date32, true)),
234            )),
235            // Ensure we preserve timezone
236            (DataType::Timestamp(_, tz), _) => {
237                Ok(DataType::List(Arc::new(Field::new_list_field(
238                    DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
239                    true,
240                ))))
241            }
242            _ => Ok(DataType::List(Arc::new(Field::new_list_field(
243                arg_types[0].clone(),
244                true,
245            )))),
246        }
247    }
248
249    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
250        let args = &args.args;
251
252        if args.iter().any(|arg| arg.data_type().is_null()) {
253            return Ok(ColumnarValue::Scalar(ScalarValue::Null));
254        }
255        match args[0].data_type() {
256            DataType::Int64 => {
257                make_scalar_function(|args| self.gen_range_inner(args))(args)
258            }
259            DataType::Date32 | DataType::Date64 => {
260                make_scalar_function(|args| self.gen_range_date(args))(args)
261            }
262            DataType::Timestamp(_, _) => {
263                make_scalar_function(|args| self.gen_range_timestamp(args))(args)
264            }
265            dt => {
266                internal_err!(
267                    "Signature failed to guard unknown input type for {}: {dt}",
268                    self.name()
269                )
270            }
271        }
272    }
273
274    fn documentation(&self) -> Option<&Documentation> {
275        if self.include_upper_bound {
276            GenerateSeriesDoc {}.doc()
277        } else {
278            RangeDoc {}.doc()
279        }
280    }
281}
282
283impl Range {
284    /// Generates an array of integers from start to stop with a given step.
285    ///
286    /// This function takes 1 to 3 ArrayRefs as arguments, representing start, stop, and step values.
287    /// It returns a `Result<ArrayRef>` representing the resulting ListArray after the operation.
288    ///
289    /// # Arguments
290    ///
291    /// * `args` - An array of 1 to 3 ArrayRefs representing start, stop, and step (step value can not be zero) values.
292    ///
293    /// # Examples
294    ///
295    /// gen_range(3) => [0, 1, 2]
296    /// gen_range(1, 4) => [1, 2, 3]
297    /// gen_range(1, 7, 2) => [1, 3, 5]
298    fn gen_range_inner(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
299        let (start_array, stop_array, step_array) = match args {
300            [stop_array] => (None, as_int64_array(stop_array)?, None),
301            [start_array, stop_array] => (
302                Some(as_int64_array(start_array)?),
303                as_int64_array(stop_array)?,
304                None,
305            ),
306            [start_array, stop_array, step_array] => (
307                Some(as_int64_array(start_array)?),
308                as_int64_array(stop_array)?,
309                Some(as_int64_array(step_array)?),
310            ),
311            _ => return internal_err!("{} expects 1 to 3 arguments", self.name()),
312        };
313
314        let mut values = vec![];
315        let mut offsets = vec![0];
316        let mut valid = NullBufferBuilder::new(stop_array.len());
317        for (idx, stop) in stop_array.iter().enumerate() {
318            match retrieve_range_args(start_array, stop, step_array, idx) {
319                Some((_, _, 0)) => {
320                    return exec_err!(
321                        "step can't be 0 for function {}(start [, stop, step])",
322                        self.name()
323                    );
324                }
325                Some((start, stop, step)) => {
326                    generate_range_values(
327                        start,
328                        stop,
329                        step,
330                        self.include_upper_bound,
331                        &mut values,
332                    )?;
333                    offsets.push(values.len() as i32);
334                    valid.append_non_null();
335                }
336                // If any of the arguments is NULL, append a NULL value to the result.
337                None => {
338                    offsets.push(values.len() as i32);
339                    valid.append_null();
340                }
341            };
342        }
343        let arr = Arc::new(ListArray::try_new(
344            Arc::new(Field::new_list_field(DataType::Int64, true)),
345            OffsetBuffer::new(offsets.into()),
346            Arc::new(Int64Array::from(values)),
347            valid.finish(),
348        )?);
349        Ok(arr)
350    }
351
352    fn gen_range_date(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
353        let [start, stop, step] = take_function_args(self.name(), args)?;
354        let step = as_interval_mdn_array(step)?;
355
356        // Signature can only guarantee we get a date type, not specifically
357        // date32 so handle potential cast from date64 here.
358        let start = cast(start, &DataType::Date32)?;
359        let start = as_date32_array(&start)?;
360        let stop = cast(stop, &DataType::Date32)?;
361        let stop = as_date32_array(&stop)?;
362
363        // values are date32s
364        let values_builder = Date32Builder::new();
365        let mut list_builder = ListBuilder::new(values_builder);
366
367        for idx in 0..stop.len() {
368            if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
369                list_builder.append_null();
370                continue;
371            }
372
373            let start = start.value(idx);
374            let stop = stop.value(idx);
375            let step = step.value(idx);
376
377            let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
378            if months == 0 && days == 0 {
379                return exec_err!("Cannot generate date range less than 1 day.");
380            }
381
382            let stop = if !self.include_upper_bound {
383                Date32Type::subtract_month_day_nano_opt(stop, step).ok_or_else(|| {
384                    exec_datafusion_err!(
385                        "Cannot generate date range where stop {} - {step:?}) overflows",
386                        date32_to_string(stop)
387                    )
388                })?
389            } else {
390                stop
391            };
392
393            let neg = months < 0 || days < 0;
394            let mut new_date = Some(start);
395
396            let values = from_fn(|| {
397                let Some(current_date) = new_date else {
398                    return None; // previous overflow
399                };
400                if (neg && current_date < stop) || (!neg && current_date > stop) {
401                    None
402                } else {
403                    new_date = Date32Type::add_month_day_nano_opt(current_date, step);
404                    Some(Some(current_date))
405                }
406            });
407
408            list_builder.append_value(values);
409        }
410
411        let arr = Arc::new(list_builder.finish());
412
413        Ok(arr)
414    }
415
416    fn gen_range_timestamp(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
417        let [start, stop, step] = take_function_args(self.name(), args)?;
418        let step = as_interval_mdn_array(step)?;
419
420        // Signature can only guarantee we get a timestamp type, not specifically
421        // timestamp(ns) so handle potential cast from other timestamps here.
422        fn cast_to_ns(arr: &ArrayRef) -> Result<ArrayRef> {
423            match arr.data_type() {
424                DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok(Arc::clone(arr)),
425                DataType::Timestamp(_, tz) => Ok(cast(
426                    arr,
427                    &DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
428                )?),
429                _ => unreachable!(),
430            }
431        }
432        let start = cast_to_ns(start)?;
433        let start = as_timestamp_nanosecond_array(&start)?;
434        let stop = cast_to_ns(stop)?;
435        let stop = as_timestamp_nanosecond_array(&stop)?;
436
437        let start_tz = parse_tz(&start.timezone())?;
438        let stop_tz = parse_tz(&stop.timezone())?;
439
440        // values are timestamps
441        let values_builder = start
442            .timezone()
443            .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
444                TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
445            });
446        let mut list_builder = ListBuilder::new(values_builder);
447
448        for idx in 0..start.len() {
449            if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
450                list_builder.append_null();
451                continue;
452            }
453
454            let start = start.value(idx);
455            let stop = stop.value(idx);
456            let step = step.value(idx);
457
458            let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
459            if months == 0 && days == 0 && ns == 0 {
460                return exec_err!("Interval argument to {} must not be 0", self.name());
461            }
462
463            let neg = TimestampNanosecondType::add_month_day_nano(start, step, start_tz)
464                .ok_or(exec_datafusion_err!(
465                    "Cannot generate timestamp range where start + step overflows"
466                ))?
467                .cmp(&start)
468                == Ordering::Less;
469
470            let stop_dt =
471                as_datetime_with_timezone::<TimestampNanosecondType>(stop, stop_tz)
472                    .ok_or(exec_datafusion_err!(
473                        "Cannot generate timestamp for stop: {}: {:?}",
474                        stop,
475                        stop_tz
476                    ))?;
477
478            let mut current = start;
479            let mut current_dt =
480                as_datetime_with_timezone::<TimestampNanosecondType>(current, start_tz)
481                    .ok_or(exec_datafusion_err!(
482                    "Cannot generate timestamp for start: {}: {:?}",
483                    current,
484                    start_tz
485                ))?;
486
487            let values = from_fn(|| {
488                let generate_series_should_end = self.include_upper_bound
489                    && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt));
490                let range_should_end = !self.include_upper_bound
491                    && ((neg && current_dt <= stop_dt)
492                        || (!neg && current_dt >= stop_dt));
493                if generate_series_should_end || range_should_end {
494                    return None;
495                }
496
497                let prev_current = current;
498
499                if let Some(ts) =
500                    TimestampNanosecondType::add_month_day_nano(current, step, start_tz)
501                {
502                    current = ts;
503                    current_dt = as_datetime_with_timezone::<TimestampNanosecondType>(
504                        current, start_tz,
505                    )?;
506
507                    Some(Some(prev_current))
508                } else {
509                    // we failed to parse the timestamp here so terminate the series
510                    None
511                }
512            });
513
514            list_builder.append_value(values);
515        }
516
517        let arr = Arc::new(list_builder.finish());
518
519        Ok(arr)
520    }
521}
522
523/// Get the (start, stop, step) args for the range and generate_series function.
524/// If any of the arguments is NULL, returns None.
525fn retrieve_range_args(
526    start_array: Option<&Int64Array>,
527    stop: Option<i64>,
528    step_array: Option<&Int64Array>,
529    idx: usize,
530) -> Option<(i64, i64, i64)> {
531    // Default start value is 0 if not provided
532    let start =
533        start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
534    let stop = stop?;
535    // Default step value is 1 if not provided
536    let step =
537        step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
538    Some((start, stop, step))
539}
540
541/// Reserve space for `count` more elements, returning an error when the
542/// allocation would overflow `Vec`'s capacity limit or the allocator
543/// rejects it, rather than panicking on user-supplied SQL.
544fn reserve_range_capacity(values: &mut Vec<i64>, count: u64) -> Result<()> {
545    let count_usize = usize::try_from(count).map_err(|_| {
546        exec_datafusion_err!(
547            "Range too large to materialize: would produce {count} elements"
548        )
549    })?;
550    values.try_reserve(count_usize).map_err(|e| {
551        exec_datafusion_err!(
552            "Range too large to materialize: failed to allocate {count} elements: {e}"
553        )
554    })
555}
556
557/// Generate integer range values directly into the provided buffer.
558#[inline]
559fn generate_range_values(
560    start: i64,
561    stop: i64,
562    step: i64,
563    include_upper: bool,
564    values: &mut Vec<i64>,
565) -> Result<()> {
566    if !include_upper && start == stop {
567        return Ok(());
568    }
569
570    if step > 0 {
571        let limit = if include_upper {
572            stop
573        } else {
574            stop.saturating_sub(1)
575        };
576        if start > limit {
577            return Ok(());
578        }
579        let count = (start.abs_diff(limit) / step.unsigned_abs()).saturating_add(1);
580        reserve_range_capacity(values, count)?;
581        let mut current = start;
582        while current <= limit {
583            values.push(current);
584            match current.checked_add(step) {
585                Some(next) => current = next,
586                None => break,
587            }
588        }
589    } else if step < 0 {
590        let limit = if include_upper {
591            stop
592        } else {
593            stop.saturating_add(1)
594        };
595        if start < limit {
596            return Ok(());
597        }
598        let count = (start.abs_diff(limit) / step.unsigned_abs()).saturating_add(1);
599        reserve_range_capacity(values, count)?;
600        let mut current = start;
601        while current >= limit {
602            values.push(current);
603            match current.checked_add(step) {
604                Some(next) => current = next,
605                None => break,
606            }
607        }
608    }
609    Ok(())
610}
611
612fn parse_tz(tz: &Option<&str>) -> Result<Tz> {
613    let tz = tz.unwrap_or_else(|| "+00");
614
615    Tz::from_str(tz)
616        .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
617}
618
619fn date32_to_string(value: i32) -> String {
620    if let Some(d) = Date32Type::to_naive_date_opt(value) {
621        format!("{value} ({d})")
622    } else {
623        format!("{value} (unknown date)")
624    }
625}