1use crate::arrow::util::pretty::pretty_format_batches_with_options;
21use arrow::array::{ArrayRef, RecordBatch};
22use arrow::error::ArrowError;
23use std::fmt::Display;
24use std::{error::Error, path::PathBuf};
25
/// Converts a collection of Rust values into an Arrow [`ArrayRef`].
///
/// Implementations for vectors and slices of boolean, numeric and string
/// values (optionally wrapped in `Option`) are provided in the
/// `array_conversion` module of this file.
pub trait IntoArrayRef {
    /// Consumes `self` and builds the corresponding Arrow array.
    fn into_array_ref(self) -> ArrayRef;
}
30
31pub fn format_batches(results: &[RecordBatch]) -> Result<impl Display, ArrowError> {
32 let datafusion_format_options = crate::config::FormatOptions::default();
33
34 let arrow_format_options: arrow::util::display::FormatOptions =
35 (&datafusion_format_options).try_into().unwrap();
36
37 pretty_format_batches_with_options(results, &arrow_format_options)
38}
39
/// Compares the pretty-formatted output of record batches with an expected
/// list of lines.
///
/// `$EXPECTED_LINES` is typically an array, slice or `Vec` of `&str`; each line
/// is converted to a `String`. `$CHUNKS` is passed to
/// `$crate::test_util::format_batches`, so it must coerce to `&[RecordBatch]`.
/// The formatted output is trimmed and split into lines before comparison.
///
/// This is a macro so that a failure is reported on the caller's line. The
/// expansion is a single block, so it can be used as a statement or in
/// expression position (e.g. a closure body or match arm). A trailing comma is
/// accepted.
///
/// # Panics
///
/// Panics if formatting fails or if the lines differ.
///
/// ```ignore
/// let expected = [
///     "+---+",
///     "| a |",
///     "+---+",
///     "| 1 |",
///     "+---+",
/// ];
/// assert_batches_eq!(expected, &batches);
/// ```
#[macro_export]
macro_rules! assert_batches_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr $(,)?) => {{
        let expected_lines: Vec<String> =
            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();

        let formatted = $crate::test_util::format_batches($CHUNKS)
            .unwrap()
            .to_string();

        let actual_lines: Vec<&str> = formatted.trim().lines().collect();

        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected_lines, actual_lines
        );
    }};
}
90
91pub fn batches_to_string(batches: &[RecordBatch]) -> String {
92 let actual = format_batches(batches).unwrap().to_string();
93
94 actual.trim().to_string()
95}
96
97pub fn batches_to_sort_string(batches: &[RecordBatch]) -> String {
98 let actual_lines = format_batches(batches).unwrap().to_string();
99
100 let mut actual_lines: Vec<&str> = actual_lines.trim().lines().collect();
101
102 let num_lines = actual_lines.len();
104 if num_lines > 3 {
105 actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
106 }
107
108 actual_lines.join("\n")
109}
110
/// Compares the pretty-formatted output of record batches with an expected
/// list of lines, ignoring the order of the body lines.
///
/// Both the expected and the actual lines are sorted before comparison. The
/// first two lines and the last line stay in place; lines `2..len - 1` are
/// sorted. `$CHUNKS` is passed to `$crate::test_util::format_batches`, so it
/// must coerce to `&[RecordBatch]`.
///
/// The expansion is a single block, so the macro can be used as a statement
/// or in expression position. A trailing comma is accepted.
///
/// # Panics
///
/// Panics if formatting fails or if the sorted lines differ.
#[macro_export]
macro_rules! assert_batches_sorted_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr $(,)?) => {{
        let mut expected_lines: Vec<String> =
            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();

        // Sort only the body; the first two lines and the final line are kept.
        let num_lines = expected_lines.len();
        if num_lines > 3 {
            expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }

        let formatted = $crate::test_util::format_batches($CHUNKS)
            .unwrap()
            .to_string();
        let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();

        // Apply the same sort to the actual output so row order does not matter.
        let num_lines = actual_lines.len();
        if num_lines > 3 {
            actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }

        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected_lines, actual_lines
        );
    }};
}
152
/// Asserts that `$ACTUAL` contains `$EXPECTED` as a substring.
///
/// Both arguments are converted with `Into<String>`, so `&str`, `String` and
/// other convertible values are accepted. On failure, the panic message shows
/// both strings in full.
///
/// The expansion is a single block, so the macro can be used as a statement
/// or in expression position (e.g. a closure body). A trailing comma is
/// accepted.
#[macro_export]
macro_rules! assert_contains {
    ($ACTUAL: expr, $EXPECTED: expr $(,)?) => {{
        let actual_value: String = $ACTUAL.into();
        let expected_value: String = $EXPECTED.into();
        assert!(
            actual_value.contains(&expected_value),
            "Can not find expected in actual.\n\nExpected:\n{}\n\nActual:\n{}",
            expected_value,
            actual_value
        );
    }};
}
175
/// Asserts that `$ACTUAL` does NOT contain `$UNEXPECTED` as a substring.
///
/// Both arguments are converted with `Into<String>`, so `&str`, `String` and
/// other convertible values are accepted. On failure, the panic message shows
/// both strings in full.
///
/// The expansion is a single block, so the macro can be used as a statement
/// or in expression position (e.g. a closure body). A trailing comma is
/// accepted.
#[macro_export]
macro_rules! assert_not_contains {
    ($ACTUAL: expr, $UNEXPECTED: expr $(,)?) => {{
        let actual_value: String = $ACTUAL.into();
        let unexpected_value: String = $UNEXPECTED.into();
        assert!(
            !actual_value.contains(&unexpected_value),
            "Found unexpected in actual.\n\nUnexpected:\n{}\n\nActual:\n{}",
            unexpected_value,
            actual_value
        );
    }};
}
198
199pub fn datafusion_test_data() -> String {
213 match get_data_dir("DATAFUSION_TEST_DATA", "../../datafusion/core/tests/data") {
214 Ok(pb) => pb.display().to_string(),
215 Err(err) => panic!("failed to get arrow data dir: {err}"),
216 }
217}
218
219pub fn arrow_test_data() -> String {
234 match get_data_dir("ARROW_TEST_DATA", "../../testing/data") {
235 Ok(pb) => pb.display().to_string(),
236 Err(err) => panic!("failed to get arrow data dir: {err}"),
237 }
238}
239
/// Returns the Parquet test data directory (the `parquet-testing/data` submodule).
///
/// Set the `PARQUET_TEST_DATA` environment variable to override the location.
/// Otherwise, the path is resolved relative to this crate's manifest directory
/// via [`get_data_dir`]. On Windows, `\` separators in the returned string are
/// replaced with `/`.
///
/// # Panics
///
/// Panics if the directory cannot be located.
#[cfg(feature = "parquet")]
pub fn parquet_test_data() -> String {
    let dir = get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data")
        .unwrap_or_else(|err| panic!("failed to get parquet data dir: {err}"));
    let rendered = dir.display().to_string();
    if cfg!(target_os = "windows") {
        rendered.replace('\\', "/")
    } else {
        rendered
    }
}
269
270pub fn get_data_dir(
280 udf_env: &str,
281 submodule_data: &str,
282) -> Result<PathBuf, Box<dyn Error>> {
283 if let Ok(dir) = std::env::var(udf_env) {
285 let trimmed = dir.trim().to_string();
286 if !trimmed.is_empty() {
287 let pb = PathBuf::from(trimmed);
288 if pb.is_dir() {
289 return Ok(pb);
290 } else {
291 return Err(format!(
292 "the data dir `{}` defined by env {} not found",
293 pb.display(),
294 udf_env
295 )
296 .into());
297 }
298 }
299 }
300
301 let dir = env!("CARGO_MANIFEST_DIR");
307
308 let pb = PathBuf::from(dir).join(submodule_data);
309 if pb.is_dir() {
310 Ok(pb)
311 } else {
312 Err(format!(
313 "env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
314 HINT: try running `git submodule update --init`",
315 udf_env,
316 pb.display(),
317 ).into())
318 }
319}
320
/// Builds an `Arc`-wrapped Arrow array from a Rust collection.
///
/// The first argument names the Arrow type, spelled like the matching
/// `DataType` variant (`Boolean`, `Int8` … `UInt64`, `Float16`, `Float32`,
/// `Float64`, `Utf8`). The second argument is any value accepted by that
/// array type's `From` implementation, for example a `Vec<T>` or a
/// `Vec<Option<T>>`.
///
/// ```ignore
/// let ints = create_array!(Int32, vec![1, 2, 3]);
/// let names = create_array!(Utf8, vec![Some("a"), None]);
/// ```
#[macro_export]
macro_rules! create_array {
    // Internal helper: `Arc::new(<array type>::from(values))`.
    (@build $array: ident, $values: expr) => {
        std::sync::Arc::new($crate::arrow::array::$array::from($values))
    };
    (Boolean, $values: expr) => {
        $crate::create_array!(@build BooleanArray, $values)
    };
    (Int8, $values: expr) => {
        $crate::create_array!(@build Int8Array, $values)
    };
    (Int16, $values: expr) => {
        $crate::create_array!(@build Int16Array, $values)
    };
    (Int32, $values: expr) => {
        $crate::create_array!(@build Int32Array, $values)
    };
    (Int64, $values: expr) => {
        $crate::create_array!(@build Int64Array, $values)
    };
    (UInt8, $values: expr) => {
        $crate::create_array!(@build UInt8Array, $values)
    };
    (UInt16, $values: expr) => {
        $crate::create_array!(@build UInt16Array, $values)
    };
    (UInt32, $values: expr) => {
        $crate::create_array!(@build UInt32Array, $values)
    };
    (UInt64, $values: expr) => {
        $crate::create_array!(@build UInt64Array, $values)
    };
    (Float16, $values: expr) => {
        $crate::create_array!(@build Float16Array, $values)
    };
    (Float32, $values: expr) => {
        $crate::create_array!(@build Float32Array, $values)
    };
    (Float64, $values: expr) => {
        $crate::create_array!(@build Float64Array, $values)
    };
    (Utf8, $values: expr) => {
        $crate::create_array!(@build StringArray, $values)
    };
}
363
/// Creates a `RecordBatch` from `(name, type, values)` triples.
///
/// Each `type` must be both a `DataType` variant name and an arm of
/// `create_array!` (e.g. `Int32`, `Float64`, `Utf8`). Every field is created
/// nullable. The macro evaluates to the `Result` returned by
/// `RecordBatch::try_new`, so column-length mismatches surface as an error.
/// A trailing comma after the last column is accepted.
///
/// ```ignore
/// let batch = record_batch!(
///     ("a", Int32, vec![1, 2, 3]),
///     ("b", Utf8, vec!["x", "y", "z"]),
/// )?;
/// ```
#[macro_export]
macro_rules! record_batch {
    ($(($name: expr, $type: ident, $values: expr)),* $(,)?) => {{
        let schema = std::sync::Arc::new($crate::arrow::datatypes::Schema::new(vec![
            $(
                $crate::arrow::datatypes::Field::new(
                    $name,
                    $crate::arrow::datatypes::DataType::$type,
                    true,
                ),
            )*
        ]));

        $crate::arrow::array::RecordBatch::try_new(
            schema,
            vec![$($crate::create_array!($type, $values),)*],
        )
    }};
}
397
398pub mod array_conversion {
399 use arrow::array::ArrayRef;
400
401 use super::IntoArrayRef;
402
403 impl IntoArrayRef for Vec<bool> {
404 fn into_array_ref(self) -> ArrayRef {
405 create_array!(Boolean, self)
406 }
407 }
408
409 impl IntoArrayRef for Vec<Option<bool>> {
410 fn into_array_ref(self) -> ArrayRef {
411 create_array!(Boolean, self)
412 }
413 }
414
415 impl IntoArrayRef for &[bool] {
416 fn into_array_ref(self) -> ArrayRef {
417 create_array!(Boolean, self.to_vec())
418 }
419 }
420
421 impl IntoArrayRef for &[Option<bool>] {
422 fn into_array_ref(self) -> ArrayRef {
423 create_array!(Boolean, self.to_vec())
424 }
425 }
426
427 impl IntoArrayRef for Vec<i8> {
428 fn into_array_ref(self) -> ArrayRef {
429 create_array!(Int8, self)
430 }
431 }
432
433 impl IntoArrayRef for Vec<Option<i8>> {
434 fn into_array_ref(self) -> ArrayRef {
435 create_array!(Int8, self)
436 }
437 }
438
439 impl IntoArrayRef for &[i8] {
440 fn into_array_ref(self) -> ArrayRef {
441 create_array!(Int8, self.to_vec())
442 }
443 }
444
445 impl IntoArrayRef for &[Option<i8>] {
446 fn into_array_ref(self) -> ArrayRef {
447 create_array!(Int8, self.to_vec())
448 }
449 }
450
451 impl IntoArrayRef for Vec<i16> {
452 fn into_array_ref(self) -> ArrayRef {
453 create_array!(Int16, self)
454 }
455 }
456
457 impl IntoArrayRef for Vec<Option<i16>> {
458 fn into_array_ref(self) -> ArrayRef {
459 create_array!(Int16, self)
460 }
461 }
462
463 impl IntoArrayRef for &[i16] {
464 fn into_array_ref(self) -> ArrayRef {
465 create_array!(Int16, self.to_vec())
466 }
467 }
468
469 impl IntoArrayRef for &[Option<i16>] {
470 fn into_array_ref(self) -> ArrayRef {
471 create_array!(Int16, self.to_vec())
472 }
473 }
474
475 impl IntoArrayRef for Vec<i32> {
476 fn into_array_ref(self) -> ArrayRef {
477 create_array!(Int32, self)
478 }
479 }
480
481 impl IntoArrayRef for Vec<Option<i32>> {
482 fn into_array_ref(self) -> ArrayRef {
483 create_array!(Int32, self)
484 }
485 }
486
487 impl IntoArrayRef for &[i32] {
488 fn into_array_ref(self) -> ArrayRef {
489 create_array!(Int32, self.to_vec())
490 }
491 }
492
493 impl IntoArrayRef for &[Option<i32>] {
494 fn into_array_ref(self) -> ArrayRef {
495 create_array!(Int32, self.to_vec())
496 }
497 }
498
499 impl IntoArrayRef for Vec<i64> {
500 fn into_array_ref(self) -> ArrayRef {
501 create_array!(Int64, self)
502 }
503 }
504
505 impl IntoArrayRef for Vec<Option<i64>> {
506 fn into_array_ref(self) -> ArrayRef {
507 create_array!(Int64, self)
508 }
509 }
510
511 impl IntoArrayRef for &[i64] {
512 fn into_array_ref(self) -> ArrayRef {
513 create_array!(Int64, self.to_vec())
514 }
515 }
516
517 impl IntoArrayRef for &[Option<i64>] {
518 fn into_array_ref(self) -> ArrayRef {
519 create_array!(Int64, self.to_vec())
520 }
521 }
522
523 impl IntoArrayRef for Vec<u8> {
524 fn into_array_ref(self) -> ArrayRef {
525 create_array!(UInt8, self)
526 }
527 }
528
529 impl IntoArrayRef for Vec<Option<u8>> {
530 fn into_array_ref(self) -> ArrayRef {
531 create_array!(UInt8, self)
532 }
533 }
534
535 impl IntoArrayRef for &[u8] {
536 fn into_array_ref(self) -> ArrayRef {
537 create_array!(UInt8, self.to_vec())
538 }
539 }
540
541 impl IntoArrayRef for &[Option<u8>] {
542 fn into_array_ref(self) -> ArrayRef {
543 create_array!(UInt8, self.to_vec())
544 }
545 }
546
547 impl IntoArrayRef for Vec<u16> {
548 fn into_array_ref(self) -> ArrayRef {
549 create_array!(UInt16, self)
550 }
551 }
552
553 impl IntoArrayRef for Vec<Option<u16>> {
554 fn into_array_ref(self) -> ArrayRef {
555 create_array!(UInt16, self)
556 }
557 }
558
559 impl IntoArrayRef for &[u16] {
560 fn into_array_ref(self) -> ArrayRef {
561 create_array!(UInt16, self.to_vec())
562 }
563 }
564
565 impl IntoArrayRef for &[Option<u16>] {
566 fn into_array_ref(self) -> ArrayRef {
567 create_array!(UInt16, self.to_vec())
568 }
569 }
570
571 impl IntoArrayRef for Vec<u32> {
572 fn into_array_ref(self) -> ArrayRef {
573 create_array!(UInt32, self)
574 }
575 }
576
577 impl IntoArrayRef for Vec<Option<u32>> {
578 fn into_array_ref(self) -> ArrayRef {
579 create_array!(UInt32, self)
580 }
581 }
582
583 impl IntoArrayRef for &[u32] {
584 fn into_array_ref(self) -> ArrayRef {
585 create_array!(UInt32, self.to_vec())
586 }
587 }
588
589 impl IntoArrayRef for &[Option<u32>] {
590 fn into_array_ref(self) -> ArrayRef {
591 create_array!(UInt32, self.to_vec())
592 }
593 }
594
595 impl IntoArrayRef for Vec<u64> {
596 fn into_array_ref(self) -> ArrayRef {
597 create_array!(UInt64, self)
598 }
599 }
600
601 impl IntoArrayRef for Vec<Option<u64>> {
602 fn into_array_ref(self) -> ArrayRef {
603 create_array!(UInt64, self)
604 }
605 }
606
607 impl IntoArrayRef for &[u64] {
608 fn into_array_ref(self) -> ArrayRef {
609 create_array!(UInt64, self.to_vec())
610 }
611 }
612
613 impl IntoArrayRef for &[Option<u64>] {
614 fn into_array_ref(self) -> ArrayRef {
615 create_array!(UInt64, self.to_vec())
616 }
617 }
618
619 impl IntoArrayRef for Vec<f32> {
622 fn into_array_ref(self) -> ArrayRef {
623 create_array!(Float32, self)
624 }
625 }
626
627 impl IntoArrayRef for Vec<Option<f32>> {
628 fn into_array_ref(self) -> ArrayRef {
629 create_array!(Float32, self)
630 }
631 }
632
633 impl IntoArrayRef for &[f32] {
634 fn into_array_ref(self) -> ArrayRef {
635 create_array!(Float32, self.to_vec())
636 }
637 }
638
639 impl IntoArrayRef for &[Option<f32>] {
640 fn into_array_ref(self) -> ArrayRef {
641 create_array!(Float32, self.to_vec())
642 }
643 }
644
645 impl IntoArrayRef for Vec<f64> {
646 fn into_array_ref(self) -> ArrayRef {
647 create_array!(Float64, self)
648 }
649 }
650
651 impl IntoArrayRef for Vec<Option<f64>> {
652 fn into_array_ref(self) -> ArrayRef {
653 create_array!(Float64, self)
654 }
655 }
656
657 impl IntoArrayRef for &[f64] {
658 fn into_array_ref(self) -> ArrayRef {
659 create_array!(Float64, self.to_vec())
660 }
661 }
662
663 impl IntoArrayRef for &[Option<f64>] {
664 fn into_array_ref(self) -> ArrayRef {
665 create_array!(Float64, self.to_vec())
666 }
667 }
668
669 impl IntoArrayRef for Vec<&str> {
670 fn into_array_ref(self) -> ArrayRef {
671 create_array!(Utf8, self)
672 }
673 }
674
675 impl IntoArrayRef for Vec<Option<&str>> {
676 fn into_array_ref(self) -> ArrayRef {
677 create_array!(Utf8, self)
678 }
679 }
680
681 impl IntoArrayRef for &[&str] {
682 fn into_array_ref(self) -> ArrayRef {
683 create_array!(Utf8, self.to_vec())
684 }
685 }
686
687 impl IntoArrayRef for &[Option<&str>] {
688 fn into_array_ref(self) -> ArrayRef {
689 create_array!(Utf8, self.to_vec())
690 }
691 }
692
693 impl IntoArrayRef for Vec<String> {
694 fn into_array_ref(self) -> ArrayRef {
695 create_array!(Utf8, self)
696 }
697 }
698
699 impl IntoArrayRef for Vec<Option<String>> {
700 fn into_array_ref(self) -> ArrayRef {
701 create_array!(Utf8, self)
702 }
703 }
704
705 impl IntoArrayRef for &[String] {
706 fn into_array_ref(self) -> ArrayRef {
707 create_array!(Utf8, self.to_vec())
708 }
709 }
710
711 impl IntoArrayRef for &[Option<String>] {
712 fn into_array_ref(self) -> ArrayRef {
713 create_array!(Utf8, self.to_vec())
714 }
715 }
716}
717
#[cfg(test)]
mod tests {
    use crate::cast::{as_float64_array, as_int32_array, as_string_array};
    use crate::error::Result;

    use super::*;
    use std::env;

    /// Covers every branch of `get_data_dir`: an env var pointing to a missing
    /// directory, blank/whitespace env vars, a valid env var, and an unset env
    /// var with a missing and with an existing fallback directory.
    #[test]
    fn test_data_dir() {
        let var_name = "get_data_dir";
        let cwd = env::current_dir().unwrap();

        let parent_dir = cwd.join("..");
        let parent = parent_dir.display().to_string();
        let missing = cwd.join("non-existing-dir").display().to_string();

        // An env var naming a non-existent directory is an error.
        env::set_var(var_name, missing.as_str());
        assert!(get_data_dir(var_name, parent.as_str()).is_err());

        // Blank or whitespace-only values fall back to the submodule path.
        for blank in ["", " "] {
            env::set_var(var_name, blank);
            let resolved = get_data_dir(var_name, parent.as_str());
            assert!(resolved.is_ok());
            assert_eq!(resolved.unwrap(), parent_dir);
        }

        // An env var naming an existing directory is used directly.
        env::set_var(var_name, parent.as_str());
        let resolved = get_data_dir(var_name, parent.as_str());
        assert!(resolved.is_ok());
        assert_eq!(resolved.unwrap(), parent_dir);

        // With the env var unset, the fallback directory must exist.
        env::remove_var(var_name);
        assert!(get_data_dir(var_name, missing.as_str()).is_err());

        let resolved = get_data_dir(var_name, parent.as_str());
        assert!(resolved.is_ok());
        assert_eq!(resolved.unwrap(), parent_dir);
    }

    #[test]
    #[cfg(feature = "parquet")]
    fn test_happy() {
        assert!(PathBuf::from(arrow_test_data()).is_dir());
        assert!(PathBuf::from(parquet_test_data()).is_dir());
    }

    #[test]
    fn test_create_record_batch() -> Result<()> {
        use arrow::array::Array;

        let batch = record_batch!(
            ("a", Int32, vec![1, 2, 3, 4]),
            ("b", Float64, vec![Some(4.0), None, Some(5.0), None]),
            ("c", Utf8, vec!["alpha", "beta", "gamma", "delta"])
        )?;

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let ints: Vec<i32> = as_int32_array(batch.column(0))?
            .values()
            .iter()
            .copied()
            .collect();
        assert_eq!(ints, vec![1, 2, 3, 4]);

        let floats = as_float64_array(batch.column(1))?;
        let float_values: Vec<f64> = floats.values().iter().copied().collect();
        assert_eq!(float_values, vec![4.0, 0.0, 5.0, 0.0]);

        let validity: Vec<bool> = floats.nulls().unwrap().iter().collect();
        assert_eq!(validity, vec![true, false, true, false]);

        let strings: Vec<&str> = as_string_array(batch.column(2))?
            .iter()
            .flatten()
            .collect();
        assert_eq!(strings, vec!["alpha", "beta", "gamma", "delta"]);

        Ok(())
    }
}