1use crate::utils::make_scalar_function;
21use arrow::buffer::OffsetBuffer;
22use arrow::datatypes::TimeUnit;
23use arrow::datatypes::{DataType, Field, IntervalUnit::MonthDayNano};
24use arrow::{
25 array::{
26 Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullBufferBuilder,
27 builder::{Date32Builder, TimestampNanosecondBuilder},
28 temporal_conversions::as_datetime_with_timezone,
29 timezone::Tz,
30 types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType},
31 },
32 compute::cast,
33};
34use datafusion_common::internal_err;
35use datafusion_common::{
36 Result, exec_datafusion_err, exec_err, not_impl_datafusion_err,
37 utils::take_function_args,
38};
39use datafusion_common::{
40 ScalarValue,
41 cast::{
42 as_date32_array, as_int64_array, as_interval_mdn_array,
43 as_timestamp_nanosecond_array,
44 },
45 types::{
46 NativeType, logical_date, logical_int64, logical_interval_mdn, logical_string,
47 },
48};
49use datafusion_expr::{
50 Coercion, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature,
51 TypeSignatureClass, Volatility,
52};
53use datafusion_macros::user_doc;
54use std::any::Any;
55use std::cmp::Ordering;
56use std::iter::from_fn;
57use std::str::FromStr;
58use std::sync::Arc;
59
// `range(start, stop, step)`: the exclusive-upper-bound flavour, built with
// `Range::new` (which sets `include_upper_bound: false`). The macro presumably
// generates the `range` expression helper and the `range_udf` accessor — see
// its definition to confirm.
make_udf_expr_and_func!(
    Range,
    range,
    start stop step,
    "create a list of values in the range between start and stop",
    range_udf,
    Range::new
);
68
// `generate_series(start, stop, step)`: same implementation as `range`, but
// built with `Range::generate_series` (which sets `include_upper_bound: true`),
// so `stop` itself is part of the output when reached.
make_udf_expr_and_func!(
    GenSeries,
    gen_series,
    start stop step,
    "create a list of values in the range between start and stop, include upper bound",
    gen_series_udf,
    Range::generate_series
);
77
// User-facing documentation for `range`; returned by `Range::documentation`
// when `include_upper_bound` is false. The strings below are rendered into the
// generated SQL reference, so edit them with care.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
    syntax_example = "range(stop)
range(start, stop[, step])",
    sql_example = r#"```sql
> select range(2, 10, 3);
+-----------------------------------+
| range(Int64(2),Int64(10),Int64(3))|
+-----------------------------------+
| [2, 5, 8] |
+-----------------------------------+

> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
+--------------------------------------------------------------------------+
| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH) |
+--------------------------------------------------------------------------+
| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
+--------------------------------------------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the range (not included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct RangeDoc {}
112
// User-facing documentation for `generate_series`; returned by
// `Range::documentation` when `include_upper_bound` is true.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Similar to the range function, but it includes the upper bound.",
    syntax_example = "generate_series(stop)
generate_series(start, stop[, step])",
    sql_example = r#"```sql
> select generate_series(1,3);
+------------------------------------+
| generate_series(Int64(1),Int64(3)) |
+------------------------------------+
| [1, 2, 3] |
+------------------------------------+
```"#,
    argument(
        name = "start",
        description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
    ),
    argument(
        name = "end",
        description = "End of the series (included). Type must be the same as start."
    ),
    argument(
        name = "step",
        description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
    )
)]
struct GenerateSeriesDoc {}
140
/// Scalar UDF implementing both `range` (upper bound excluded) and
/// `generate_series` (upper bound included) over integers, dates and timestamps.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Range {
    /// Accepted argument lists, built by `Range::defined_signature`.
    signature: Signature,
    /// `true` for `generate_series` (stop is included), `false` for `range`.
    include_upper_bound: bool,
}
147
148impl Default for Range {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl Range {
155 fn defined_signature() -> Signature {
156 let integer = Coercion::new_implicit(
159 TypeSignatureClass::Native(logical_int64()),
160 vec![TypeSignatureClass::Integer],
161 NativeType::Int64,
162 );
163 let interval = Coercion::new_implicit(
166 TypeSignatureClass::Native(logical_interval_mdn()),
167 vec![TypeSignatureClass::Interval],
168 NativeType::Interval(MonthDayNano),
169 );
170 let date = Coercion::new_implicit(
174 TypeSignatureClass::Native(logical_date()),
175 vec![TypeSignatureClass::Native(logical_string())],
176 NativeType::Date,
177 );
178 let timestamp = Coercion::new_exact(TypeSignatureClass::Timestamp);
179 Signature::one_of(
180 vec![
181 TypeSignature::Coercible(vec![integer.clone()]),
184 TypeSignature::Coercible(vec![integer.clone(), integer.clone()]),
186 TypeSignature::Coercible(vec![integer.clone(), integer.clone(), integer]),
188 TypeSignature::Coercible(vec![date.clone(), date, interval.clone()]),
190 TypeSignature::Coercible(vec![timestamp.clone(), timestamp, interval]),
192 ],
193 Volatility::Immutable,
194 )
195 }
196
197 pub fn new() -> Self {
199 Self {
200 signature: Self::defined_signature(),
201 include_upper_bound: false,
202 }
203 }
204
205 fn generate_series() -> Self {
207 Self {
208 signature: Self::defined_signature(),
209 include_upper_bound: true,
210 }
211 }
212}
213
214impl ScalarUDFImpl for Range {
215 fn as_any(&self) -> &dyn Any {
216 self
217 }
218
219 fn name(&self) -> &str {
220 if self.include_upper_bound {
221 "generate_series"
222 } else {
223 "range"
224 }
225 }
226
227 fn signature(&self) -> &Signature {
228 &self.signature
229 }
230
231 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
232 if arg_types.iter().any(|t| t.is_null()) {
233 return Ok(DataType::Null);
234 }
235
236 match (&arg_types[0], arg_types.get(1)) {
237 (_, Some(DataType::Date64)) | (DataType::Date64, _) => Ok(DataType::List(
239 Arc::new(Field::new_list_field(DataType::Date32, true)),
240 )),
241 (DataType::Timestamp(_, tz), _) => {
243 Ok(DataType::List(Arc::new(Field::new_list_field(
244 DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
245 true,
246 ))))
247 }
248 _ => Ok(DataType::List(Arc::new(Field::new_list_field(
249 arg_types[0].clone(),
250 true,
251 )))),
252 }
253 }
254
255 fn invoke_with_args(
256 &self,
257 args: datafusion_expr::ScalarFunctionArgs,
258 ) -> Result<ColumnarValue> {
259 let args = &args.args;
260
261 if args.iter().any(|arg| arg.data_type().is_null()) {
262 return Ok(ColumnarValue::Scalar(ScalarValue::Null));
263 }
264 match args[0].data_type() {
265 DataType::Int64 => {
266 make_scalar_function(|args| self.gen_range_inner(args))(args)
267 }
268 DataType::Date32 | DataType::Date64 => {
269 make_scalar_function(|args| self.gen_range_date(args))(args)
270 }
271 DataType::Timestamp(_, _) => {
272 make_scalar_function(|args| self.gen_range_timestamp(args))(args)
273 }
274 dt => {
275 internal_err!(
276 "Signature failed to guard unknown input type for {}: {dt}",
277 self.name()
278 )
279 }
280 }
281 }
282
283 fn documentation(&self) -> Option<&Documentation> {
284 if self.include_upper_bound {
285 GenerateSeriesDoc {}.doc()
286 } else {
287 RangeDoc {}.doc()
288 }
289 }
290}
291
292impl Range {
293 fn gen_range_inner(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
308 let (start_array, stop_array, step_array) = match args {
309 [stop_array] => (None, as_int64_array(stop_array)?, None),
310 [start_array, stop_array] => (
311 Some(as_int64_array(start_array)?),
312 as_int64_array(stop_array)?,
313 None,
314 ),
315 [start_array, stop_array, step_array] => (
316 Some(as_int64_array(start_array)?),
317 as_int64_array(stop_array)?,
318 Some(as_int64_array(step_array)?),
319 ),
320 _ => return internal_err!("{} expects 1 to 3 arguments", self.name()),
321 };
322
323 let mut values = vec![];
324 let mut offsets = vec![0];
325 let mut valid = NullBufferBuilder::new(stop_array.len());
326 for (idx, stop) in stop_array.iter().enumerate() {
327 match retrieve_range_args(start_array, stop, step_array, idx) {
328 Some((_, _, 0)) => {
329 return exec_err!(
330 "step can't be 0 for function {}(start [, stop, step])",
331 self.name()
332 );
333 }
334 Some((start, stop, step)) => {
335 let step_abs =
338 usize::try_from(step.unsigned_abs()).map_err(|_| {
339 not_impl_datafusion_err!("step {} can't fit into usize", step)
340 })?;
341 values.extend(
342 gen_range_iter(start, stop, step < 0, self.include_upper_bound)
343 .step_by(step_abs),
344 );
345 offsets.push(values.len() as i32);
346 valid.append_non_null();
347 }
348 None => {
350 offsets.push(values.len() as i32);
351 valid.append_null();
352 }
353 };
354 }
355 let arr = Arc::new(ListArray::try_new(
356 Arc::new(Field::new_list_field(DataType::Int64, true)),
357 OffsetBuffer::new(offsets.into()),
358 Arc::new(Int64Array::from(values)),
359 valid.finish(),
360 )?);
361 Ok(arr)
362 }
363
364 fn gen_range_date(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
365 let [start, stop, step] = take_function_args(self.name(), args)?;
366 let step = as_interval_mdn_array(step)?;
367
368 let start = cast(start, &DataType::Date32)?;
371 let start = as_date32_array(&start)?;
372 let stop = cast(stop, &DataType::Date32)?;
373 let stop = as_date32_array(&stop)?;
374
375 let values_builder = Date32Builder::new();
377 let mut list_builder = ListBuilder::new(values_builder);
378
379 for idx in 0..stop.len() {
380 if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
381 list_builder.append_null();
382 continue;
383 }
384
385 let start = start.value(idx);
386 let stop = stop.value(idx);
387 let step = step.value(idx);
388
389 let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
390 if months == 0 && days == 0 {
391 return exec_err!("Cannot generate date range less than 1 day.");
392 }
393
394 let stop = if !self.include_upper_bound {
395 Date32Type::subtract_month_day_nano_opt(stop, step).ok_or_else(|| {
396 exec_datafusion_err!(
397 "Cannot generate date range where stop {} - {step:?}) overflows",
398 date32_to_string(stop)
399 )
400 })?
401 } else {
402 stop
403 };
404
405 let neg = months < 0 || days < 0;
406 let mut new_date = Some(start);
407
408 let values = from_fn(|| {
409 let Some(current_date) = new_date else {
410 return None; };
412 if (neg && current_date < stop) || (!neg && current_date > stop) {
413 None
414 } else {
415 new_date = Date32Type::add_month_day_nano_opt(current_date, step);
416 Some(Some(current_date))
417 }
418 });
419
420 list_builder.append_value(values);
421 }
422
423 let arr = Arc::new(list_builder.finish());
424
425 Ok(arr)
426 }
427
428 fn gen_range_timestamp(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
429 let [start, stop, step] = take_function_args(self.name(), args)?;
430 let step = as_interval_mdn_array(step)?;
431
432 fn cast_to_ns(arr: &ArrayRef) -> Result<ArrayRef> {
435 match arr.data_type() {
436 DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok(Arc::clone(arr)),
437 DataType::Timestamp(_, tz) => Ok(cast(
438 arr,
439 &DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
440 )?),
441 _ => unreachable!(),
442 }
443 }
444 let start = cast_to_ns(start)?;
445 let start = as_timestamp_nanosecond_array(&start)?;
446 let stop = cast_to_ns(stop)?;
447 let stop = as_timestamp_nanosecond_array(&stop)?;
448
449 let start_tz = parse_tz(&start.timezone())?;
450 let stop_tz = parse_tz(&stop.timezone())?;
451
452 let values_builder = start
454 .timezone()
455 .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
456 TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
457 });
458 let mut list_builder = ListBuilder::new(values_builder);
459
460 for idx in 0..start.len() {
461 if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
462 list_builder.append_null();
463 continue;
464 }
465
466 let start = start.value(idx);
467 let stop = stop.value(idx);
468 let step = step.value(idx);
469
470 let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
471 if months == 0 && days == 0 && ns == 0 {
472 return exec_err!("Interval argument to {} must not be 0", self.name());
473 }
474
475 let neg = TimestampNanosecondType::add_month_day_nano(start, step, start_tz)
476 .ok_or(exec_datafusion_err!(
477 "Cannot generate timestamp range where start + step overflows"
478 ))?
479 .cmp(&start)
480 == Ordering::Less;
481
482 let stop_dt =
483 as_datetime_with_timezone::<TimestampNanosecondType>(stop, stop_tz)
484 .ok_or(exec_datafusion_err!(
485 "Cannot generate timestamp for stop: {}: {:?}",
486 stop,
487 stop_tz
488 ))?;
489
490 let mut current = start;
491 let mut current_dt =
492 as_datetime_with_timezone::<TimestampNanosecondType>(current, start_tz)
493 .ok_or(exec_datafusion_err!(
494 "Cannot generate timestamp for start: {}: {:?}",
495 current,
496 start_tz
497 ))?;
498
499 let values = from_fn(|| {
500 let generate_series_should_end = self.include_upper_bound
501 && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt));
502 let range_should_end = !self.include_upper_bound
503 && ((neg && current_dt <= stop_dt)
504 || (!neg && current_dt >= stop_dt));
505 if generate_series_should_end || range_should_end {
506 return None;
507 }
508
509 let prev_current = current;
510
511 if let Some(ts) =
512 TimestampNanosecondType::add_month_day_nano(current, step, start_tz)
513 {
514 current = ts;
515 current_dt = as_datetime_with_timezone::<TimestampNanosecondType>(
516 current, start_tz,
517 )?;
518
519 Some(Some(prev_current))
520 } else {
521 None
523 }
524 });
525
526 list_builder.append_value(values);
527 }
528
529 let arr = Arc::new(list_builder.finish());
530
531 Ok(arr)
532 }
533}
534
535fn retrieve_range_args(
538 start_array: Option<&Int64Array>,
539 stop: Option<i64>,
540 step_array: Option<&Int64Array>,
541 idx: usize,
542) -> Option<(i64, i64, i64)> {
543 let start =
545 start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
546 let stop = stop?;
547 let step =
549 step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
550 Some((start, stop, step))
551}
552
/// Unit-stride iterator from `start` towards `stop`; the caller applies `step_by`.
///
/// When `decreasing`, values run downwards from `start`. With `include_upper`
/// the `stop` value itself is yielded if reached; otherwise iteration stops
/// just before it.
fn gen_range_iter(
    start: i64,
    stop: i64,
    decreasing: bool,
    include_upper: bool,
) -> Box<dyn Iterator<Item = i64>> {
    if !decreasing {
        return if include_upper {
            Box::new(start..=stop)
        } else {
            Box::new(start..stop)
        };
    }

    if include_upper {
        return Box::new((stop..=start).rev());
    }

    // Excluding `stop` on the way down means the lowest value is `stop + 1`.
    // That cannot overflow unless `stop == i64::MAX`, in which case nothing
    // lies strictly above it.
    match stop.checked_add(1) {
        Some(lowest) => Box::new((lowest..=start).rev()),
        None => Box::new(std::iter::empty()),
    }
}
581
582fn parse_tz(tz: &Option<&str>) -> Result<Tz> {
583 let tz = tz.unwrap_or_else(|| "+00");
584
585 Tz::from_str(tz)
586 .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
587}
588
589fn date32_to_string(value: i32) -> String {
590 if let Some(d) = Date32Type::to_naive_date_opt(value) {
591 format!("{value} ({d})")
592 } else {
593 format!("{value} (unknown date)")
594 }
595}