1use crate::arrow::util::pretty::pretty_format_batches_with_options;
21use arrow::array::{ArrayRef, RecordBatch};
22use arrow::error::ArrowError;
23use std::fmt::Display;
24use std::{error::Error, path::PathBuf};
25
26pub trait IntoArrayRef {
28 fn into_array_ref(self) -> ArrayRef;
29}
30
31pub fn format_batches(results: &[RecordBatch]) -> Result<impl Display, ArrowError> {
32 let datafusion_format_options = crate::config::FormatOptions::default();
33
34 let arrow_format_options: arrow::util::display::FormatOptions =
35 (&datafusion_format_options).try_into().unwrap();
36
37 pretty_format_batches_with_options(results, &arrow_format_options)
38}
39
/// Compares the pretty-formatted output of record batches with an expected
/// list of lines.
///
/// `$EXPECTED_LINES` is anything whose `.iter()` yields `&&str` (e.g. a
/// `Vec<&str>` or `[&str; N]`). `$CHUNKS` is a `&[RecordBatch]`. The formatted
/// table is trimmed and split into lines before comparison. On mismatch, both
/// sides are printed with `{:#?}` so they can be copied back into the test.
///
/// The expansion is a single block expression, so the macro can be used in
/// expression position (e.g. as a match arm). A trailing comma is accepted.
///
/// # Panics
///
/// Panics if the batches cannot be formatted or if the lines differ.
#[macro_export]
macro_rules! assert_batches_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr $(,)?) => {{
        let expected_lines: Vec<String> =
            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();

        let formatted = $crate::test_util::format_batches($CHUNKS)
            .unwrap()
            .to_string();

        let actual_lines: Vec<&str> = formatted.trim().lines().collect();

        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected_lines, actual_lines
        );
    }};
}
90
91pub fn batches_to_string(batches: &[RecordBatch]) -> String {
92 let actual = format_batches(batches).unwrap().to_string();
93
94 actual.trim().to_string()
95}
96
97pub fn batches_to_sort_string(batches: &[RecordBatch]) -> String {
98 let actual_lines = format_batches(batches).unwrap().to_string();
99
100 let mut actual_lines: Vec<&str> = actual_lines.trim().lines().collect();
101
102 let num_lines = actual_lines.len();
104 if num_lines > 3 {
105 actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
106 }
107
108 actual_lines.join("\n")
109}
110
/// Like `assert_batches_eq!`, but ignores row order.
///
/// For both the expected and the actual lines, every line with index in
/// `2..len - 1` is sorted before comparing. The first two lines and the last
/// line (presumably the table borders and header) keep their positions.
///
/// The expansion is a single block expression, so the macro can be used in
/// expression position (e.g. as a match arm). A trailing comma is accepted.
///
/// # Panics
///
/// Panics if the batches cannot be formatted or if the sorted lines differ.
#[macro_export]
macro_rules! assert_batches_sorted_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr $(,)?) => {{
        let mut expected_lines: Vec<String> =
            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();

        // Sort only the body lines so that row order does not matter.
        let num_lines = expected_lines.len();
        if num_lines > 3 {
            expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }

        let formatted = $crate::test_util::format_batches($CHUNKS)
            .unwrap()
            .to_string();
        let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();

        // Apply the identical normalization to the actual output.
        let num_lines = actual_lines.len();
        if num_lines > 3 {
            actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }

        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected_lines, actual_lines
        );
    }};
}
152
/// Asserts that `$ACTUAL` contains `$EXPECTED` as a substring.
///
/// Both arguments are converted with `Into<String>`, which consumes them.
/// The failure message prints both values in full.
///
/// The expansion is a single block expression, so the macro can be used in
/// expression position (e.g. `Err(e) => assert_contains!(e.to_string(), "x")`).
/// A trailing comma is accepted.
///
/// # Panics
///
/// Panics if `$EXPECTED` is not found in `$ACTUAL`.
#[macro_export]
macro_rules! assert_contains {
    ($ACTUAL: expr, $EXPECTED: expr $(,)?) => {{
        let actual_value: String = $ACTUAL.into();
        let expected_value: String = $EXPECTED.into();
        assert!(
            actual_value.contains(&expected_value),
            "Can not find expected in actual.\n\nExpected:\n{}\n\nActual:\n{}",
            expected_value,
            actual_value
        );
    }};
}
175
/// Asserts that `$ACTUAL` does NOT contain `$UNEXPECTED` as a substring.
///
/// Both arguments are converted with `Into<String>`, which consumes them.
/// The failure message prints both values in full.
///
/// The expansion is a single block expression, so the macro can be used in
/// expression position (e.g. as a match arm). A trailing comma is accepted.
///
/// # Panics
///
/// Panics if `$UNEXPECTED` is found in `$ACTUAL`.
#[macro_export]
macro_rules! assert_not_contains {
    ($ACTUAL: expr, $UNEXPECTED: expr $(,)?) => {{
        let actual_value: String = $ACTUAL.into();
        let unexpected_value: String = $UNEXPECTED.into();
        assert!(
            !actual_value.contains(&unexpected_value),
            "Found unexpected in actual.\n\nUnexpected:\n{}\n\nActual:\n{}",
            unexpected_value,
            actual_value
        );
    }};
}
198
199pub fn datafusion_test_data() -> String {
213 match get_data_dir("DATAFUSION_TEST_DATA", "../../datafusion/core/tests/data") {
214 Ok(pb) => pb.display().to_string(),
215 Err(err) => panic!("failed to get arrow data dir: {err}"),
216 }
217}
218
219pub fn arrow_test_data() -> String {
234 match get_data_dir("ARROW_TEST_DATA", "../../testing/data") {
235 Ok(pb) => pb.display().to_string(),
236 Err(err) => panic!("failed to get arrow data dir: {err}"),
237 }
238}
239
/// Returns the path of the `parquet-testing/data` directory as a string.
///
/// The `PARQUET_TEST_DATA` environment variable takes precedence. If it is
/// unset or blank, `../../parquet-testing/data` relative to this crate's
/// manifest directory is used (see `get_data_dir`). Only compiled when the
/// `parquet` feature is enabled.
///
/// # Panics
///
/// Panics if the directory cannot be located.
#[cfg(feature = "parquet")]
pub fn parquet_test_data() -> String {
    let data_dir = get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data")
        .unwrap_or_else(|err| panic!("failed to get parquet data dir: {err}"));
    data_dir.display().to_string()
}
262
263pub fn get_data_dir(
273 udf_env: &str,
274 submodule_data: &str,
275) -> Result<PathBuf, Box<dyn Error>> {
276 if let Ok(dir) = std::env::var(udf_env) {
278 let trimmed = dir.trim().to_string();
279 if !trimmed.is_empty() {
280 let pb = PathBuf::from(trimmed);
281 if pb.is_dir() {
282 return Ok(pb);
283 } else {
284 return Err(format!(
285 "the data dir `{}` defined by env {} not found",
286 pb.display(),
287 udf_env
288 )
289 .into());
290 }
291 }
292 }
293
294 let dir = env!("CARGO_MANIFEST_DIR");
300
301 let pb = PathBuf::from(dir).join(submodule_data);
302 if pb.is_dir() {
303 Ok(pb)
304 } else {
305 Err(format!(
306 "env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
307 HINT: try running `git submodule update --init`",
308 udf_env,
309 pb.display(),
310 ).into())
311 }
312}
313
/// Creates an Arrow array (as an `Arc`, coercible to `ArrayRef`) of the given
/// logical type from `$values`.
///
/// The first argument names a `DataType` variant. The second is any value the
/// corresponding Arrow array type can be built from via `From` (typically
/// `Vec<T>` or `Vec<Option<T>>`).
///
/// Supported types: `Boolean`, `Int8`..`Int64`, `UInt8`..`UInt64`,
/// `Float16`, `Float32`, `Float64`, `Utf8`, `LargeUtf8`, `Binary`,
/// `LargeBinary`, `Date32`, `Date64`.
///
/// Arrow is referenced through this crate's re-export (`$crate::arrow`), so
/// callers do not need their own direct `arrow` dependency.
#[macro_export]
macro_rules! create_array {
    (Boolean, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::BooleanArray::from($values))
    };
    (Int8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int8Array::from($values))
    };
    (Int16, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int16Array::from($values))
    };
    (Int32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int32Array::from($values))
    };
    (Int64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int64Array::from($values))
    };
    (UInt8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt8Array::from($values))
    };
    (UInt16, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt16Array::from($values))
    };
    (UInt32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt32Array::from($values))
    };
    (UInt64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt64Array::from($values))
    };
    (Float16, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Float16Array::from($values))
    };
    (Float32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Float32Array::from($values))
    };
    (Float64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Float64Array::from($values))
    };
    (Utf8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::StringArray::from($values))
    };
    (LargeUtf8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::LargeStringArray::from($values))
    };
    (Binary, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::BinaryArray::from($values))
    };
    (LargeBinary, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::LargeBinaryArray::from($values))
    };
    (Date32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Date32Array::from($values))
    };
    (Date64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Date64Array::from($values))
    };
}
356
/// Builds a `RecordBatch` from `(name, Type, values)` triples.
///
/// Each triple becomes a nullable field named `name` with data type
/// `DataType::Type`, and a column built by `create_array!(Type, values)`.
/// Evaluates to the `Result` returned by `RecordBatch::try_new`, so callers
/// typically apply `?` or `unwrap()`. A trailing comma after the last triple
/// is accepted.
///
/// Arrow is referenced through this crate's re-export (`$crate::arrow`), so
/// callers do not need their own direct `arrow` dependency.
///
/// ```ignore
/// let batch = record_batch!(
///     ("a", Int32, vec![1, 2, 3]),
///     ("b", Utf8, vec!["x", "y", "z"]),
/// )?;
/// ```
#[macro_export]
macro_rules! record_batch {
    ($(($name: expr, $type: ident, $values: expr)),* $(,)?) => {
        {
            let schema = ::std::sync::Arc::new($crate::arrow::datatypes::Schema::new(vec![
                $(
                    $crate::arrow::datatypes::Field::new(
                        $name,
                        $crate::arrow::datatypes::DataType::$type,
                        true,
                    ),
                )*
            ]));

            $crate::arrow::array::RecordBatch::try_new(
                schema,
                vec![$(
                    $crate::create_array!($type, $values),
                )*]
            )
        }
    }
}
390
391pub mod array_conversion {
392 use arrow::array::ArrayRef;
393
394 use super::IntoArrayRef;
395
396 impl IntoArrayRef for Vec<bool> {
397 fn into_array_ref(self) -> ArrayRef {
398 create_array!(Boolean, self)
399 }
400 }
401
402 impl IntoArrayRef for Vec<Option<bool>> {
403 fn into_array_ref(self) -> ArrayRef {
404 create_array!(Boolean, self)
405 }
406 }
407
408 impl IntoArrayRef for &[bool] {
409 fn into_array_ref(self) -> ArrayRef {
410 create_array!(Boolean, self.to_vec())
411 }
412 }
413
414 impl IntoArrayRef for &[Option<bool>] {
415 fn into_array_ref(self) -> ArrayRef {
416 create_array!(Boolean, self.to_vec())
417 }
418 }
419
420 impl IntoArrayRef for Vec<i8> {
421 fn into_array_ref(self) -> ArrayRef {
422 create_array!(Int8, self)
423 }
424 }
425
426 impl IntoArrayRef for Vec<Option<i8>> {
427 fn into_array_ref(self) -> ArrayRef {
428 create_array!(Int8, self)
429 }
430 }
431
432 impl IntoArrayRef for &[i8] {
433 fn into_array_ref(self) -> ArrayRef {
434 create_array!(Int8, self.to_vec())
435 }
436 }
437
438 impl IntoArrayRef for &[Option<i8>] {
439 fn into_array_ref(self) -> ArrayRef {
440 create_array!(Int8, self.to_vec())
441 }
442 }
443
444 impl IntoArrayRef for Vec<i16> {
445 fn into_array_ref(self) -> ArrayRef {
446 create_array!(Int16, self)
447 }
448 }
449
450 impl IntoArrayRef for Vec<Option<i16>> {
451 fn into_array_ref(self) -> ArrayRef {
452 create_array!(Int16, self)
453 }
454 }
455
456 impl IntoArrayRef for &[i16] {
457 fn into_array_ref(self) -> ArrayRef {
458 create_array!(Int16, self.to_vec())
459 }
460 }
461
462 impl IntoArrayRef for &[Option<i16>] {
463 fn into_array_ref(self) -> ArrayRef {
464 create_array!(Int16, self.to_vec())
465 }
466 }
467
468 impl IntoArrayRef for Vec<i32> {
469 fn into_array_ref(self) -> ArrayRef {
470 create_array!(Int32, self)
471 }
472 }
473
474 impl IntoArrayRef for Vec<Option<i32>> {
475 fn into_array_ref(self) -> ArrayRef {
476 create_array!(Int32, self)
477 }
478 }
479
480 impl IntoArrayRef for &[i32] {
481 fn into_array_ref(self) -> ArrayRef {
482 create_array!(Int32, self.to_vec())
483 }
484 }
485
486 impl IntoArrayRef for &[Option<i32>] {
487 fn into_array_ref(self) -> ArrayRef {
488 create_array!(Int32, self.to_vec())
489 }
490 }
491
492 impl IntoArrayRef for Vec<i64> {
493 fn into_array_ref(self) -> ArrayRef {
494 create_array!(Int64, self)
495 }
496 }
497
498 impl IntoArrayRef for Vec<Option<i64>> {
499 fn into_array_ref(self) -> ArrayRef {
500 create_array!(Int64, self)
501 }
502 }
503
504 impl IntoArrayRef for &[i64] {
505 fn into_array_ref(self) -> ArrayRef {
506 create_array!(Int64, self.to_vec())
507 }
508 }
509
510 impl IntoArrayRef for &[Option<i64>] {
511 fn into_array_ref(self) -> ArrayRef {
512 create_array!(Int64, self.to_vec())
513 }
514 }
515
516 impl IntoArrayRef for Vec<u8> {
517 fn into_array_ref(self) -> ArrayRef {
518 create_array!(UInt8, self)
519 }
520 }
521
522 impl IntoArrayRef for Vec<Option<u8>> {
523 fn into_array_ref(self) -> ArrayRef {
524 create_array!(UInt8, self)
525 }
526 }
527
528 impl IntoArrayRef for &[u8] {
529 fn into_array_ref(self) -> ArrayRef {
530 create_array!(UInt8, self.to_vec())
531 }
532 }
533
534 impl IntoArrayRef for &[Option<u8>] {
535 fn into_array_ref(self) -> ArrayRef {
536 create_array!(UInt8, self.to_vec())
537 }
538 }
539
540 impl IntoArrayRef for Vec<u16> {
541 fn into_array_ref(self) -> ArrayRef {
542 create_array!(UInt16, self)
543 }
544 }
545
546 impl IntoArrayRef for Vec<Option<u16>> {
547 fn into_array_ref(self) -> ArrayRef {
548 create_array!(UInt16, self)
549 }
550 }
551
552 impl IntoArrayRef for &[u16] {
553 fn into_array_ref(self) -> ArrayRef {
554 create_array!(UInt16, self.to_vec())
555 }
556 }
557
558 impl IntoArrayRef for &[Option<u16>] {
559 fn into_array_ref(self) -> ArrayRef {
560 create_array!(UInt16, self.to_vec())
561 }
562 }
563
564 impl IntoArrayRef for Vec<u32> {
565 fn into_array_ref(self) -> ArrayRef {
566 create_array!(UInt32, self)
567 }
568 }
569
570 impl IntoArrayRef for Vec<Option<u32>> {
571 fn into_array_ref(self) -> ArrayRef {
572 create_array!(UInt32, self)
573 }
574 }
575
576 impl IntoArrayRef for &[u32] {
577 fn into_array_ref(self) -> ArrayRef {
578 create_array!(UInt32, self.to_vec())
579 }
580 }
581
582 impl IntoArrayRef for &[Option<u32>] {
583 fn into_array_ref(self) -> ArrayRef {
584 create_array!(UInt32, self.to_vec())
585 }
586 }
587
588 impl IntoArrayRef for Vec<u64> {
589 fn into_array_ref(self) -> ArrayRef {
590 create_array!(UInt64, self)
591 }
592 }
593
594 impl IntoArrayRef for Vec<Option<u64>> {
595 fn into_array_ref(self) -> ArrayRef {
596 create_array!(UInt64, self)
597 }
598 }
599
600 impl IntoArrayRef for &[u64] {
601 fn into_array_ref(self) -> ArrayRef {
602 create_array!(UInt64, self.to_vec())
603 }
604 }
605
606 impl IntoArrayRef for &[Option<u64>] {
607 fn into_array_ref(self) -> ArrayRef {
608 create_array!(UInt64, self.to_vec())
609 }
610 }
611
612 impl IntoArrayRef for Vec<f32> {
615 fn into_array_ref(self) -> ArrayRef {
616 create_array!(Float32, self)
617 }
618 }
619
620 impl IntoArrayRef for Vec<Option<f32>> {
621 fn into_array_ref(self) -> ArrayRef {
622 create_array!(Float32, self)
623 }
624 }
625
626 impl IntoArrayRef for &[f32] {
627 fn into_array_ref(self) -> ArrayRef {
628 create_array!(Float32, self.to_vec())
629 }
630 }
631
632 impl IntoArrayRef for &[Option<f32>] {
633 fn into_array_ref(self) -> ArrayRef {
634 create_array!(Float32, self.to_vec())
635 }
636 }
637
638 impl IntoArrayRef for Vec<f64> {
639 fn into_array_ref(self) -> ArrayRef {
640 create_array!(Float64, self)
641 }
642 }
643
644 impl IntoArrayRef for Vec<Option<f64>> {
645 fn into_array_ref(self) -> ArrayRef {
646 create_array!(Float64, self)
647 }
648 }
649
650 impl IntoArrayRef for &[f64] {
651 fn into_array_ref(self) -> ArrayRef {
652 create_array!(Float64, self.to_vec())
653 }
654 }
655
656 impl IntoArrayRef for &[Option<f64>] {
657 fn into_array_ref(self) -> ArrayRef {
658 create_array!(Float64, self.to_vec())
659 }
660 }
661
662 impl IntoArrayRef for Vec<&str> {
663 fn into_array_ref(self) -> ArrayRef {
664 create_array!(Utf8, self)
665 }
666 }
667
668 impl IntoArrayRef for Vec<Option<&str>> {
669 fn into_array_ref(self) -> ArrayRef {
670 create_array!(Utf8, self)
671 }
672 }
673
674 impl IntoArrayRef for &[&str] {
675 fn into_array_ref(self) -> ArrayRef {
676 create_array!(Utf8, self.to_vec())
677 }
678 }
679
680 impl IntoArrayRef for &[Option<&str>] {
681 fn into_array_ref(self) -> ArrayRef {
682 create_array!(Utf8, self.to_vec())
683 }
684 }
685
686 impl IntoArrayRef for Vec<String> {
687 fn into_array_ref(self) -> ArrayRef {
688 create_array!(Utf8, self)
689 }
690 }
691
692 impl IntoArrayRef for Vec<Option<String>> {
693 fn into_array_ref(self) -> ArrayRef {
694 create_array!(Utf8, self)
695 }
696 }
697
698 impl IntoArrayRef for &[String] {
699 fn into_array_ref(self) -> ArrayRef {
700 create_array!(Utf8, self.to_vec())
701 }
702 }
703
704 impl IntoArrayRef for &[Option<String>] {
705 fn into_array_ref(self) -> ArrayRef {
706 create_array!(Utf8, self.to_vec())
707 }
708 }
709}
710
// Unit tests for the data-directory helpers (`get_data_dir` and friends) and
// the `record_batch!` macro.
#[cfg(test)]
mod tests {
    use crate::cast::{as_float64_array, as_int32_array, as_string_array};
    use crate::error::Result;

    use super::*;
    use std::env;

    // Exercises each branch of `get_data_dir`, in order:
    // - env var pointing at a missing dir -> error
    // - empty or whitespace-only env var -> falls back to `submodule_data`
    // - env var pointing at an existing dir -> that dir
    // - env var removed -> fallback, which errors if missing and succeeds if present
    // The steps depend on the preceding `set_var`/`remove_var` calls, so their
    // order matters.
    // NOTE(review): this mutates the process environment; it assumes no other
    // concurrently running test reads the `get_data_dir` variable.
    #[test]
    fn test_data_dir() {
        let udf_env = "get_data_dir";
        let cwd = env::current_dir().unwrap();

        // `cwd` is absolute, so `existing_str` is an absolute path. Joining it
        // onto the manifest dir inside `get_data_dir` yields it unchanged,
        // which is why the fallback cases below return `existing_pb`.
        let existing_pb = cwd.join("..");
        let existing = existing_pb.display().to_string();
        let existing_str = existing.as_str();

        let non_existing = cwd.join("non-existing-dir").display().to_string();
        let non_existing_str = non_existing.as_str();

        // A non-blank env var naming a missing dir is an error, even though
        // the fallback path exists.
        env::set_var(udf_env, non_existing_str);
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_err());

        // An empty value is treated as unset.
        env::set_var(udf_env, "");
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);

        // A whitespace-only value is trimmed to empty and also treated as unset.
        env::set_var(udf_env, " ");
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);

        env::set_var(udf_env, existing_str);
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);

        // With the env var removed, only the fallback path decides the outcome.
        env::remove_var(udf_env);
        let res = get_data_dir(udf_env, non_existing_str);
        assert!(res.is_err());

        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);
    }

    // Requires the arrow `testing/data` and `parquet-testing/data` directories
    // to be present (the fallback error hints these come from git submodules).
    #[test]
    #[cfg(feature = "parquet")]
    fn test_happy() {
        let res = arrow_test_data();
        assert!(PathBuf::from(res).is_dir());

        let res = parquet_test_data();
        assert!(PathBuf::from(res).is_dir());
    }

    // Builds a three-column batch with `record_batch!` and checks the column
    // count, row count, values and null masks.
    #[test]
    fn test_create_record_batch() -> Result<()> {
        use arrow::array::Array;

        let batch = record_batch!(
            ("a", Int32, vec![1, 2, 3, 4]),
            ("b", Float64, vec![Some(4.0), None, Some(5.0), None]),
            ("c", Utf8, vec!["alpha", "beta", "gamma", "delta"])
        )?;

        assert_eq!(3, batch.num_columns());
        assert_eq!(4, batch.num_rows());

        let values: Vec<_> = as_int32_array(batch.column(0))?
            .values()
            .iter()
            .map(|v| v.to_owned())
            .collect();
        assert_eq!(values, vec![1, 2, 3, 4]);

        // Null slots read back as 0.0 from the raw values buffer. Nullness is
        // checked separately via the null buffer below.
        let values: Vec<_> = as_float64_array(batch.column(1))?
            .values()
            .iter()
            .map(|v| v.to_owned())
            .collect();
        assert_eq!(values, vec![4.0, 0.0, 5.0, 0.0]);

        // `true` marks a valid (non-null) slot.
        let nulls: Vec<_> = as_float64_array(batch.column(1))?
            .nulls()
            .unwrap()
            .iter()
            .collect();
        assert_eq!(nulls, vec![true, false, true, false]);

        let values: Vec<_> = as_string_array(batch.column(2))?.iter().flatten().collect();
        assert_eq!(values, vec!["alpha", "beta", "gamma", "delta"]);

        Ok(())
    }
}