1use crate::utils::make_scalar_function;
21use arrow::buffer::OffsetBuffer;
22use arrow::datatypes::TimeUnit;
23use arrow::datatypes::{DataType, Field, IntervalUnit::MonthDayNano};
24use arrow::{
25 array::{
26 builder::{Date32Builder, TimestampNanosecondBuilder},
27 temporal_conversions::as_datetime_with_timezone,
28 timezone::Tz,
29 types::{Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType},
30 Array, ArrayRef, Int64Array, ListArray, ListBuilder, NullBufferBuilder,
31 },
32 compute::cast,
33};
34use datafusion_common::internal_err;
35use datafusion_common::{
36 cast::{
37 as_date32_array, as_int64_array, as_interval_mdn_array,
38 as_timestamp_nanosecond_array,
39 },
40 types::{
41 logical_date, logical_int64, logical_interval_mdn, logical_string, NativeType,
42 },
43 ScalarValue,
44};
45use datafusion_common::{
46 exec_datafusion_err, exec_err, not_impl_datafusion_err, utils::take_function_args,
47 Result,
48};
49use datafusion_expr::{
50 Coercion, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature,
51 TypeSignatureClass, Volatility,
52};
53use datafusion_macros::user_doc;
54use std::any::Any;
55use std::cmp::Ordering;
56use std::iter::from_fn;
57use std::str::FromStr;
58use std::sync::Arc;
59
// Declares the `range` entry points. The macro is handed the `Range` type,
// the function name `range` with its `start stop step` arguments, a
// description string, the accessor name `range_udf`, and the constructor
// `Range::new` (the variant whose upper bound is excluded).
// NOTE(review): `make_udf_expr_and_func!` is defined outside this file;
// confirm there exactly which items it generates.
make_udf_expr_and_func!(
    Range,
    range,
    start stop step,
    "create a list of values in the range between start and stop",
    range_udf,
    Range::new
);
68
// Declares the `gen_series` entry points. Same shape as the `range`
// declaration, but the constructor is `Range::generate_series`, which sets
// `include_upper_bound` so the stop value is part of the result.
// NOTE(review): `make_udf_expr_and_func!` is defined outside this file;
// confirm there exactly which items it generates.
make_udf_expr_and_func!(
    GenSeries,
    gen_series,
    start stop step,
    "create a list of values in the range between start and stop, include upper bound",
    gen_series_udf,
    Range::generate_series
);
77
78#[user_doc(
79 doc_section(label = "Array Functions"),
80 description = "Returns an Arrow array between start and stop with step. The range start..end contains all values with start <= x < end. It is empty if start >= end. Step cannot be 0.",
81 syntax_example = "range(stop)
82range(start, stop[, step])",
83 sql_example = r#"```sql
84> select range(2, 10, 3);
85+-----------------------------------+
86| range(Int64(2),Int64(10),Int64(3))|
87+-----------------------------------+
88| [2, 5, 8] |
89+-----------------------------------+
90
91> select range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
92+--------------------------------------------------------------------------+
93| range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH) |
94+--------------------------------------------------------------------------+
95| [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01] |
96+--------------------------------------------------------------------------+
97```"#,
98 argument(
99 name = "start",
100 description = "Start of the range. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
101 ),
102 argument(
103 name = "end",
104 description = "End of the range (not included). Type must be the same as start."
105 ),
106 argument(
107 name = "step",
108 description = "Increase by step (cannot be 0). Steps less than a day are supported only for timestamp ranges."
109 )
110)]
111struct RangeDoc {}
112
113#[user_doc(
114 doc_section(label = "Array Functions"),
115 description = "Similar to the range function, but it includes the upper bound.",
116 syntax_example = "generate_series(stop)
117generate_series(start, stop[, step])",
118 sql_example = r#"```sql
119> select generate_series(1,3);
120+------------------------------------+
121| generate_series(Int64(1),Int64(3)) |
122+------------------------------------+
123| [1, 2, 3] |
124+------------------------------------+
125```"#,
126 argument(
127 name = "start",
128 description = "Start of the series. Ints, timestamps, dates or string types that can be coerced to Date32 are supported."
129 ),
130 argument(
131 name = "end",
132 description = "End of the series (included). Type must be the same as start."
133 ),
134 argument(
135 name = "step",
136 description = "Increase by step (can not be 0). Steps less than a day are supported only for timestamp ranges."
137 )
138)]
139struct GenerateSeriesDoc {}
140
/// Scalar UDF implementation shared by `range` and `generate_series`.
///
/// Both flavours are built with the same signature; they differ only in
/// whether the stop value is part of the result.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Range {
    /// Accepted argument combinations, as built by `Range::defined_signature`.
    signature: Signature,
    /// `false` for `range` (stop excluded), `true` for `generate_series`
    /// (stop included).
    include_upper_bound: bool,
}
147
148impl Default for Range {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl Range {
155 fn defined_signature() -> Signature {
156 let integer = Coercion::new_implicit(
159 TypeSignatureClass::Native(logical_int64()),
160 vec![TypeSignatureClass::Integer],
161 NativeType::Int64,
162 );
163 let interval = Coercion::new_implicit(
166 TypeSignatureClass::Native(logical_interval_mdn()),
167 vec![TypeSignatureClass::Interval],
168 NativeType::Interval(MonthDayNano),
169 );
170 let date = Coercion::new_implicit(
174 TypeSignatureClass::Native(logical_date()),
175 vec![TypeSignatureClass::Native(logical_string())],
176 NativeType::Date,
177 );
178 let timestamp = Coercion::new_exact(TypeSignatureClass::Timestamp);
179 Signature::one_of(
180 vec![
181 TypeSignature::Coercible(vec![integer.clone()]),
184 TypeSignature::Coercible(vec![integer.clone(), integer.clone()]),
186 TypeSignature::Coercible(vec![integer.clone(), integer.clone(), integer]),
188 TypeSignature::Coercible(vec![date.clone(), date, interval.clone()]),
190 TypeSignature::Coercible(vec![timestamp.clone(), timestamp, interval]),
192 ],
193 Volatility::Immutable,
194 )
195 }
196
197 pub fn new() -> Self {
199 Self {
200 signature: Self::defined_signature(),
201 include_upper_bound: false,
202 }
203 }
204
205 fn generate_series() -> Self {
207 Self {
208 signature: Self::defined_signature(),
209 include_upper_bound: true,
210 }
211 }
212}
213
214impl ScalarUDFImpl for Range {
215 fn as_any(&self) -> &dyn Any {
216 self
217 }
218
219 fn name(&self) -> &str {
220 if self.include_upper_bound {
221 "generate_series"
222 } else {
223 "range"
224 }
225 }
226
227 fn signature(&self) -> &Signature {
228 &self.signature
229 }
230
231 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
232 if arg_types.iter().any(|t| t.is_null()) {
233 return Ok(DataType::Null);
234 }
235
236 match (&arg_types[0], arg_types.get(1)) {
237 (_, Some(DataType::Date64)) | (DataType::Date64, _) => Ok(DataType::List(
239 Arc::new(Field::new_list_field(DataType::Date32, true)),
240 )),
241 (DataType::Timestamp(_, tz), _) => {
243 Ok(DataType::List(Arc::new(Field::new_list_field(
244 DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
245 true,
246 ))))
247 }
248 _ => Ok(DataType::List(Arc::new(Field::new_list_field(
249 arg_types[0].clone(),
250 true,
251 )))),
252 }
253 }
254
255 fn invoke_with_args(
256 &self,
257 args: datafusion_expr::ScalarFunctionArgs,
258 ) -> Result<ColumnarValue> {
259 let args = &args.args;
260
261 if args.iter().any(|arg| arg.data_type().is_null()) {
262 return Ok(ColumnarValue::Scalar(ScalarValue::Null));
263 }
264 match args[0].data_type() {
265 DataType::Int64 => {
266 make_scalar_function(|args| self.gen_range_inner(args))(args)
267 }
268 DataType::Date32 | DataType::Date64 => {
269 make_scalar_function(|args| self.gen_range_date(args))(args)
270 }
271 DataType::Timestamp(_, _) => {
272 make_scalar_function(|args| self.gen_range_timestamp(args))(args)
273 }
274 dt => {
275 internal_err!(
276 "Signature failed to guard unknown input type for {}: {dt}",
277 self.name()
278 )
279 }
280 }
281 }
282
283 fn documentation(&self) -> Option<&Documentation> {
284 if self.include_upper_bound {
285 GenerateSeriesDoc {}.doc()
286 } else {
287 RangeDoc {}.doc()
288 }
289 }
290}
291
292impl Range {
293 fn gen_range_inner(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
308 let (start_array, stop_array, step_array) = match args {
309 [stop_array] => (None, as_int64_array(stop_array)?, None),
310 [start_array, stop_array] => (
311 Some(as_int64_array(start_array)?),
312 as_int64_array(stop_array)?,
313 None,
314 ),
315 [start_array, stop_array, step_array] => (
316 Some(as_int64_array(start_array)?),
317 as_int64_array(stop_array)?,
318 Some(as_int64_array(step_array)?),
319 ),
320 _ => return internal_err!("{} expects 1 to 3 arguments", self.name()),
321 };
322
323 let mut values = vec![];
324 let mut offsets = vec![0];
325 let mut valid = NullBufferBuilder::new(stop_array.len());
326 for (idx, stop) in stop_array.iter().enumerate() {
327 match retrieve_range_args(start_array, stop, step_array, idx) {
328 Some((_, _, 0)) => {
329 return exec_err!(
330 "step can't be 0 for function {}(start [, stop, step])",
331 self.name()
332 );
333 }
334 Some((start, stop, step)) => {
335 let step_abs =
338 usize::try_from(step.unsigned_abs()).map_err(|_| {
339 not_impl_datafusion_err!("step {} can't fit into usize", step)
340 })?;
341 values.extend(
342 gen_range_iter(start, stop, step < 0, self.include_upper_bound)
343 .step_by(step_abs),
344 );
345 offsets.push(values.len() as i32);
346 valid.append_non_null();
347 }
348 None => {
350 offsets.push(values.len() as i32);
351 valid.append_null();
352 }
353 };
354 }
355 let arr = Arc::new(ListArray::try_new(
356 Arc::new(Field::new_list_field(DataType::Int64, true)),
357 OffsetBuffer::new(offsets.into()),
358 Arc::new(Int64Array::from(values)),
359 valid.finish(),
360 )?);
361 Ok(arr)
362 }
363
364 fn gen_range_date(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
365 let [start, stop, step] = take_function_args(self.name(), args)?;
366 let step = as_interval_mdn_array(step)?;
367
368 let start = cast(start, &DataType::Date32)?;
371 let start = as_date32_array(&start)?;
372 let stop = cast(stop, &DataType::Date32)?;
373 let stop = as_date32_array(&stop)?;
374
375 let values_builder = Date32Builder::new();
377 let mut list_builder = ListBuilder::new(values_builder);
378
379 for idx in 0..stop.len() {
380 if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
381 list_builder.append_null();
382 continue;
383 }
384
385 let start = start.value(idx);
386 let stop = stop.value(idx);
387 let step = step.value(idx);
388
389 let (months, days, _) = IntervalMonthDayNanoType::to_parts(step);
390 if months == 0 && days == 0 {
391 return exec_err!("Cannot generate date range less than 1 day.");
392 }
393
394 let stop = if !self.include_upper_bound {
395 Date32Type::subtract_month_day_nano(stop, step)
396 } else {
397 stop
398 };
399
400 let neg = months < 0 || days < 0;
401 let mut new_date = start;
402
403 let values = from_fn(|| {
404 if (neg && new_date < stop) || (!neg && new_date > stop) {
405 None
406 } else {
407 let current_date = new_date;
408 new_date = Date32Type::add_month_day_nano(new_date, step);
409 Some(Some(current_date))
410 }
411 });
412
413 list_builder.append_value(values);
414 }
415
416 let arr = Arc::new(list_builder.finish());
417
418 Ok(arr)
419 }
420
421 fn gen_range_timestamp(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
422 let [start, stop, step] = take_function_args(self.name(), args)?;
423 let step = as_interval_mdn_array(step)?;
424
425 fn cast_to_ns(arr: &ArrayRef) -> Result<ArrayRef> {
428 match arr.data_type() {
429 DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok(Arc::clone(arr)),
430 DataType::Timestamp(_, tz) => Ok(cast(
431 arr,
432 &DataType::Timestamp(TimeUnit::Nanosecond, tz.to_owned()),
433 )?),
434 _ => unreachable!(),
435 }
436 }
437 let start = cast_to_ns(start)?;
438 let start = as_timestamp_nanosecond_array(&start)?;
439 let stop = cast_to_ns(stop)?;
440 let stop = as_timestamp_nanosecond_array(&stop)?;
441
442 let start_tz = parse_tz(&start.timezone())?;
443 let stop_tz = parse_tz(&stop.timezone())?;
444
445 let values_builder = start
447 .timezone()
448 .map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
449 TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
450 });
451 let mut list_builder = ListBuilder::new(values_builder);
452
453 for idx in 0..start.len() {
454 if start.is_null(idx) || stop.is_null(idx) || step.is_null(idx) {
455 list_builder.append_null();
456 continue;
457 }
458
459 let start = start.value(idx);
460 let stop = stop.value(idx);
461 let step = step.value(idx);
462
463 let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
464 if months == 0 && days == 0 && ns == 0 {
465 return exec_err!("Interval argument to {} must not be 0", self.name());
466 }
467
468 let neg = TimestampNanosecondType::add_month_day_nano(start, step, start_tz)
469 .ok_or(exec_datafusion_err!(
470 "Cannot generate timestamp range where start + step overflows"
471 ))?
472 .cmp(&start)
473 == Ordering::Less;
474
475 let stop_dt =
476 as_datetime_with_timezone::<TimestampNanosecondType>(stop, stop_tz)
477 .ok_or(exec_datafusion_err!(
478 "Cannot generate timestamp for stop: {}: {:?}",
479 stop,
480 stop_tz
481 ))?;
482
483 let mut current = start;
484 let mut current_dt =
485 as_datetime_with_timezone::<TimestampNanosecondType>(current, start_tz)
486 .ok_or(exec_datafusion_err!(
487 "Cannot generate timestamp for start: {}: {:?}",
488 current,
489 start_tz
490 ))?;
491
492 let values = from_fn(|| {
493 let generate_series_should_end = self.include_upper_bound
494 && ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt));
495 let range_should_end = !self.include_upper_bound
496 && ((neg && current_dt <= stop_dt)
497 || (!neg && current_dt >= stop_dt));
498 if generate_series_should_end || range_should_end {
499 return None;
500 }
501
502 let prev_current = current;
503
504 if let Some(ts) =
505 TimestampNanosecondType::add_month_day_nano(current, step, start_tz)
506 {
507 current = ts;
508 current_dt = as_datetime_with_timezone::<TimestampNanosecondType>(
509 current, start_tz,
510 )?;
511
512 Some(Some(prev_current))
513 } else {
514 None
516 }
517 });
518
519 list_builder.append_value(values);
520 }
521
522 let arr = Arc::new(list_builder.finish());
523
524 Ok(arr)
525 }
526}
527
528fn retrieve_range_args(
531 start_array: Option<&Int64Array>,
532 stop: Option<i64>,
533 step_array: Option<&Int64Array>,
534 idx: usize,
535) -> Option<(i64, i64, i64)> {
536 let start =
538 start_array.map_or(Some(0), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
539 let stop = stop?;
540 let step =
542 step_array.map_or(Some(1), |arr| arr.is_valid(idx).then(|| arr.value(idx)))?;
543 Some((start, stop, step))
544}
545
/// Returns an iterator over the integers between `start` and `stop`, moving
/// one unit at a time.
///
/// * `decreasing` - iterate downwards from `start` instead of upwards.
/// * `include_upper` - whether `stop` itself is part of the sequence.
///
/// Callers apply the actual step width on top of the returned iterator.
fn gen_range_iter(
    start: i64,
    stop: i64,
    decreasing: bool,
    include_upper: bool,
) -> Box<dyn Iterator<Item = i64>> {
    if !decreasing {
        if include_upper {
            return Box::new(start..=stop);
        }
        return Box::new(start..stop);
    }

    if include_upper {
        return Box::new((stop..=start).rev());
    }

    // Excluding `stop` while counting down means the lowest value is
    // `stop + 1`; when that cannot be represented nothing is in range.
    match stop.checked_add(1) {
        Some(lowest) => Box::new((lowest..=start).rev()),
        None => Box::new(std::iter::empty()),
    }
}
574
575fn parse_tz(tz: &Option<&str>) -> Result<Tz> {
576 let tz = tz.as_ref().map_or_else(|| "+00", |s| s);
577
578 Tz::from_str(tz)
579 .map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
580}