1use crate::utils::make_scalar_function;
21use arrow::array::{
22 builder::{Date32Builder, TimestampNanosecondBuilder},
23 temporal_conversions::as_datetime_with_timezone,
24 timezone::Tz,
25 types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType as TSNT},
26 Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullArray, NullBufferBuilder,
27 TimestampNanosecondArray,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{
31 DataType, DataType::*, Field, IntervalUnit::MonthDayNano, TimeUnit::Nanosecond,
32};
33use datafusion_common::cast::{
34 as_date32_array, as_int64_array, as_interval_mdn_array, as_timestamp_nanosecond_array,
35};
36use datafusion_common::{
37 exec_datafusion_err, exec_err, internal_err, not_impl_datafusion_err,
38 utils::take_function_args, Result,
39};
40use datafusion_expr::{
41 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
42};
43use datafusion_macros::user_doc;
44use itertools::Itertools;
45use std::any::Any;
46use std::cmp::Ordering;
47use std::iter::from_fn;
48use std::str::FromStr;
49use std::sync::Arc;
50
// Registers the `Range` UDF defined below with the crate's UDF helper macro.
// Judging by its arguments, this is expected to produce a `range(start, stop, step)`
// expression helper and a `range_udf` accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    Range,
    range,
    start stop step,
    "create a list of values in the range between start and stop",
    range_udf
);
58
// `range` scalar UDF: builds a list from `start` up to, but not including, `stop`.
// The `#[user_doc]` attribute holds the user-facing SQL documentation, which
// `ScalarUDFImpl::documentation` returns through `self.doc()` (presumably generated
// by the attribute macro — confirm in `datafusion_macros`).
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
    syntax_example = "range(start, stop, step)",
    sql_example = r#"```sql
> select range(2, 10, 3);
+-----------------------------------+
| range(Int64(2),Int64(10),Int64(3))|
+-----------------------------------+
| [2, 5, 8] |
+-----------------------------------+

> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
+--------------------------------------------------------------+
| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH) |
+--------------------------------------------------------------+
| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
+--------------------------------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the range (not included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
#[derive(Debug)]
pub struct Range {
    // User-defined signature: argument types are resolved by `coerce_types`.
    signature: Signature,
    // Alternative SQL names for the function (none are registered in `new`).
    aliases: Vec<String>,
}
96
97impl Default for Range {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102impl Range {
103 pub fn new() -> Self {
104 Self {
105 signature: Signature::user_defined(Volatility::Immutable),
106 aliases: vec![],
107 }
108 }
109}
110impl ScalarUDFImpl for Range {
111 fn as_any(&self) -> &dyn Any {
112 self
113 }
114 fn name(&self) -> &str {
115 "range"
116 }
117
118 fn signature(&self) -> &Signature {
119 &self.signature
120 }
121
122 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
123 arg_types
124 .iter()
125 .map(|arg_type| match arg_type {
126 Null => Ok(Null),
127 Int8 => Ok(Int64),
128 Int16 => Ok(Int64),
129 Int32 => Ok(Int64),
130 Int64 => Ok(Int64),
131 UInt8 => Ok(Int64),
132 UInt16 => Ok(Int64),
133 UInt32 => Ok(Int64),
134 UInt64 => Ok(Int64),
135 Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
136 Date32 => Ok(Date32),
137 Date64 => Ok(Date32),
138 Utf8 => Ok(Date32),
139 LargeUtf8 => Ok(Date32),
140 Utf8View => Ok(Date32),
141 Interval(_) => Ok(Interval(MonthDayNano)),
142 _ => exec_err!("Unsupported DataType"),
143 })
144 .try_collect()
145 }
146
147 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
148 if arg_types.iter().any(|t| t.is_null()) {
149 Ok(Null)
150 } else {
151 Ok(List(Arc::new(Field::new_list_field(
152 arg_types[0].clone(),
153 true,
154 ))))
155 }
156 }
157
158 fn invoke_with_args(
159 &self,
160 args: datafusion_expr::ScalarFunctionArgs,
161 ) -> Result<ColumnarValue> {
162 let args = &args.args;
163
164 if args.iter().any(|arg| arg.data_type().is_null()) {
165 return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
166 }
167 match args[0].data_type() {
168 Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args),
169 Date32 => make_scalar_function(|args| gen_range_date(args, false))(args),
170 Timestamp(_, _) => {
171 make_scalar_function(|args| gen_range_timestamp(args, false))(args)
172 }
173 dt => {
174 exec_err!("unsupported type for RANGE. Expected Int64, Date32 or Timestamp, got: {dt}")
175 }
176 }
177 }
178
179 fn aliases(&self) -> &[String] {
180 &self.aliases
181 }
182
183 fn documentation(&self) -> Option<&Documentation> {
184 self.doc()
185 }
186}
187
// Registers the `GenSeries` UDF defined below with the crate's UDF helper macro.
// Judging by its arguments, this is expected to produce a `gen_series(start, stop, step)`
// expression helper and a `gen_series_udf` accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    GenSeries,
    gen_series,
    start stop step,
    "create a list of values in the range between start and stop, include upper bound",
    gen_series_udf
);
195
// `generate_series` scalar UDF: like `range`, but `stop` is included when reached.
// The `#[user_doc]` attribute holds the user-facing SQL documentation, which
// `ScalarUDFImpl::documentation` returns through `self.doc()` (presumably generated
// by the attribute macro — confirm in `datafusion_macros`).
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Similar to the range function, but it includes the upper bound.",
    syntax_example = "generate_series(start, stop, step)",
    sql_example = r#"```sql
> select generate_series(1,3);
+------------------------------------+
| generate_series(Int64(1),Int64(3)) |
+------------------------------------+
| [1, 2, 3] |
+------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the series (included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
#[derive(Debug)]
pub(super) struct GenSeries {
    // User-defined signature: argument types are resolved by `coerce_types`.
    signature: Signature,
    // Alternative SQL names for the function (none are registered in `new`).
    aliases: Vec<String>,
}
226impl GenSeries {
227 pub fn new() -> Self {
228 Self {
229 signature: Signature::user_defined(Volatility::Immutable),
230 aliases: vec![],
231 }
232 }
233}
234impl ScalarUDFImpl for GenSeries {
235 fn as_any(&self) -> &dyn Any {
236 self
237 }
238 fn name(&self) -> &str {
239 "generate_series"
240 }
241
242 fn signature(&self) -> &Signature {
243 &self.signature
244 }
245
246 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
247 arg_types
248 .iter()
249 .map(|arg_type| match arg_type {
250 Null => Ok(Null),
251 Int8 => Ok(Int64),
252 Int16 => Ok(Int64),
253 Int32 => Ok(Int64),
254 Int64 => Ok(Int64),
255 UInt8 => Ok(Int64),
256 UInt16 => Ok(Int64),
257 UInt32 => Ok(Int64),
258 UInt64 => Ok(Int64),
259 Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
260 Date32 => Ok(Date32),
261 Date64 => Ok(Date32),
262 Utf8 => Ok(Date32),
263 LargeUtf8 => Ok(Date32),
264 Utf8View => Ok(Date32),
265 Interval(_) => Ok(Interval(MonthDayNano)),
266 _ => exec_err!("Unsupported DataType"),
267 })
268 .try_collect()
269 }
270
271 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
272 if arg_types.iter().any(|t| t.is_null()) {
273 Ok(Null)
274 } else {
275 Ok(List(Arc::new(Field::new_list_field(
276 arg_types[0].clone(),
277 true,
278 ))))
279 }
280 }
281
282 fn invoke_with_args(
283 &self,
284 args: datafusion_expr::ScalarFunctionArgs,
285 ) -> Result<ColumnarValue> {
286 let args = &args.args;
287
288 if args.iter().any(|arg| arg.data_type().is_null()) {
289 return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
290 }
291 match args[0].data_type() {
292 Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args),
293 Date32 => make_scalar_function(|args| gen_range_date(args, true))(args),
294 Timestamp(_, _) => {
295 make_scalar_function(|args| gen_range_timestamp(args, true))(args)
296 }
297 dt => {
298 exec_err!(
299 "unsupported type for GENERATE_SERIES. Expected Int64, Date32 or Timestamp, got: {}",
300 dt
301 )
302 }
303 }
304 }
305
306 fn aliases(&self) -> &[String] {
307 &self.aliases
308 }
309
310 fn documentation(&self) -> Option<&Documentation> {
311 self.doc()
312 }
313}
314
315pub(super) fn gen_range_inner(
330 args: &[ArrayRef],
331 include_upper: bool,
332) -> Result<ArrayRef> {
333 let (start_array, stop_array, step_array) = match args.len() {
334 1 => (None, as_int64_array(&args[0])?, None),
335 2 => (
336 Some(as_int64_array(&args[0])?),
337 as_int64_array(&args[1])?,
338 None,
339 ),
340 3 => (
341 Some(as_int64_array(&args[0])?),
342 as_int64_array(&args[1])?,
343 Some(as_int64_array(&args[2])?),
344 ),
345 _ => return exec_err!("gen_range expects 1 to 3 arguments"),
346 };
347
348 let mut values = vec![];
349 let mut offsets = vec![0];
350 let mut valid = NullBufferBuilder::new(stop_array.len());
351 for (idx, stop) in stop_array.iter().enumerate() {
352 match retrieve_range_args(start_array, stop, step_array, idx) {
353 Some((_, _, 0)) => {
354 return exec_err!(
355 "step can't be 0 for function {}(start [, stop, step])",
356 if include_upper {
357 "generate_series"
358 } else {
359 "range"
360 }
361 );
362 }
363 Some((start, stop, step)) => {
364 let step_abs = usize::try_from(step.unsigned_abs()).map_err(|_| {
367 not_impl_datafusion_err!("step {} can't fit into usize", step)
368 })?;
369 values.extend(
370 gen_range_iter(start, stop, step < 0, include_upper)
371 .step_by(step_abs),
372 );
373 offsets.push(values.len() as i32);
374 valid.append_non_null();
375 }
376 None => {
378 offsets.push(values.len() as i32);
379 valid.append_null();
380 }
381 };
382 }
383 let arr = Arc::new(ListArray::try_new(
384 Arc::new(Field::new_list_field(Int64, true)),
385 OffsetBuffer::new(offsets.into()),
386 Arc::new(Int64Array::from(values)),
387 valid.finish(),
388 )?);
389 Ok(arr)
390}
391
392fn retrieve_range_args(
395 start_array: Option<&Int64Array>,
396 stop: Option<i64>,
397 step_array: Option<&Int64Array>,
398 idx: usize,
399) -> Option<(i64, i64, i64)> {
400 let start =
402 start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
403 let stop = stop?;
404 let step =
406 step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
407 Some((start, stop, step))
408}
409
/// Yields every integer between `start` and `stop` one unit at a time, in the
/// direction given by `decreasing`.
///
/// `stop` is produced only when `include_upper` is set. The caller applies the
/// real step size with `step_by`. For a decreasing exclusive range whose `stop`
/// is `i64::MAX`, nothing lies strictly between, so the iterator is empty
/// (this also avoids overflowing `stop + 1`).
fn gen_range_iter(
    start: i64,
    stop: i64,
    decreasing: bool,
    include_upper: bool,
) -> Box<dyn Iterator<Item = i64>> {
    if !decreasing {
        return if include_upper {
            Box::new(start..=stop)
        } else {
            Box::new(start..stop)
        };
    }

    if include_upper {
        Box::new((stop..=start).rev())
    } else {
        match stop.checked_add(1) {
            Some(lowest) => Box::new((lowest..=start).rev()),
            None => Box::new(std::iter::empty()),
        }
    }
}
438
439fn gen_range_date(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
440 let [start, stop, step] = take_function_args("range", args)?;
441
442 let (start_array, stop_array, step_array) = (
443 Some(as_date32_array(start)?),
444 as_date32_array(stop)?,
445 Some(as_interval_mdn_array(step)?),
446 );
447
448 let values_builder = Date32Builder::new();
450 let mut list_builder = ListBuilder::new(values_builder);
451
452 for idx in 0..stop_array.len() {
453 if stop_array.is_null(idx) {
454 list_builder.append_null();
455 continue;
456 }
457 let mut stop = stop_array.value(idx);
458
459 let start = if let Some(start_array_values) = start_array {
460 if start_array_values.is_null(idx) {
461 list_builder.append_null();
462 continue;
463 }
464 start_array_values.value(idx)
465 } else {
466 list_builder.append_null();
467 continue;
468 };
469
470 let step = if let Some(step) = step_array {
471 if step.is_null(idx) {
472 list_builder.append_null();
473 continue;
474 }
475 step.value(idx)
476 } else {
477 list_builder.append_null();
478 continue;
479 };
480
481 let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
482
483 if months == 0 && days == 0 {
484 return exec_err!("Cannot generate date range less than 1 day.");
485 }
486
487 let neg = months < 0 || days < 0;
488 if !include_upper_bound {
489 stop = Date32Type::subtract_month_day_nano(stop, step);
490 }
491 let mut new_date = start;
492
493 let values = from_fn(|| {
494 if (neg && new_date < stop) || (!neg && new_date > stop) {
495 None
496 } else {
497 let current_date = new_date;
498 new_date = Date32Type::add_month_day_nano(new_date, step);
499 Some(Some(current_date))
500 }
501 });
502
503 list_builder.append_value(values);
504 }
505
506 let arr = Arc::new(list_builder.finish());
507
508 Ok(arr)
509}
510
511fn gen_range_timestamp(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
512 let func_name = if include_upper_bound {
513 "GENERATE_SERIES"
514 } else {
515 "RANGE"
516 };
517 let [start, stop, step] = take_function_args(func_name, args)?;
518
519 let (start_arr, start_tz_opt) = cast_timestamp_arg(start, include_upper_bound)?;
521 let (stop_arr, stop_tz_opt) = cast_timestamp_arg(stop, include_upper_bound)?;
522 let step_arr = as_interval_mdn_array(step)?;
523 let start_tz = parse_tz(start_tz_opt)?;
524 let stop_tz = parse_tz(stop_tz_opt)?;
525
526 let values_builder = start_tz_opt
528 .clone()
529 .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
530 TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
531 });
532 let mut list_builder = ListBuilder::new(values_builder);
533
534 for idx in 0..start_arr.len() {
535 if start_arr.is_null(idx) || stop_arr.is_null(idx) || step_arr.is_null(idx) {
536 list_builder.append_null();
537 continue;
538 }
539
540 let start = start_arr.value(idx);
541 let stop = stop_arr.value(idx);
542 let step = step_arr.value(idx);
543
544 let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
545 if months == 0 && days == 0 && ns == 0 {
546 return exec_err!(
547 "Interval argument to {} must not be 0",
548 if include_upper_bound {
549 "GENERATE_SERIES"
550 } else {
551 "RANGE"
552 }
553 );
554 }
555
556 let neg = TSNT::add_month_day_nano(start, step, start_tz)
557 .ok_or(exec_datafusion_err!(
558 "Cannot generate timestamp range where start + step overflows"
559 ))?
560 .cmp(&start)
561 == Ordering::Less;
562
563 let stop_dt = as_datetime_with_timezone::<TSNT>(stop, stop_tz).ok_or(
564 exec_datafusion_err!(
565 "Cannot generate timestamp for stop: {}: {:?}",
566 stop,
567 stop_tz
568 ),
569 )?;
570
571 let mut current = start;
572 let mut current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz).ok_or(
573 exec_datafusion_err!(
574 "Cannot generate timestamp for start: {}: {:?}",
575 current,
576 start_tz
577 ),
578 )?;
579
580 let values = from_fn(|| {
581 if (include_upper_bound
582 && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt)))
583 || (!include_upper_bound
584 && ((neg && current_dt <= stop_dt)
585 || (!neg && current_dt >= stop_dt)))
586 {
587 return None;
588 }
589
590 let prev_current = current;
591
592 if let Some(ts) = TSNT::add_month_day_nano(current, step, start_tz) {
593 current = ts;
594 current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz)?;
595
596 Some(Some(prev_current))
597 } else {
598 None
600 }
601 });
602
603 list_builder.append_value(values);
604 }
605
606 let arr = Arc::new(list_builder.finish());
607
608 Ok(arr)
609}
610
611fn cast_timestamp_arg(
612 arg: &ArrayRef,
613 include_upper: bool,
614) -> Result<(&TimestampNanosecondArray, &Option<Arc<str>>)> {
615 match arg.data_type() {
616 Timestamp(Nanosecond, tz_opt) => {
617 Ok((as_timestamp_nanosecond_array(arg)?, tz_opt))
618 }
619 _ => {
620 internal_err!(
621 "Unexpected argument type for {} : {}",
622 if include_upper {
623 "GENERATE_SERIES"
624 } else {
625 "RANGE"
626 },
627 arg.data_type()
628 )
629 }
630 }
631}
632
633fn parse_tz(tz: &Option<Arc<str>>) -> Result<Tz> {
634 let tz = tz.as_ref().map_or_else(|| "+00", |s| s);
635
636 Tz::from_str(tz)
637 .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
638}