1use crate::utils::make_scalar_function;
21use arrow::buffer::OffsetBuffer;
22use arrow::datatypes::TimeUnit;
23use arrow::datatypes::{DataType, Field, IntervalUnit::MonthDayNano};
24use arrow::{
25 array::{
26 Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullBufferBuilder,
27 builder::{Date32Builder, TimestampNanosecondBuilder},
28 temporal_conversions::as_datetime_with_timezone,
29 timezone::Tz,
30 types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType},
31 },
32 compute::cast,
33};
34use datafusion_common::internal_err;
35use datafusion_common::{
36 Result, exec_datafusion_err, exec_err, utils::take_function_args,
37};
38use datafusion_common::{
39 ScalarValue,
40 cast::{
41 as_date32_array, as_int64_array, as_interval_mdn_array,
42 as_timestamp_nanosecond_array,
43 },
44 types::{
45 NativeType, logical_date, logical_int64, logical_interval_mdn, logical_string,
46 },
47};
48use datafusion_expr::{
49 Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
50 TypeSignature, TypeSignatureClass, Volatility,
51};
52use datafusion_macros::user_doc;
53use std::cmp::Ordering;
54use std::iter::from_fn;
55use std::str::FromStr;
56use std::sync::Arc;
57
// Wires up the `range` function through the crate's `make_udf_expr_and_func!`
// macro (defined outside this file). The invocation names the implementing
// type (`Range`), the function identifier (`range`), its parameters
// (`start stop step`), a short description, the identifier `range_udf`, and
// the constructor `Range::new`, which builds the variant that excludes the
// upper bound.
// NOTE(review): presumably this emits an expression-builder fn `range` and a
// `range_udf()` accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    Range,
    range,
    start stop step,
    "create a list of values in the range between start and stop",
    range_udf,
    Range::new
);
66
// Wires up the `generate_series` function through the crate's
// `make_udf_expr_and_func!` macro (defined outside this file). It reuses the
// `Range` implementation but passes `Range::generate_series` as the
// constructor, which builds the variant that includes the upper bound.
// NOTE(review): presumably this emits an expression-builder fn `gen_series`
// and a `gen_series_udf()` accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    GenSeries,
    gen_series,
    start stop step,
    "create a list of values in the range between start and stop, include upper bound",
    gen_series_udf,
    Range::generate_series
);
75
// Carrier type for the user-facing documentation of `range`.
//
// The struct has no fields; everything lives in the `#[user_doc]` attribute.
// `Range::documentation` returns `RangeDoc {}.doc()` when the upper bound is
// excluded.
// NOTE(review): the attribute text mixes the names `stop` (syntax example)
// and `end` (argument list) for the same parameter.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
    syntax_example = "range(stop)
range(start, stop[, step])",
    sql_example = r#"```sql
> select range(2, 10, 3);
+-----------------------------------+
| range(Int64(2),Int64(10),Int64(3))|
+-----------------------------------+
| [2, 5, 8] |
+-----------------------------------+

> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
+--------------------------------------------------------------------------+
| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH) |
+--------------------------------------------------------------------------+
| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
+--------------------------------------------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the range (not included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct RangeDoc {}
110
// Carrier type for the user-facing documentation of `generate_series`.
//
// The struct has no fields; everything lives in the `#[user_doc]` attribute.
// `Range::documentation` returns `GenerateSeriesDoc {}.doc()` when the upper
// bound is included.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Similar to the range function, but it includes the upper bound.",
    syntax_example = "generate_series(stop)
generate_series(start, stop[, step])",
    sql_example = r#"```sql
> select generate_series(1,3);
+------------------------------------+
| generate_series(Int64(1),Int64(3)) |
+------------------------------------+
| [1, 2, 3] |
+------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the series (included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct GenerateSeriesDoc {}
138
/// Scalar UDF implementation shared by `range` and `generate_series`.
///
/// Both functions use the same signature; they differ only in whether the
/// stop value is part of the produced list (see `include_upper_bound`).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Range {
    /// Accepted argument type combinations, built by `Range::defined_signature`.
    signature: Signature,
    /// `true` for `generate_series` (stop is included in the result),
    /// `false` for `range` (stop is excluded).
    include_upper_bound: bool,
}
145
146impl Default for Range {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl Range {
153 fn defined_signature() -> Signature {
154 let integer = Coercion::new_implicit(
157 TypeSignatureClass::Native(logical_int64()),
158 vec![TypeSignatureClass::Integer],
159 NativeType::Int64,
160 );
161 let interval = Coercion::new_implicit(
164 TypeSignatureClass::Native(logical_interval_mdn()),
165 vec![TypeSignatureClass::Interval],
166 NativeType::Interval(MonthDayNano),
167 );
168 let date = Coercion::new_implicit(
172 TypeSignatureClass::Native(logical_date()),
173 vec![TypeSignatureClass::Native(logical_string())],
174 NativeType::Date,
175 );
176 let timestamp = Coercion::new_exact(TypeSignatureClass::Timestamp);
177 Signature::one_of(
178 vec![
179 TypeSignature::Coercible(vec![integer.clone()]),
182 TypeSignature::Coercible(vec![integer.clone(), integer.clone()]),
184 TypeSignature::Coercible(vec![integer.clone(), integer.clone(), integer]),
186 TypeSignature::Coercible(vec![date.clone(), date, interval.clone()]),
188 TypeSignature::Coercible(vec![timestamp.clone(), timestamp, interval]),
190 ],
191 Volatility::Immutable,
192 )
193 }
194
195 pub fn new() -> Self {
197 Self {
198 signature: Self::defined_signature(),
199 include_upper_bound: false,
200 }
201 }
202
203 fn generate_series() -> Self {
205 Self {
206 signature: Self::defined_signature(),
207 include_upper_bound: true,
208 }
209 }
210}
211
212impl ScalarUDFImpl for Range {
213 fn name(&self) -> &str {
214 if self.include_upper_bound {
215 "generate_series"
216 } else {
217 "range"
218 }
219 }
220
221 fn signature(&self) -> &Signature {
222 &self.signature
223 }
224
225 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
226 if arg_types.iter().any(|t| t.is_null()) {
227 return Ok(DataType::Null);
228 }
229
230 match (&arg_types[0], arg_types.get(1)) {
231 (_, Some(DataType::Date64)) | (DataType::Date64, _) => Ok(DataType::List(
233 Arc::new(Field::new_list_field(DataType::Date32, true)),
234 )),
235 (DataType::Timestamp(_, tz), _) => {
237 Ok(DataType::List(Arc::new(Field::new_list_field(
238 DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
239 true,
240 ))))
241 }
242 _ => Ok(DataType::List(Arc::new(Field::new_list_field(
243 arg_types[0].clone(),
244 true,
245 )))),
246 }
247 }
248
249 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
250 let args = &args.args;
251
252 if args.iter().any(|arg| arg.data_type().is_null()) {
253 return Ok(ColumnarValue::Scalar(ScalarValue::Null));
254 }
255 match args[0].data_type() {
256 DataType::Int64 => {
257 make_scalar_function(|args| self.gen_range_inner(args))(args)
258 }
259 DataType::Date32 | DataType::Date64 => {
260 make_scalar_function(|args| self.gen_range_date(args))(args)
261 }
262 DataType::Timestamp(_, _) => {
263 make_scalar_function(|args| self.gen_range_timestamp(args))(args)
264 }
265 dt => {
266 internal_err!(
267 "Signature failed to guard unknown input type for {}: {dt}",
268 self.name()
269 )
270 }
271 }
272 }
273
274 fn documentation(&self) -> Option<&Documentation> {
275 if self.include_upper_bound {
276 GenerateSeriesDoc {}.doc()
277 } else {
278 RangeDoc {}.doc()
279 }
280 }
281}
282
283impl Range {
284 fn gen_range_inner(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
299 let (start_array, stop_array, step_array) = match args {
300 [stop_array] => (None, as_int64_array(stop_array)?, None),
301 [start_array, stop_array] => (
302 Some(as_int64_array(start_array)?),
303 as_int64_array(stop_array)?,
304 None,
305 ),
306 [start_array, stop_array, step_array] => (
307 Some(as_int64_array(start_array)?),
308 as_int64_array(stop_array)?,
309 Some(as_int64_array(step_array)?),
310 ),
311 _ => return internal_err!("{} expects 1 to 3 arguments", self.name()),
312 };
313
314 let mut values = vec![];
315 let mut offsets = vec![0];
316 let mut valid = NullBufferBuilder::new(stop_array.len());
317 for (idx, stop) in stop_array.iter().enumerate() {
318 match retrieve_range_args(start_array, stop, step_array, idx) {
319 Some((_, _, 0)) => {
320 return exec_err!(
321 "step can't be 0 for function {}(start [, stop, step])",
322 self.name()
323 );
324 }
325 Some((start, stop, step)) => {
326 generate_range_values(
327 start,
328 stop,
329 step,
330 self.include_upper_bound,
331 &mut values,
332 )?;
333 offsets.push(values.len() as i32);
334 valid.append_non_null();
335 }
336 None => {
338 offsets.push(values.len() as i32);
339 valid.append_null();
340 }
341 };
342 }
343 let arr = Arc::new(ListArray::try_new(
344 Arc::new(Field::new_list_field(DataType::Int64, true)),
345 OffsetBuffer::new(offsets.into()),
346 Arc::new(Int64Array::from(values)),
347 valid.finish(),
348 )?);
349 Ok(arr)
350 }
351
352 fn gen_range_date(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
353 let [start, stop, step] = take_function_args(self.name(), args)?;
354 let step = as_interval_mdn_array(step)?;
355
356 let start = cast(start, &DataType::Date32)?;
359 let start = as_date32_array(&start)?;
360 let stop = cast(stop, &DataType::Date32)?;
361 let stop = as_date32_array(&stop)?;
362
363 let values_builder = Date32Builder::new();
365 let mut list_builder = ListBuilder::new(values_builder);
366
367 for idx in 0..stop.len() {
368 if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
369 list_builder.append_null();
370 continue;
371 }
372
373 let start = start.value(idx);
374 let stop = stop.value(idx);
375 let step = step.value(idx);
376
377 let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
378 if months == 0 && days == 0 {
379 return exec_err!("Cannot generate date range less than 1 day.");
380 }
381
382 let stop = if !self.include_upper_bound {
383 Date32Type::subtract_month_day_nano_opt(stop, step).ok_or_else(|| {
384 exec_datafusion_err!(
385 "Cannot generate date range where stop {} - {step:?}) overflows",
386 date32_to_string(stop)
387 )
388 })?
389 } else {
390 stop
391 };
392
393 let neg = months < 0 || days < 0;
394 let mut new_date = Some(start);
395
396 let values = from_fn(|| {
397 let Some(current_date) = new_date else {
398 return None; };
400 if (neg && current_date < stop) || (!neg && current_date > stop) {
401 None
402 } else {
403 new_date = Date32Type::add_month_day_nano_opt(current_date, step);
404 Some(Some(current_date))
405 }
406 });
407
408 list_builder.append_value(values);
409 }
410
411 let arr = Arc::new(list_builder.finish());
412
413 Ok(arr)
414 }
415
416 fn gen_range_timestamp(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
417 let [start, stop, step] = take_function_args(self.name(), args)?;
418 let step = as_interval_mdn_array(step)?;
419
420 fn cast_to_ns(arr: &ArrayRef) -> Result<ArrayRef> {
423 match arr.data_type() {
424 DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok(Arc::clone(arr)),
425 DataType::Timestamp(_, tz) => Ok(cast(
426 arr,
427 &DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
428 )?),
429 _ => unreachable!(),
430 }
431 }
432 let start = cast_to_ns(start)?;
433 let start = as_timestamp_nanosecond_array(&start)?;
434 let stop = cast_to_ns(stop)?;
435 let stop = as_timestamp_nanosecond_array(&stop)?;
436
437 let start_tz = parse_tz(&start.timezone())?;
438 let stop_tz = parse_tz(&stop.timezone())?;
439
440 let values_builder = start
442 .timezone()
443 .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
444 TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
445 });
446 let mut list_builder = ListBuilder::new(values_builder);
447
448 for idx in 0..start.len() {
449 if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
450 list_builder.append_null();
451 continue;
452 }
453
454 let start = start.value(idx);
455 let stop = stop.value(idx);
456 let step = step.value(idx);
457
458 let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
459 if months == 0 && days == 0 && ns == 0 {
460 return exec_err!("Interval argument to {} must not be 0", self.name());
461 }
462
463 let neg = TimestampNanosecondType::add_month_day_nano(start, step, start_tz)
464 .ok_or(exec_datafusion_err!(
465 "Cannot generate timestamp range where start + step overflows"
466 ))?
467 .cmp(&start)
468 == Ordering::Less;
469
470 let stop_dt =
471 as_datetime_with_timezone::<TimestampNanosecondType>(stop, stop_tz)
472 .ok_or(exec_datafusion_err!(
473 "Cannot generate timestamp for stop: {}: {:?}",
474 stop,
475 stop_tz
476 ))?;
477
478 let mut current = start;
479 let mut current_dt =
480 as_datetime_with_timezone::<TimestampNanosecondType>(current, start_tz)
481 .ok_or(exec_datafusion_err!(
482 "Cannot generate timestamp for start: {}: {:?}",
483 current,
484 start_tz
485 ))?;
486
487 let values = from_fn(|| {
488 let generate_series_should_end = self.include_upper_bound
489 && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt));
490 let range_should_end = !self.include_upper_bound
491 && ((neg && current_dt <= stop_dt)
492 || (!neg && current_dt >= stop_dt));
493 if generate_series_should_end || range_should_end {
494 return None;
495 }
496
497 let prev_current = current;
498
499 if let Some(ts) =
500 TimestampNanosecondType::add_month_day_nano(current, step, start_tz)
501 {
502 current = ts;
503 current_dt = as_datetime_with_timezone::<TimestampNanosecondType>(
504 current, start_tz,
505 )?;
506
507 Some(Some(prev_current))
508 } else {
509 None
511 }
512 });
513
514 list_builder.append_value(values);
515 }
516
517 let arr = Arc::new(list_builder.finish());
518
519 Ok(arr)
520 }
521}
522
523fn retrieve_range_args(
526 start_array: Option<&Int64Array>,
527 stop: Option<i64>,
528 step_array: Option<&Int64Array>,
529 idx: usize,
530) -> Option<(i64, i64, i64)> {
531 let start =
533 start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
534 let stop = stop?;
535 let step =
537 step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
538 Some((start, stop, step))
539}
540
541fn reserve_range_capacity(values: &mut Vec<i64>, count: u64) -> Result<()> {
545 let count_usize = usize::try_from(count).map_err(|_| {
546 exec_datafusion_err!(
547 "Range too large to materialize: would produce {count} elements"
548 )
549 })?;
550 values.try_reserve(count_usize).map_err(|e| {
551 exec_datafusion_err!(
552 "Range too large to materialize: failed to allocate {count} elements: {e}"
553 )
554 })
555}
556
557#[inline]
559fn generate_range_values(
560 start: i64,
561 stop: i64,
562 step: i64,
563 include_upper: bool,
564 values: &mut Vec<i64>,
565) -> Result<()> {
566 if !include_upper && start == stop {
567 return Ok(());
568 }
569
570 if step > 0 {
571 let limit = if include_upper {
572 stop
573 } else {
574 stop.saturating_sub(1)
575 };
576 if start > limit {
577 return Ok(());
578 }
579 let count = (start.abs_diff(limit) / step.unsigned_abs()).saturating_add(1);
580 reserve_range_capacity(values, count)?;
581 let mut current = start;
582 while current <= limit {
583 values.push(current);
584 match current.checked_add(step) {
585 Some(next) => current = next,
586 None => break,
587 }
588 }
589 } else if step < 0 {
590 let limit = if include_upper {
591 stop
592 } else {
593 stop.saturating_add(1)
594 };
595 if start < limit {
596 return Ok(());
597 }
598 let count = (start.abs_diff(limit) / step.unsigned_abs()).saturating_add(1);
599 reserve_range_capacity(values, count)?;
600 let mut current = start;
601 while current >= limit {
602 values.push(current);
603 match current.checked_add(step) {
604 Some(next) => current = next,
605 None => break,
606 }
607 }
608 }
609 Ok(())
610}
611
612fn parse_tz(tz: &Option<&str>) -> Result<Tz> {
613 let tz = tz.unwrap_or_else(|| "+00");
614
615 Tz::from_str(tz)
616 .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
617}
618
619fn date32_to_string(value: i32) -> String {
620 if let Some(d) = Date32Type::to_naive_date_opt(value) {
621 format!("{value} ({d})")
622 } else {
623 format!("{value} (unknown date)")
624 }
625}