1use std::fmt::Write;
19use std::sync::Arc;
20
21use DataType::{LargeUtf8, Utf8, Utf8View};
22use arrow::array::{
23 ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array,
24 OffsetSizeTrait, StringArrayType, StringViewArray,
25};
26use arrow::datatypes::DataType;
27
28use crate::utils::{make_scalar_function, utf8_to_str_type};
29use datafusion_common::cast::as_int64_array;
30use datafusion_common::{Result, exec_err};
31use datafusion_expr::TypeSignature::Exact;
32use datafusion_expr::{
33 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
34 Volatility,
35};
36use datafusion_macros::user_doc;
37
38#[user_doc(
39 doc_section(label = "String Functions"),
40 description = "Pads the right side of a string with another string to a specified string length.",
41 syntax_example = "rpad(str, n[, padding_str])",
42 sql_example = r#"```sql
43> select rpad('datafusion', 20, '_-');
44+-----------------------------------------------+
45| rpad(Utf8("datafusion"),Int64(20),Utf8("_-")) |
46+-----------------------------------------------+
47| datafusion_-_-_-_-_- |
48+-----------------------------------------------+
49```"#,
50 standard_argument(name = "str", prefix = "String"),
51 argument(
52 name = "n",
53 description = "String length to pad to. If the input string is longer than this length, it is truncated."
54 ),
55 argument(
56 name = "padding_str",
57 description = "String expression to pad with. Can be a constant, column, or function, and any combination of string operators. _Default is a space._"
58 ),
59 related_udf(name = "lpad")
60)]
61#[derive(Debug, PartialEq, Eq, Hash)]
62pub struct RPadFunc {
63 signature: Signature,
64}
65
66impl Default for RPadFunc {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72impl RPadFunc {
73 pub fn new() -> Self {
74 use DataType::*;
75 Self {
76 signature: Signature::one_of(
77 vec![
78 Exact(vec![Utf8View, Int64]),
79 Exact(vec![Utf8View, Int64, Utf8View]),
80 Exact(vec![Utf8View, Int64, Utf8]),
81 Exact(vec![Utf8View, Int64, LargeUtf8]),
82 Exact(vec![Utf8, Int64]),
83 Exact(vec![Utf8, Int64, Utf8View]),
84 Exact(vec![Utf8, Int64, Utf8]),
85 Exact(vec![Utf8, Int64, LargeUtf8]),
86 Exact(vec![LargeUtf8, Int64]),
87 Exact(vec![LargeUtf8, Int64, Utf8View]),
88 Exact(vec![LargeUtf8, Int64, Utf8]),
89 Exact(vec![LargeUtf8, Int64, LargeUtf8]),
90 ],
91 Volatility::Immutable,
92 ),
93 }
94 }
95}
96
97impl ScalarUDFImpl for RPadFunc {
98 fn name(&self) -> &str {
99 "rpad"
100 }
101
102 fn signature(&self) -> &Signature {
103 &self.signature
104 }
105
106 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
107 utf8_to_str_type(&arg_types[0], "rpad")
108 }
109
110 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
111 let ScalarFunctionArgs {
112 args, number_rows, ..
113 } = args;
114
115 const MAX_SCALAR_TARGET_LEN: usize = 16384;
116
117 if let Some(target_len) = try_as_scalar_i64(&args[1]) {
120 let target_len: usize = match usize::try_from(target_len) {
121 Ok(n) if n <= i32::MAX as usize => n,
122 Ok(n) => {
123 return exec_err!(
124 "rpad requested length {n} too large, maximum allowed length is {}",
125 i32::MAX
126 );
127 }
128 Err(_) => 0, };
130
131 let fill_str = if args.len() == 3 {
132 try_as_scalar_str(&args[2])
133 } else {
134 Some(" ")
135 };
136
137 if target_len <= MAX_SCALAR_TARGET_LEN
141 && let Some(fill) = fill_str
142 {
143 let string_array = args[0].to_array_of_size(number_rows)?;
144 let result = match string_array.data_type() {
145 Utf8View => rpad_scalar_args::<_, i32>(
146 string_array.as_string_view(),
147 target_len,
148 fill,
149 ),
150 Utf8 => rpad_scalar_args::<_, i32>(
151 string_array.as_string::<i32>(),
152 target_len,
153 fill,
154 ),
155 LargeUtf8 => rpad_scalar_args::<_, i64>(
156 string_array.as_string::<i64>(),
157 target_len,
158 fill,
159 ),
160 other => {
161 exec_err!("Unsupported data type {other:?} for function rpad")
162 }
163 }?;
164 return Ok(ColumnarValue::Array(result));
165 }
166 }
167
168 match args[0].data_type() {
169 Utf8 | Utf8View => make_scalar_function(rpad::<i32>, vec![])(&args),
170 LargeUtf8 => make_scalar_function(rpad::<i64>, vec![])(&args),
171 other => exec_err!("Unsupported data type {other:?} for function rpad"),
172 }
173 }
174
175 fn documentation(&self) -> Option<&Documentation> {
176 self.doc()
177 }
178}
179
180use super::common::{
181 StringCharLen, char_count_or_boundary, try_as_scalar_i64, try_as_scalar_str,
182};
183
184fn rpad_scalar_args<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
186 string_array: V,
187 target_len: usize,
188 fill: &str,
189) -> Result<ArrayRef> {
190 if string_array.is_ascii() && fill.is_ascii() {
191 rpad_scalar_ascii::<V, T>(string_array, target_len, fill)
192 } else {
193 rpad_scalar_unicode::<V, T>(string_array, target_len, fill)
194 }
195}
196
197fn rpad_scalar_ascii<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
198 string_array: V,
199 target_len: usize,
200 fill: &str,
201) -> Result<ArrayRef> {
202 let padding_buf = if !fill.is_empty() {
205 let mut buf = String::with_capacity(target_len);
206 while buf.len() < target_len {
207 let remaining = target_len - buf.len();
208 if remaining >= fill.len() {
209 buf.push_str(fill);
210 } else {
211 buf.push_str(&fill[..remaining]);
212 }
213 }
214 buf
215 } else {
216 String::new()
217 };
218
219 let data_capacity = string_array.len().saturating_mul(target_len);
221 let mut builder =
222 GenericStringBuilder::<T>::with_capacity(string_array.len(), data_capacity);
223
224 for maybe_string in string_array.iter() {
225 match maybe_string {
226 Some(string) => {
227 let str_len = string.len();
228 if target_len <= str_len {
229 builder.append_value(&string[..target_len]);
230 } else if fill.is_empty() {
231 builder.append_value(string);
232 } else {
233 let pad_needed = target_len - str_len;
234 builder.write_str(string)?;
235 builder.write_str(&padding_buf[..pad_needed])?;
236 builder.append_value("");
237 }
238 }
239 None => builder.append_null(),
240 }
241 }
242
243 Ok(Arc::new(builder.finish()) as ArrayRef)
244}
245
246fn rpad_scalar_unicode<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
247 string_array: V,
248 target_len: usize,
249 fill: &str,
250) -> Result<ArrayRef> {
251 let fill_chars: Vec<char> = fill.chars().collect();
252
253 let (padding_buf, char_byte_offsets) = if !fill_chars.is_empty() {
258 let mut buf = String::new();
259 let mut offsets = Vec::with_capacity(target_len + 1);
260 offsets.push(0usize);
261 for i in 0..target_len {
262 buf.push(fill_chars[i % fill_chars.len()]);
263 offsets.push(buf.len());
264 }
265 (buf, offsets)
266 } else {
267 (String::new(), vec![0])
268 };
269
270 let data_capacity = string_array.len().saturating_mul(target_len * 4);
273 let mut builder =
274 GenericStringBuilder::<T>::with_capacity(string_array.len(), data_capacity);
275
276 for maybe_string in string_array.iter() {
277 match maybe_string {
278 Some(string) => match char_count_or_boundary(string, target_len) {
279 StringCharLen::ByteOffset(offset) => {
280 builder.append_value(&string[..offset]);
281 }
282 StringCharLen::CharCount(char_count) => {
283 builder.write_str(string)?;
284 if !fill_chars.is_empty() {
285 let pad_chars = target_len - char_count;
286 let pad_bytes = char_byte_offsets[pad_chars];
287 builder.write_str(&padding_buf[..pad_bytes])?;
288 }
289 builder.append_value("");
290 }
291 },
292 None => builder.append_null(),
293 }
294 }
295
296 Ok(Arc::new(builder.finish()) as ArrayRef)
297}
298
299fn rpad<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
300 if args.len() <= 1 || args.len() > 3 {
301 return exec_err!(
302 "rpad was called with {} arguments. It requires at least 2 and at most 3.",
303 args.len()
304 );
305 }
306
307 let length_array = as_int64_array(&args[1])?;
308
309 match (args.len(), args[0].data_type()) {
310 (2, Utf8View) => rpad_impl::<&StringViewArray, &GenericStringArray<i32>, T>(
311 &args[0].as_string_view(),
312 length_array,
313 None,
314 ),
315 (2, Utf8 | LargeUtf8) => rpad_impl::<
316 &GenericStringArray<T>,
317 &GenericStringArray<T>,
318 T,
319 >(&args[0].as_string::<T>(), length_array, None),
320 (3, Utf8View) => rpad_with_replace::<&StringViewArray, T>(
321 &args[0].as_string_view(),
322 length_array,
323 &args[2],
324 ),
325 (3, Utf8 | LargeUtf8) => rpad_with_replace::<&GenericStringArray<T>, T>(
326 &args[0].as_string::<T>(),
327 length_array,
328 &args[2],
329 ),
330 (len, dt) => unreachable!("rpad: unexpected arg count ({len}) or type ({dt})"),
331 }
332}
333
334fn rpad_with_replace<'a, V, T: OffsetSizeTrait>(
335 string_array: &V,
336 length_array: &Int64Array,
337 fill_array: &'a ArrayRef,
338) -> Result<ArrayRef>
339where
340 V: StringArrayType<'a>,
341{
342 match fill_array.data_type() {
343 Utf8View => rpad_impl::<V, &StringViewArray, T>(
344 string_array,
345 length_array,
346 Some(fill_array.as_string_view()),
347 ),
348 LargeUtf8 => rpad_impl::<V, &GenericStringArray<i64>, T>(
349 string_array,
350 length_array,
351 Some(fill_array.as_string::<i64>()),
352 ),
353 Utf8 => rpad_impl::<V, &GenericStringArray<i32>, T>(
354 string_array,
355 length_array,
356 Some(fill_array.as_string::<i32>()),
357 ),
358 other => {
359 exec_err!("Unsupported data type {other:?} for function rpad")
360 }
361 }
362}
363
364fn rpad_impl<'a, V, V2, T>(
365 string_array: &V,
366 length_array: &Int64Array,
367 fill_array: Option<V2>,
368) -> Result<ArrayRef>
369where
370 V: StringArrayType<'a>,
371 V2: StringArrayType<'a>,
372 T: OffsetSizeTrait,
373{
374 let array = if let Some(fill_array) = fill_array {
375 let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new();
376 let mut fill_chars_buf = Vec::new();
377
378 for ((string, target_len), fill) in string_array
379 .iter()
380 .zip(length_array.iter())
381 .zip(fill_array.iter())
382 {
383 if let (Some(string), Some(target_len), Some(fill)) =
384 (string, target_len, fill)
385 {
386 if target_len > i32::MAX as i64 {
387 return exec_err!(
388 "rpad requested length {target_len} too large, maximum allowed length is {}",
389 i32::MAX
390 );
391 }
392
393 let target_len = if target_len < 0 {
394 0
395 } else {
396 target_len as usize
397 };
398 if target_len == 0 {
399 builder.append_value("");
400 continue;
401 }
402
403 if string.is_ascii() && fill.is_ascii() {
404 let str_len = string.len();
406 if target_len < str_len {
407 builder.append_value(&string[..target_len]);
408 } else if fill.is_empty() {
409 builder.append_value(string);
410 } else {
411 let pad_len = target_len - str_len;
412 let fill_len = fill.len();
413 let full_reps = pad_len / fill_len;
414 let remainder = pad_len % fill_len;
415 builder.write_str(string)?;
416 for _ in 0..full_reps {
417 builder.write_str(fill)?;
418 }
419 if remainder > 0 {
420 builder.write_str(&fill[..remainder])?;
421 }
422 builder.append_value("");
423 }
424 } else {
425 fill_chars_buf.clear();
426 fill_chars_buf.extend(fill.chars());
427
428 match char_count_or_boundary(string, target_len) {
429 StringCharLen::ByteOffset(offset) => {
430 builder.append_value(&string[..offset]);
431 }
432 StringCharLen::CharCount(char_count) => {
433 builder.write_str(string)?;
434 if !fill_chars_buf.is_empty() {
435 for l in 0..target_len - char_count {
436 let c = *fill_chars_buf
437 .get(l % fill_chars_buf.len())
438 .unwrap();
439 builder.write_char(c)?;
440 }
441 }
442 builder.append_value("");
443 }
444 }
445 }
446 } else {
447 builder.append_null();
448 }
449 }
450
451 builder.finish()
452 } else {
453 let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new();
454
455 for (string, target_len) in string_array.iter().zip(length_array.iter()) {
456 if let (Some(string), Some(target_len)) = (string, target_len) {
457 if target_len > i32::MAX as i64 {
458 return exec_err!(
459 "rpad requested length {target_len} too large, maximum allowed length is {}",
460 i32::MAX
461 );
462 }
463
464 let target_len = if target_len < 0 {
465 0
466 } else {
467 target_len as usize
468 };
469 if target_len == 0 {
470 builder.append_value("");
471 continue;
472 }
473
474 if string.is_ascii() {
475 let str_len = string.len();
477 if target_len < str_len {
478 builder.append_value(&string[..target_len]);
479 } else {
480 builder.write_str(string)?;
481 for _ in 0..(target_len - str_len) {
482 builder.write_str(" ")?;
483 }
484 builder.append_value("");
485 }
486 } else {
487 match char_count_or_boundary(string, target_len) {
488 StringCharLen::ByteOffset(offset) => {
489 builder.append_value(&string[..offset]);
490 }
491 StringCharLen::CharCount(char_count) => {
492 builder.write_str(string)?;
493 for _ in 0..(target_len - char_count) {
494 builder.write_str(" ")?;
495 }
496 builder.append_value("");
497 }
498 }
499 }
500 } else {
501 builder.append_null();
502 }
503 }
504
505 builder.finish()
506 };
507
508 Ok(Arc::new(array) as ArrayRef)
509}
510
#[cfg(test)]
mod tests {
    use arrow::array::{Array, StringArray};
    use arrow::datatypes::DataType::Utf8;

    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::{ColumnarValue, ScalarUDFImpl};

    use crate::unicode::rpad::RPadFunc;
    use crate::utils::test::test_function;

    /// Exercises `rpad` with scalar arguments: default and explicit fills,
    /// truncation, null propagation and non-ASCII input.
    #[test]
    fn test_functions() -> Result<()> {
        // Default fill (space), non-ASCII input: 4 chars padded to 5.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("josé")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
            ],
            Ok(Some("josé ")),
            &str,
            Utf8,
            StringArray
        );
        // Default fill (space), ASCII input: 2 chars padded to 5 needs three spaces.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
            ],
            Ok(Some("hi   ")),
            &str,
            Utf8,
            StringArray
        );
        // Zero length yields the empty string.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(0i64)),
            ],
            Ok(Some("")),
            &str,
            Utf8,
            StringArray
        );
        // Null length yields null.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::Int64(None)),
            ],
            Ok(None),
            &str,
            Utf8,
            StringArray
        );
        // Null string yields null.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::Utf8(None)),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
            ],
            Ok(None),
            &str,
            Utf8,
            StringArray
        );
        // Explicit fill, partially repeated.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
                ColumnarValue::Scalar(ScalarValue::from("xy")),
            ],
            Ok(Some("hixyx")),
            &str,
            Utf8,
            StringArray
        );
        // Longer fill repeated several times with a partial tail.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(21i64)),
                ColumnarValue::Scalar(ScalarValue::from("abcdef")),
            ],
            Ok(Some("hiabcdefabcdefabcdefa")),
            &str,
            Utf8,
            StringArray
        );
        // Explicit single-space fill behaves like the default: three spaces added.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
                ColumnarValue::Scalar(ScalarValue::from(" ")),
            ],
            Ok(Some("hi   ")),
            &str,
            Utf8,
            StringArray
        );
        // Empty fill leaves the string unchanged.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
                ColumnarValue::Scalar(ScalarValue::from("")),
            ],
            Ok(Some("hi")),
            &str,
            Utf8,
            StringArray
        );
        // Null string with explicit fill yields null.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::Utf8(None)),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
                ColumnarValue::Scalar(ScalarValue::from("xy")),
            ],
            Ok(None),
            &str,
            Utf8,
            StringArray
        );
        // Null length with explicit fill yields null.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::Int64(None)),
                ColumnarValue::Scalar(ScalarValue::from("xy")),
            ],
            Ok(None),
            &str,
            Utf8,
            StringArray
        );
        // Null fill yields null.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
                ColumnarValue::Scalar(ScalarValue::Utf8(None)),
            ],
            Ok(None),
            &str,
            Utf8,
            StringArray
        );
        // Input longer than the target length is truncated.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hello")),
                ColumnarValue::Scalar(ScalarValue::from(2i64)),
            ],
            Ok(Some("he")),
            &str,
            Utf8,
            StringArray
        );
        // Fill repeated a whole number of times.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("hi")),
                ColumnarValue::Scalar(ScalarValue::from(6i64)),
                ColumnarValue::Scalar(ScalarValue::from("xy")),
            ],
            Ok(Some("hixyxy")),
            &str,
            Utf8,
            StringArray
        );
        // Non-ASCII input with ASCII fill: lengths count characters.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("josé")),
                ColumnarValue::Scalar(ScalarValue::from(10i64)),
                ColumnarValue::Scalar(ScalarValue::from("xy")),
            ],
            Ok(Some("joséxyxyxy")),
            &str,
            Utf8,
            StringArray
        );
        // Non-ASCII input with non-ASCII fill.
        test_function!(
            RPadFunc::new(),
            vec![
                ColumnarValue::Scalar(ScalarValue::from("josé")),
                ColumnarValue::Scalar(ScalarValue::from(10i64)),
                ColumnarValue::Scalar(ScalarValue::from("éñ")),
            ],
            Ok(Some("josééñéñéñ")),
            &str,
            Utf8,
            StringArray
        );
        // NOTE(review): this case is only compiled when `unicode_expressions`
        // is disabled, and `internal_err!` is not imported in this module —
        // confirm whether it is still reachable/needed.
        #[cfg(not(feature = "unicode_expressions"))]
        test_function!(
            RPadFunc::new(),
            &[
                ColumnarValue::Scalar(ScalarValue::from("josé")),
                ColumnarValue::Scalar(ScalarValue::from(5i64)),
            ],
            internal_err!(
                "function rpad requires compilation with feature flag: unicode_expressions."
            ),
            &str,
            Utf8,
            StringArray
        );

        Ok(())
    }
}