use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::{ListingTableUrl, PartitionedFile};

use crate::datasource::{
    create_ordering,
    file_format::{
        file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
    },
    get_statistics_with_limit,
    physical_plan::FileSinkConfig,
};
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::{ExecutionPlan, Statistics};

use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use datafusion_common::{
    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
    SchemaExt, ToDFSchema,
};
use datafusion_execution::cache::{
    cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
};
use datafusion_physical_expr::{
    create_physical_expr, LexOrdering, PhysicalSortRequirement,
};

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;

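/// Configuration for creating a [`ListingTable`]: the table paths, an optional
/// file schema, and optional [`ListingOptions`].
///
/// A minimal usage sketch (assuming the `datafusion` crate re-exports these
/// types from `datafusion::datasource::listing`):
///
/// ```no_run
/// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
/// # use datafusion::error::Result;
/// # use datafusion::prelude::SessionContext;
/// # async fn example() -> Result<()> {
/// let ctx = SessionContext::new();
/// let table_path = ListingTableUrl::parse("file:///path/to/data/")?;
/// // Infer the file format, compression, and schema from the files on disk.
/// let config = ListingTableConfig::new(table_path)
///     .infer(&ctx.state())
///     .await?;
/// let table = ListingTable::try_new(config)?;
/// # Ok(())
/// # }
/// ```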
#[derive(Debug, Clone)]
pub struct ListingTableConfig {
    /// Paths on the `ObjectStore` for creating the `ListingTable`.
    pub table_paths: Vec<ListingTableUrl>,
    /// Optional schema of the files under `table_paths`, excluding partition columns.
    pub file_schema: Option<SchemaRef>,
    /// Optional [`ListingOptions`] for the to-be-created [`ListingTable`].
    pub options: Option<ListingOptions>,
}

impl ListingTableConfig {
    /// Creates a new [`ListingTableConfig`] for a single table path.
    pub fn new(table_path: ListingTableUrl) -> Self {
        let table_paths = vec![table_path];
        Self {
            table_paths,
            file_schema: None,
            options: None,
        }
    }

    /// Creates a new [`ListingTableConfig`] over several table paths.
    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
        Self {
            table_paths,
            file_schema: None,
            options: None,
        }
    }

    /// Sets the file schema (excluding partition columns).
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            table_paths: self.table_paths,
            file_schema: Some(schema),
            options: self.options,
        }
    }

    /// Sets the [`ListingOptions`] used to read the files.
    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
        Self {
            table_paths: self.table_paths,
            file_schema: self.file_schema,
            options: Some(listing_options),
        }
    }

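    /// Splits `path` into a file extension and, when recognized, a compression
    /// extension. For example `"data.csv.gz"` yields `("csv", Some("gz"))`,
    /// while `"data.csv"` yields `("csv", None)`.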
    fn infer_file_extension_and_compression_type(
        path: &str,
    ) -> Result<(String, Option<String>)> {
        let mut exts = path.rsplit('.');

        // The last dot-separated segment of the path.
        let last_ext = exts.next().unwrap_or("");

        let file_compression_type = FileCompressionType::from_str(last_ext)
            .unwrap_or(FileCompressionType::UNCOMPRESSED);

        if file_compression_type.is_compressed() {
            // The segment before the compression extension is the file extension.
            let file_ext = exts.next().unwrap_or("");
            Ok((file_ext.to_string(), Some(last_ext.to_string())))
        } else {
            Ok((last_ext.to_string(), None))
        }
    }

    /// Infers the [`ListingOptions`] (file format, compression, and extension)
    /// from the first file found under the first table path.
    pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
        let store = if let Some(url) = self.table_paths.first() {
            state.runtime_env().object_store(url)?
        } else {
            return Ok(self);
        };

        let file = self
            .table_paths
            .first()
            .unwrap()
            .list_all_files(state, store.as_ref(), "")
            .await?
            .next()
            .await
            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;

        let (file_extension, maybe_compression_type) =
            ListingTableConfig::infer_file_extension_and_compression_type(
                file.location.as_ref(),
            )?;

        let mut format_options = HashMap::new();
        if let Some(ref compression_type) = maybe_compression_type {
            format_options
                .insert("format.compression".to_string(), compression_type.clone());
        }
        // The file format factory registry lives on the concrete `SessionState`.
        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
        let file_format = state
            .get_file_format_factory(&file_extension)
            .ok_or(config_datafusion_err!(
                "No file_format found with extension {file_extension}"
            ))?
            .create(state, &format_options)?;

        let listing_file_extension =
            if let Some(compression_type) = maybe_compression_type {
                format!("{}.{}", &file_extension, &compression_type)
            } else {
                file_extension
            };

        let listing_options = ListingOptions::new(file_format)
            .with_file_extension(listing_file_extension)
            .with_target_partitions(state.config().target_partitions());

        Ok(Self {
            table_paths: self.table_paths,
            file_schema: self.file_schema,
            options: Some(listing_options),
        })
    }

    /// Infers the schema of the files at the first table path using the
    /// previously configured [`ListingOptions`]; errors if no options are set.
    pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
        match self.options {
            Some(options) => {
                let schema = if let Some(url) = self.table_paths.first() {
                    options.infer_schema(state, url).await?
                } else {
                    Arc::new(Schema::empty())
                };

                Ok(Self {
                    table_paths: self.table_paths,
                    file_schema: Some(schema),
                    options: Some(options),
                })
            }
            None => internal_err!("No `ListingOptions` set for inferring schema"),
        }
    }

    /// Convenience wrapper that runs [`Self::infer_options`] followed by
    /// [`Self::infer_schema`].
    pub async fn infer(self, state: &dyn Session) -> Result<Self> {
        self.infer_options(state).await?.infer_schema(state).await
    }

    /// Infers the hive-style partition columns from the table path and adds
    /// them (as `Dictionary(UInt16, Utf8)` columns) to the [`ListingOptions`].
    pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result<Self> {
        match self.options {
            Some(options) => {
                let Some(url) = self.table_paths.first() else {
                    return config_err!("No table path found");
                };
                let partitions = options
                    .infer_partitions(state, url)
                    .await?
                    .into_iter()
                    .map(|col_name| {
                        (
                            col_name,
                            DataType::Dictionary(
                                Box::new(DataType::UInt16),
                                Box::new(DataType::Utf8),
                            ),
                        )
                    })
                    .collect::<Vec<_>>();
                let options = options.with_table_partition_cols(partitions);
                Ok(Self {
                    table_paths: self.table_paths,
                    file_schema: self.file_schema,
                    options: Some(options),
                })
            }
            None => config_err!("No `ListingOptions` set for inferring partitions"),
        }
    }
}

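/// Options controlling how a [`ListingTable`] lists and reads files: the
/// extension files must match, the [`FileFormat`], any hive-style partition
/// columns, statistics collection, the target partition count, and known
/// file sort order(s).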
#[derive(Clone, Debug)]
pub struct ListingOptions {
    /// A suffix on which files should be filtered (leave empty to keep all
    /// files on the path).
    pub file_extension: String,
    /// The file format.
    pub format: Arc<dyn FileFormat>,
    /// The expected partition column names and types in the folder structure.
    /// See [`Self::with_table_partition_cols`] for details.
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Whether to collect statistics from the files; this usually requires
    /// each file to be opened and at least partially parsed.
    pub collect_stat: bool,
    /// Group files so that the number of partitions does not exceed this limit.
    pub target_partitions: usize,
    /// Optional sort order(s) the files are known to satisfy.
    pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl ListingOptions {
    /// Creates options with the given file format and defaults: the format's
    /// own extension, no partition columns, statistics collection enabled,
    /// a single target partition, and no known sort order.
    pub fn new(format: Arc<dyn FileFormat>) -> Self {
        Self {
            file_extension: format.get_ext(),
            format,
            table_partition_cols: vec![],
            collect_stat: true,
            target_partitions: 1,
            file_sort_order: vec![],
        }
    }

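    /// Sets the file extension that listed files must match.
    ///
    /// A minimal sketch (assuming the `datafusion` crate re-exports
    /// `ListingOptions` and `ParquetFormat` at the paths below):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
    /// # use datafusion::datasource::listing::ListingOptions;
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_file_extension(".parquet");
    /// assert_eq!(listing_options.file_extension, ".parquet");
    /// ```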
    pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
        self.file_extension = file_extension.into();
        self
    }

    /// Sets the file extension only if `file_extension` is `Some`; otherwise
    /// the current extension is kept.
    pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
    where
        S: Into<String>,
    {
        if let Some(file_extension) = file_extension {
            self.file_extension = file_extension.into();
        }
        self
    }

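    /// Sets the expected partition columns (name and type) encoded in the
    /// folder structure, e.g. `col_a=x/col_b=y/` for columns `col_a`, `col_b`.
    ///
    /// A minimal sketch (same assumed re-exported paths as above):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::DataType;
    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
    /// # use datafusion::datasource::listing::ListingOptions;
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_table_partition_cols(vec![
    ///         ("col_a".to_string(), DataType::Utf8),
    ///         ("col_b".to_string(), DataType::Utf8),
    ///     ]);
    /// assert_eq!(listing_options.table_partition_cols.len(), 2);
    /// ```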
    pub fn with_table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

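    /// Enables or disables statistics collection during file listing.
    /// Collecting statistics usually requires opening and at least partially
    /// parsing each file, so disabling it can make table creation cheaper.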
    pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
        self.collect_stat = collect_stat;
        self
    }

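    /// Sets the maximum number of file groups (partitions) a scan of this
    /// table is split into.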
    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
        self.target_partitions = target_partitions;
        self
    }

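    /// Declares sort order(s) the files are known to satisfy, so the scan can
    /// report an output ordering instead of re-sorting.
    ///
    /// A minimal sketch (assuming `col` is the usual
    /// `datafusion::prelude::col` expression builder):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
    /// # use datafusion::datasource::listing::ListingOptions;
    /// # use datafusion::prelude::col;
    /// // Files are known to be sorted by `a` ascending, nulls last.
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_file_sort_order(vec![vec![col("a").sort(true, false)]]);
    /// ```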
    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

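    /// Infers the schema of the files located at `table_path`. Empty files are
    /// skipped, and partition columns are not included in the returned schema.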
    pub async fn infer_schema<'a>(
        &'a self,
        state: &dyn Session,
        table_path: &'a ListingTableUrl,
    ) -> Result<SchemaRef> {
        let store = state.runtime_env().object_store(table_path)?;

        let files: Vec<_> = table_path
            .list_all_files(state, store.as_ref(), &self.file_extension)
            .await?
            // Empty files cannot be inferred from, so skip them.
            .try_filter(|object_meta| future::ready(object_meta.size > 0))
            .try_collect()
            .await?;

        let schema = self.format.infer_schema(state, &store, &files).await?;

        Ok(schema)
    }

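    /// Validates that the partition columns configured on these options match
    /// the hive-style partitions actually found under `table_path`, erroring
    /// on a mismatch or when the path points at a single file.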
    pub async fn validate_partitions(
        &self,
        state: &dyn Session,
        table_path: &ListingTableUrl,
    ) -> Result<()> {
        if self.table_partition_cols.is_empty() {
            return Ok(());
        }

        if !table_path.is_collection() {
            return plan_err!(
                "Can't create a partitioned table backed by a single file, \
                perhaps the URL is missing a trailing slash?"
            );
        }

        let inferred = self.infer_partitions(state, table_path).await?;

        // No partitions found on disk: nothing to validate against.
        if inferred.is_empty() {
            return Ok(());
        }

        let table_partition_names = self
            .table_partition_cols
            .iter()
            .map(|(col_name, _)| col_name.clone())
            .collect_vec();

        if inferred.len() < table_partition_names.len() {
            return plan_err!(
                "Inferred partitions to be {:?}, but got {:?}",
                inferred,
                table_partition_names
            );
        }

        // The configured partition columns must match the inferred ones in order.
        for (idx, col) in table_partition_names.iter().enumerate() {
            if &inferred[idx] != col {
                return plan_err!(
                    "Inferred partitions to be {:?}, but got {:?}",
                    inferred,
                    table_partition_names
                );
            }
        }

        Ok(())
    }

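    /// Infers the hive-style partition column names from the directory
    /// structure, sampling up to 10 files; e.g. `table/a=1/b=2/file.parquet`
    /// yields `["a", "b"]`. Mixed layouts produce a plan error.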
    pub(crate) async fn infer_partitions(
        &self,
        state: &dyn Session,
        table_path: &ListingTableUrl,
    ) -> Result<Vec<String>> {
        let store = state.runtime_env().object_store(table_path)?;

        // Sample up to 10 files to keep inference cheap.
        let files: Vec<_> = table_path
            .list_all_files(state, store.as_ref(), &self.file_extension)
            .await?
            .take(10)
            .try_collect()
            .await?;

        let stripped_path_parts = files.iter().map(|file| {
            table_path
                .strip_prefix(&file.location)
                .unwrap()
                .collect_vec()
        });

        let partition_keys = stripped_path_parts
            .map(|path_parts| {
                path_parts
                    .into_iter()
                    .rev()
                    // The last path segment is the file name, not a partition.
                    .skip(1)
                    .rev()
                    .map(|s| s.split('=').take(1).collect())
                    .collect_vec()
            })
            .collect_vec();

        match partition_keys.into_iter().all_equal_value() {
            Ok(v) => Ok(v),
            Err(None) => Ok(vec![]),
            Err(Some(diff)) => {
                let mut sorted_diff = [diff.0, diff.1];
                sorted_diff.sort();
                plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
            }
        }
    }
}

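/// A [`TableProvider`] that scans files found under one or more URLs,
/// optionally partitioned hive-style, with cached per-file statistics.
///
/// A minimal construction sketch, performing by hand the same steps
/// [`ListingTableConfig::infer`] automates (assuming the `datafusion` crate's
/// re-exported paths):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use datafusion::datasource::file_format::parquet::ParquetFormat;
/// # use datafusion::datasource::listing::{
/// #     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
/// # };
/// # use datafusion::error::Result;
/// # use datafusion::prelude::SessionContext;
/// # async fn example() -> Result<()> {
/// let ctx = SessionContext::new();
/// let state = ctx.state();
/// let table_path = ListingTableUrl::parse("file:///path/to/data/")?;
/// let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
/// // Resolve the file schema before creating the table.
/// let schema = options.infer_schema(&state, &table_path).await?;
/// let config = ListingTableConfig::new(table_path)
///     .with_listing_options(options)
///     .with_schema(schema);
/// let table = ListingTable::try_new(config)?;
/// # Ok(())
/// # }
/// ```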
#[derive(Debug)]
pub struct ListingTable {
    table_paths: Vec<ListingTableUrl>,
    /// Only the columns physically stored in the data files themselves.
    file_schema: SchemaRef,
    /// All columns of the table: file columns followed by partition columns.
    table_schema: SchemaRef,
    options: ListingOptions,
    definition: Option<String>,
    collected_statistics: FileStatisticsCache,
    constraints: Constraints,
    column_defaults: HashMap<String, Expr>,
}

impl ListingTable {
    /// Creates a new [`ListingTable`] from the given config, which must carry
    /// both a file schema and [`ListingOptions`]. Partition columns from the
    /// options are appended to the file schema to form the table schema.
    pub fn try_new(config: ListingTableConfig) -> Result<Self> {
        let file_schema = config
            .file_schema
            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;

        let options = config.options.ok_or_else(|| {
            DataFusionError::Internal("No ListingOptions provided".into())
        })?;

        // Add the partition columns to the file schema.
        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
        for (part_col_name, part_col_type) in &options.table_partition_cols {
            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
        }

        let table_schema = Arc::new(
            builder
                .finish()
                .with_metadata(file_schema.metadata().clone()),
        );

        let table = Self {
            table_paths: config.table_paths,
            file_schema,
            table_schema,
            options,
            definition: None,
            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
            constraints: Constraints::empty(),
            column_defaults: HashMap::new(),
        };

        Ok(table)
    }

    /// Assigns constraints to the table.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    /// Assigns per-column default value expressions, used when an `INSERT`
    /// omits a column.
    pub fn with_column_defaults(
        mut self,
        column_defaults: HashMap<String, Expr>,
    ) -> Self {
        self.column_defaults = column_defaults;
        self
    }

    /// Sets the file statistics cache, falling back to a fresh
    /// [`DefaultFileStatisticsCache`] when `None` is given.
    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
        self.collected_statistics =
            cache.unwrap_or(Arc::new(DefaultFileStatisticsCache::default()));
        self
    }

    /// Stores the SQL definition (`CREATE EXTERNAL TABLE ...`) for this table, if any.
    pub fn with_definition(mut self, definition: Option<String>) -> Self {
        self.definition = definition;
        self
    }

    /// Returns the paths of the files composing this table.
    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
        &self.table_paths
    }

    /// Returns the [`ListingOptions`] for this table.
    pub fn options(&self) -> &ListingOptions {
        &self.options
    }

    /// Creates output orderings from the configured `file_sort_order`.
    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
        create_ordering(&self.table_schema, &self.options.file_sort_order)
    }
}

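/// Returns whether `expr` references only the given partition columns, meaning
/// it can be evaluated from file paths alone and used to prune partitions
/// before any file is opened.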
fn can_be_evaluated_for_partition_pruning(
    partition_column_names: &[&str],
    expr: &Expr,
) -> bool {
    !partition_column_names.is_empty()
        && expr_applicable_for_cols(partition_column_names, expr)
}

#[async_trait]
impl TableProvider for ListingTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Extract the types of the partition columns from the table schema.
        let table_partition_cols = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
            .collect::<Result<Vec<_>>>()?;

        let table_partition_col_names = table_partition_cols
            .iter()
            .map(|field| field.name().as_str())
            .collect::<Vec<_>>();
        // Split filters into those evaluable on partition values alone and
        // those that must be evaluated against file contents.
        let (partition_filters, filters): (Vec<_>, Vec<_>) =
            filters.iter().cloned().partition(|filter| {
                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
            });

        let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();

        // A limit can only bound file listing when there are no content
        // filters, since filters are applied after files are read.
        let statistic_file_limit = if filters.is_empty() { limit } else { None };

        let (mut partitioned_file_lists, statistics) = self
            .list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
            .await?;

        // If no files need to be read, return an `EmptyExec`.
        if partitioned_file_lists.is_empty() {
            let projected_schema = project_schema(&self.schema(), projection)?;
            return Ok(Arc::new(EmptyExec::new(projected_schema)));
        }

        let output_ordering = self.try_create_output_ordering()?;
        match state
            .config_options()
            .execution
            .split_file_groups_by_statistics
            .then(|| {
                output_ordering.first().map(|output_ordering| {
                    FileScanConfig::split_groups_by_statistics(
                        &self.table_schema,
                        &partitioned_file_lists,
                        output_ordering,
                    )
                })
            })
            .flatten()
        {
            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
            Some(Ok(new_groups)) => {
                if new_groups.len() <= self.options.target_partitions {
                    partitioned_file_lists = new_groups;
                } else {
                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
                }
            }
            None => {}
        };

        // Combine the remaining filters into a single physical predicate, if any.
        let filters = match conjunction(filters.to_vec()) {
            Some(expr) => {
                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
                let filters = create_physical_expr(
                    &expr,
                    &table_df_schema,
                    state.execution_props(),
                )?;
                Some(filters)
            }
            None => None,
        };

        let Some(object_store_url) =
            self.table_paths.first().map(ListingTableUrl::object_store)
        else {
            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
        };

        // Delegate physical plan creation to the file format.
        self.options
            .format
            .create_physical_plan(
                session_state,
                FileScanConfig::new(
                    object_store_url,
                    Arc::clone(&self.file_schema),
                    self.options.format.file_source(),
                )
                .with_file_groups(partitioned_file_lists)
                .with_constraints(self.constraints.clone())
                .with_statistics(statistics)
                .with_projection(projection.cloned())
                .with_limit(limit)
                .with_output_ordering(output_ordering)
                .with_table_partition_cols(table_partition_cols),
                filters.as_ref(),
            )
            .await
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        let partition_column_names = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| col.0.as_str())
            .collect::<Vec<_>>();
        filters
            .iter()
            .map(|filter| {
                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
                {
                    // Partition pruning is exact (done from file paths alone),
                    // so this filter does not need to be re-evaluated.
                    return Ok(TableProviderFilterPushDown::Exact);
                }

                let supports_pushdown = self.options.format.supports_filters_pushdown(
                    &self.file_schema,
                    &self.table_schema,
                    &[filter],
                )?;

                if supports_pushdown == FilePushdownSupport::Supported {
                    return Ok(TableProviderFilterPushDown::Exact);
                }

                Ok(TableProviderFilterPushDown::Inexact)
            })
            .collect()
    }

    fn get_table_definition(&self) -> Option<&str> {
        self.definition.as_deref()
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The input plan's schema must be compatible with the table schema.
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        let table_path = &self.table_paths()[0];
        if !table_path.is_collection() {
            return plan_err!(
                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
            );
        }

        // Get the object store for the table path.
        let store = state.runtime_env().object_store(table_path)?;

        let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
        let file_list_stream = pruned_partition_list(
            session_state,
            store.as_ref(),
            table_path,
            &[],
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;

        let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
        let keep_partition_by_columns =
            state.config_options().execution.keep_partition_by_columns;

        // Sink-related options, apart from the format itself.
        let config = FileSinkConfig {
            object_store_url: self.table_paths()[0].object_store(),
            table_paths: self.table_paths().clone(),
            file_groups,
            output_schema: self.schema(),
            table_partition_cols: self.options.table_partition_cols.clone(),
            insert_op,
            keep_partition_by_columns,
            file_extension: self.options().format.get_ext(),
        };

        let order_requirements = if !self.options().file_sort_order.is_empty() {
            let orderings = self.try_create_output_ordering()?;
            let Some(ordering) = orderings.first() else {
                return internal_err!(
                    "Expected ListingTable to have a sort order, but none found!"
                );
            };
            // Convert the sort order into the input ordering required by the
            // writer plan.
            Some(LexRequirement::new(
                ordering
                    .into_iter()
                    .cloned()
                    .map(PhysicalSortRequirement::from)
                    .collect::<Vec<_>>(),
            ))
        } else {
            None
        };

        self.options()
            .format
            .create_writer_physical_plan(input, session_state, config, order_requirements)
            .await
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}

impl ListingTable {
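    /// Lists the files relevant to a scan (applying the partition filters),
    /// grouped into at most `target_partitions` file groups, along with the
    /// combined statistics of the listed files.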
    async fn list_files_for_scan<'a>(
        &'a self,
        ctx: &'a dyn Session,
        filters: &'a [Expr],
        limit: Option<usize>,
    ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
        let store = if let Some(url) = self.table_paths.first() {
            ctx.runtime_env().object_store(url)?
        } else {
            return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
        };
        // List files (with partition pruning) under every table path.
        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
            pruned_partition_list(
                ctx,
                store.as_ref(),
                table_path,
                filters,
                &self.options.file_extension,
                &self.options.table_partition_cols,
            )
        }))
        .await?;
        let file_list = stream::iter(file_list).flatten();
        // Collect per-file statistics if required by the options.
        let files = file_list
            .map(|part_file| async {
                let part_file = part_file?;
                if self.options.collect_stat {
                    let statistics =
                        self.do_collect_statistics(ctx, &store, &part_file).await?;
                    Ok((part_file, statistics))
                } else {
                    Ok((
                        part_file,
                        Arc::new(Statistics::new_unknown(&self.file_schema)),
                    ))
                }
            })
            .boxed()
            .buffered(ctx.config_options().execution.meta_fetch_concurrency);

        let (files, statistics) = get_statistics_with_limit(
            files,
            self.schema(),
            limit,
            self.options.collect_stat,
        )
        .await?;

        Ok((
            split_files(files, self.options.target_partitions),
            statistics,
        ))
    }

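    /// Collects statistics for a single partitioned file, consulting the
    /// statistics cache first; on a miss the format's `infer_stats` is run
    /// and the result cached for later scans.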
    async fn do_collect_statistics(
        &self,
        ctx: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        part_file: &PartitionedFile,
    ) -> Result<Arc<Statistics>> {
        match self
            .collected_statistics
            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
        {
            Some(statistics) => Ok(statistics),
            None => {
                let statistics = self
                    .options
                    .format
                    .infer_stats(
                        ctx,
                        store,
                        Arc::clone(&self.file_schema),
                        &part_file.object_meta,
                    )
                    .await?;
                let statistics = Arc::new(statistics);
                self.collected_statistics.put_with_extra(
                    &part_file.object_meta.location,
                    Arc::clone(&statistics),
                    &part_file.object_meta,
                );
                Ok(statistics)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::datasource::file_format::avro::AvroFormat;
    use crate::datasource::file_format::csv::CsvFormat;
    use crate::datasource::file_format::json::JsonFormat;
    #[cfg(feature = "parquet")]
    use crate::datasource::file_format::parquet::ParquetFormat;
    use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
    use crate::execution::options::ArrowReadOptions;
    use crate::prelude::*;
    use crate::{
        assert_batches_eq,
        test::{columns, object_store::register_test_store},
    };
    use datafusion_physical_plan::collect;

    use arrow::compute::SortOptions;
    use arrow::record_batch::RecordBatch;
    use datafusion_common::stats::Precision;
    use datafusion_common::{assert_contains, ScalarValue};
    use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_plan::ExecutionPlanProperties;

    use tempfile::TempDir;

    #[tokio::test]
    async fn read_single_file() -> Result<()> {
        let ctx = SessionContext::new();

        let table = load_table(&ctx, "alltypes_plain.parquet").await?;
        let projection = None;
        let exec = table
            .scan(&ctx.state(), projection, &[], None)
            .await
            .expect("Scan table");

        assert_eq!(exec.children().len(), 0);
        assert_eq!(exec.output_partitioning().partition_count(), 1);

        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

        Ok(())
    }

    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn load_table_stats_by_default() -> Result<()> {
        use crate::datasource::file_format::parquet::ParquetFormat;

        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schema = opt.infer_schema(&state, &table_path).await?;
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(schema);
        let table = ListingTable::try_new(config)?;

        let exec = table.scan(&state, None, &[], None).await?;
        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

        Ok(())
    }

    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn load_table_stats_when_no_stats() -> Result<()> {
        use crate::datasource::file_format::parquet::ParquetFormat;

        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();

        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
            .with_collect_stat(false);
        let schema = opt.infer_schema(&state, &table_path).await?;
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(schema);
        let table = ListingTable::try_new(config)?;

        let exec = table.scan(&state, None, &[], None).await?;
        assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
        assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);

        Ok(())
    }

    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn test_try_create_output_ordering() {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();
        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schema = options.infer_schema(&state, &table_path).await.unwrap();

        use crate::datasource::file_format::parquet::ParquetFormat;
        use datafusion_physical_plan::expressions::col as physical_col;
        use std::ops::Add;

        // (file_sort_order, expected_result)
        let cases = vec![
            (vec![], Ok(vec![])),
            // Sort expressions must be plain column references.
            (
                vec![vec![
                    col("int_col").add(lit(1)).sort(true, true),
                ]],
                Err("Expected single column references in output_ordering, got int_col + Int32(1)"),
            ),
            // Single-column sort order.
            (
                vec![vec![col("string_col").sort(true, false)]],
                Ok(vec![LexOrdering::new(
                    vec![PhysicalSortExpr {
                        expr: physical_col("string_col", &schema).unwrap(),
                        options: SortOptions {
                            descending: false,
                            nulls_first: false,
                        },
                    }],
                )])
            ),
            // Two-column sort order with mixed directions.
            (
                vec![vec![
                    col("string_col").sort(true, false),
                    col("int_col").sort(false, true),
                ]],
                Ok(vec![LexOrdering::new(
                    vec![
                        PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
                            .asc()
                            .nulls_last(),
                        PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
                            .desc()
                            .nulls_first(),
                    ],
                )])
            ),
        ];

        for (file_sort_order, expected_result) in cases {
            let options = options.clone().with_file_sort_order(file_sort_order);

            let config = ListingTableConfig::new(table_path.clone())
                .with_listing_options(options)
                .with_schema(schema.clone());

            let table =
                ListingTable::try_new(config.clone()).expect("Creating the table");
            let ordering_result = table.try_create_output_ordering();

            match (expected_result, ordering_result) {
                (Ok(expected), Ok(result)) => {
                    assert_eq!(expected, result);
                }
                (Err(expected), Err(result)) => {
                    // Only check that the expected message is contained in the
                    // actual one, to avoid depending on exact error wording.
                    let result = result.to_string();
                    let expected = expected.to_string();
                    assert_contains!(result, expected);
                }
                (expected_result, ordering_result) => {
                    panic!(
                        "expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
                    );
                }
            }
        }
    }

    #[tokio::test]
    async fn read_empty_table() -> Result<()> {
        let ctx = SessionContext::new();
        let path = String::from("table/p1=v1/file.avro");
        register_test_store(&ctx, &[(&path, 100)]);

        let opt = ListingOptions::new(Arc::new(AvroFormat {}))
            .with_file_extension(AvroFormat.get_ext())
            .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
            .with_target_partitions(4);

        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
        let file_schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(file_schema);
        let table = ListingTable::try_new(config)?;

        assert_eq!(
            columns(&table.schema()),
            vec!["a".to_owned(), "p1".to_owned()]
        );

        // This filter prunes the only partition, so the scan should be empty.
        let filter = Expr::not_eq(col("p1"), lit("v1"));

        let scan = table
            .scan(&ctx.state(), None, &[filter], None)
            .await
            .expect("Empty execution plan");

        assert!(scan.as_any().is::<EmptyExec>());
        assert_eq!(
            columns(&scan.schema()),
            vec!["a".to_owned(), "p1".to_owned()]
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
        // More target partitions than files.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "test:///bucket/key-prefix/",
            12,
            5,
            Some(""),
        )
        .await?;

        // As many target partitions as files.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
            ],
            "test:///bucket/key-prefix/",
            4,
            4,
            Some(""),
        )
        .await?;

        // Fewer target partitions than files.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/key-prefix/file2",
                "bucket/key-prefix/file3",
                "bucket/key-prefix/file4",
            ],
            "test:///bucket/key-prefix/",
            2,
            2,
            Some(""),
        )
        .await?;

        // No files => no groups.
        assert_list_files_for_scan_grouping(
            &[],
            "test:///bucket/key-prefix/",
            2,
            0,
            Some(""),
        )
        .await?;

        // Files outside the table prefix are ignored.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0",
                "bucket/key-prefix/file1",
                "bucket/other-prefix/roguefile",
            ],
            "test:///bucket/key-prefix/",
            10,
            2,
            Some(""),
        )
        .await?;

        // With `file_ext` of None, only files matching the format's default
        // extension are listed.
        assert_list_files_for_scan_grouping(
            &[
                "bucket/key-prefix/file0.avro",
                "bucket/key-prefix/file1.parquet",
                "bucket/other-prefix/roguefile.avro",
            ],
            "test:///bucket/key-prefix/",
            10,
            1,
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_assert_list_files_for_multi_path() -> Result<()> {
        // More target partitions than files.
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key1/", "test:///bucket/key2/"],
            12,
            5,
            Some(""),
        )
        .await?;

        // As many target partitions as files.
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key1/", "test:///bucket/key2/"],
            5,
            5,
            Some(""),
        )
        .await?;

        // Fewer target partitions than matching files.
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key1/"],
            2,
            2,
            Some(""),
        )
        .await?;

        // No files => no groups.
        assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some(""))
            .await?;

        // Only one file under the listed path.
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0",
                "bucket/key1/file1",
                "bucket/key1/file2",
                "bucket/key2/file3",
                "bucket/key2/file4",
                "bucket/key3/file5",
            ],
            &["test:///bucket/key3/"],
            2,
            1,
            Some(""),
        )
        .await?;

        // With `file_ext` of None, only files matching the format's default
        // extension are listed.
        assert_list_files_for_multi_paths(
            &[
                "bucket/key1/file0.avro",
                "bucket/key1/file1.csv",
                "bucket/key1/file2.avro",
                "bucket/key2/file3.csv",
                "bucket/key2/file4.avro",
                "bucket/key3/file5.csv",
            ],
            &["test:///bucket/key1/", "test:///bucket/key3/"],
            2,
            2,
            None,
        )
        .await?;
        Ok(())
    }

    async fn load_table(
        ctx: &SessionContext,
        name: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{testdata}/{name}");
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let config = ListingTableConfig::new(table_path)
            .infer(&ctx.state())
            .await?;
        let table = ListingTable::try_new(config)?;
        Ok(Arc::new(table))
    }

    /// Checks that listing `files` under `table_prefix` yields the expected
    /// number of file groups.
    async fn assert_list_files_for_scan_grouping(
        files: &[&str],
        table_prefix: &str,
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let format = AvroFormat {};

        let opt = ListingOptions::new(Arc::new(format))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_path = ListingTableUrl::parse(table_prefix).unwrap();
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

    /// Checks that listing `files` under several `table_prefix` paths yields
    /// the expected number of file groups.
    async fn assert_list_files_for_multi_paths(
        files: &[&str],
        table_prefix: &[&str],
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let format = AvroFormat {};

        let opt = ListingOptions::new(Arc::new(format))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_paths = table_prefix
            .iter()
            .map(|t| ListingTableUrl::parse(t).unwrap())
            .collect();
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_json_files() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        helper_test_append_new_files_to_table(
            JsonFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_csv_files() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        helper_test_append_new_files_to_table(
            CsvFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_2_new_parquet_files_defaults() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_1_new_parquet_files_defaults() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "20".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "20".into(),
        );
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            1,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
        helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
        helper_test_insert_into_sql(
            "csv",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(HashMap::from([("has_header".into(), "true".into())])),
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_json_defaults() -> Result<()> {
        helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.statistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_on_write".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(config_map),
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.statistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.encoding".into(),
            "delta_binary_packed".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_on_write".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        config_map.insert("datafusion.execution.batch_size".into(), "1".into());
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
    ) -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd".into(),
        );
        let e = helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await
        .expect_err("Example should fail!");
        assert_eq!(e.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");

        Ok(())
    }

    async fn helper_test_append_new_files_to_table(
        file_type_ext: String,
        file_compression_type: FileCompressionType,
        session_config_map: Option<HashMap<String, String>>,
        expected_n_files_per_insert: usize,
    ) -> Result<()> {
        // Create the initial context, optionally with session config overrides.
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(&cfg)?;
                SessionContext::new_with_config(config)
            }
            None => SessionContext::new(),
        };

        // Create a schema with a single i32 field called "column1".
        let schema = Arc::new(Schema::new(vec![Field::new(
            "column1",
            DataType::Int32,
            false,
        )]));

        let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
            Box::new(Expr::Column("column1".into())),
            Operator::GtEq,
            Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
        ));

        // Create a batch of data to insert into the table.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(arrow::array::Int32Array::from(vec![
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
            ]))],
        )?;

        // Register an empty target table of the requested file type.
        let tmp_dir = TempDir::new()?;
        match file_type_ext.as_str() {
            "csv" => {
                session_ctx
                    .register_csv(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        CsvReadOptions::new()
                            .schema(schema.as_ref())
                            .file_compression_type(file_compression_type),
                    )
                    .await?;
            }
            "json" => {
                session_ctx
                    .register_json(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        NdJsonReadOptions::default()
                            .schema(schema.as_ref())
                            .file_compression_type(file_compression_type),
                    )
                    .await?;
            }
            "parquet" => {
                session_ctx
                    .register_parquet(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        ParquetReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            "avro" => {
                session_ctx
                    .register_avro(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        AvroReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            "arrow" => {
                session_ctx
                    .register_arrow(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        ArrowReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            _ => panic!("Unrecognized file extension {file_type_ext}"),
        }

        // Create and register the source table holding two copies of the batch.
        let source_table = Arc::new(MemTable::try_new(
            schema.clone(),
            vec![vec![batch.clone(), batch.clone()]],
        )?);
        session_ctx.register_table("source", source_table.clone())?;
        // Convert the source table into a provider so it can be scanned.
        let source = provider_as_source(source_table);
        let target = session_ctx.table_provider("t").await?;
        let target = Arc::new(DefaultTableSource::new(target));
        // Build a scan of the source table, filtered by the predicate above.
        let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
            .filter(filter_predicate)?
            .build()?;
        // Build an insert plan appending the source data to the target table.
        let insert_into_table =
            LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
                .build()?;
        // Create and execute the physical plan; insert reports the row count.
        let plan = session_ctx
            .state()
            .create_physical_plan(&insert_into_table)
            .await?;
        let res = collect(plan, session_ctx.task_ctx()).await?;
        // Two batches of 10 rows each were written.
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 20    |",
            "+-------+",
        ];

        assert_batches_eq!(expected, &res);

        // Read back the records in the table.
        let batches = session_ctx
            .sql("select count(*) as count from t")
            .await?
            .collect()
            .await?;
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 20    |",
            "+-------+",
        ];

        assert_batches_eq!(expected, &batches);

        // The expected number of files was added to the table directory.
        let num_files = tmp_dir.path().read_dir()?.count();
        assert_eq!(num_files, expected_n_files_per_insert);

        // Run the same insert a second time.
        let plan = session_ctx
            .state()
            .create_physical_plan(&insert_into_table)
            .await?;

        let res = collect(plan, session_ctx.task_ctx()).await?;
        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 20    |",
            "+-------+",
        ];

        assert_batches_eq!(expected, &res);

        // Read the table again; this time the total should be 40.
        let batches = session_ctx
            .sql("select count(*) AS count from t")
            .await?
            .collect()
            .await?;

        let expected = [
            "+-------+",
            "| count |",
            "+-------+",
            "| 40    |",
            "+-------+",
        ];

        assert_batches_eq!(expected, &batches);

        // Another round of files was added by the second insert.
        let num_files = tmp_dir.path().read_dir()?.count();
        assert_eq!(num_files, expected_n_files_per_insert * 2);

        Ok(())
    }

    /// Issues a `CREATE EXTERNAL TABLE` followed by an `INSERT INTO` and
    /// checks the inserted rows round-trip through a `SELECT`.
    async fn helper_test_insert_into_sql(
        file_type: &str,
        _file_compression_type: FileCompressionType,
        external_table_options: &str,
        session_config_map: Option<HashMap<String, String>>,
    ) -> Result<()> {
        // Create the initial context, optionally with session config overrides.
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(&cfg)?;
                SessionContext::new_with_config(config)
            }
            None => SessionContext::new(),
        };

        // Create a temp directory to back the external table.
        let tmp_dir = TempDir::new()?;
        let tmp_path = tmp_dir.into_path();
        let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
        session_ctx
            .sql(&format!(
                "create external table foo(a varchar, b varchar, c int) \
                stored as {file_type} \
                location '{str_path}' \
                {external_table_options}"
            ))
            .await?
            .collect()
            .await?;

        session_ctx
            .sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
            .await?
            .collect()
            .await?;

        // Verify the data was written.
        let batches = session_ctx
            .sql("select * from foo")
            .await?
            .collect()
            .await?;

        let expected = [
            "+-----+-----+---+",
            "| a   | b   | c |",
            "+-----+-----+---+",
            "| foo | bar | 1 |",
            "| foo | bar | 2 |",
            "| foo | bar | 3 |",
            "+-----+-----+---+",
        ];
        assert_batches_eq!(expected, &batches);

        Ok(())
    }

    #[tokio::test]
    async fn test_infer_options_compressed_csv() -> Result<()> {
        let testdata = crate::test_util::arrow_test_data();
        let filename = format!("{}/csv/aggregate_test_100.csv.gz", testdata);
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let ctx = SessionContext::new();

        let config = ListingTableConfig::new(table_path);
        let config_with_opts = config.infer_options(&ctx.state()).await?;
        let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;

        let schema = config_with_schema.file_schema.unwrap();

        assert_eq!(schema.fields.len(), 13);

        Ok(())
    }
}