pub use datafusion_datasource_avro::source::*;
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::prelude::SessionContext;
use crate::test::object_store::local_unpartitioned_file;
use arrow::datatypes::{DataType, Field, SchemaBuilder};
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{Result, ScalarValue, test_util};
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_datasource_avro::AvroFormat;
use datafusion_datasource_avro::source::AvroSource;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_datasource::source::DataSourceExec;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::ObjectStore;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use rstest::*;
use url::Url;
#[tokio::test]
async fn avro_exec_without_partition() -> Result<()> {
test_with_stores(Arc::new(LocalFileSystem::new())).await
}
#[rstest]
#[tokio::test]
async fn test_chunked_avro(
#[values(10, 20, 30, 40)] chunk_size: usize,
) -> Result<()> {
test_with_stores(Arc::new(ChunkedStore::new(
Arc::new(LocalFileSystem::new()),
chunk_size,
)))
.await
}
async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let url = Url::parse("file://").unwrap();
session_ctx.register_object_store(&url, store.clone());
let testdata = test_util::arrow_test_data();
let filename = format!("{testdata}/avro/alltypes_plain.avro");
let meta = local_unpartitioned_file(filename);
let file_schema = AvroFormat {}
.infer_schema(&state, &store, std::slice::from_ref(&meta))
.await?;
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
let conf = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(meta.into())
.with_projection_indices(Some(vec![0, 1, 2]))?
.build();
let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
source_exec
.properties()
.output_partitioning()
.partition_count(),
1
);
let mut results = source_exec
.execute(0, state.task_ctx())
.expect("plan execution failed");
let batch = results
.next()
.await
.expect("plan iterator empty")
.expect("plan iterator returned an error");
insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r"
+----+----------+-------------+
| id | bool_col | tinyint_col |
+----+----------+-------------+
| 4 | true | 0 |
| 5 | false | 1 |
| 6 | true | 0 |
| 7 | false | 1 |
| 2 | true | 0 |
| 3 | false | 1 |
| 0 | true | 0 |
| 1 | false | 1 |
+----+----------+-------------+
");}
let batch = results.next().await;
assert!(batch.is_none());
let batch = results.next().await;
assert!(batch.is_none());
let batch = results.next().await;
assert!(batch.is_none());
Ok(())
}
#[tokio::test]
async fn avro_exec_missing_column() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let testdata = test_util::arrow_test_data();
let filename = format!("{testdata}/avro/alltypes_plain.avro");
let object_store = Arc::new(LocalFileSystem::new()) as _;
let object_store_url = ObjectStoreUrl::local_filesystem();
let meta = local_unpartitioned_file(filename);
let actual_schema = AvroFormat {}
.infer_schema(&state, &object_store, std::slice::from_ref(&meta))
.await?;
let mut builder = SchemaBuilder::from(actual_schema.fields());
builder.push(Field::new("missing_col", DataType::Int32, true));
let file_schema = Arc::new(builder.finish());
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
let conf = FileScanConfigBuilder::new(object_store_url, source)
.with_file(meta.into())
.with_projection_indices(projection)?
.build();
let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
source_exec
.properties()
.output_partitioning()
.partition_count(),
1
);
let mut results = source_exec
.execute(0, state.task_ctx())
.expect("plan execution failed");
let batch = results
.next()
.await
.expect("plan iterator empty")
.expect("plan iterator returned an error");
insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r"
+----+----------+-------------+-------------+
| id | bool_col | tinyint_col | missing_col |
+----+----------+-------------+-------------+
| 4 | true | 0 | |
| 5 | false | 1 | |
| 6 | true | 0 | |
| 7 | false | 1 | |
| 2 | true | 0 | |
| 3 | false | 1 | |
| 0 | true | 0 | |
| 1 | false | 1 | |
+----+----------+-------------+-------------+
");}
let batch = results.next().await;
assert!(batch.is_none());
let batch = results.next().await;
assert!(batch.is_none());
let batch = results.next().await;
assert!(batch.is_none());
Ok(())
}
#[tokio::test]
async fn avro_exec_with_partition() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let testdata = test_util::arrow_test_data();
let filename = format!("{testdata}/avro/alltypes_plain.avro");
let object_store = Arc::new(LocalFileSystem::new()) as _;
let object_store_url = ObjectStoreUrl::local_filesystem();
let meta = local_unpartitioned_file(filename);
let file_schema = AvroFormat {}
.infer_schema(&state, &object_store, std::slice::from_ref(&meta))
.await?;
let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];
let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
let table_schema = TableSchema::new(
file_schema.clone(),
vec![Arc::new(Field::new("date", DataType::Utf8, false))],
);
let source = Arc::new(AvroSource::new(table_schema.clone()));
let conf = FileScanConfigBuilder::new(object_store_url, source)
.with_projection_indices(projection)?
.with_file(partitioned_file)
.build();
let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
source_exec
.properties()
.output_partitioning()
.partition_count(),
1
);
let mut results = source_exec
.execute(0, state.task_ctx())
.expect("plan execution failed");
let batch = results
.next()
.await
.expect("plan iterator empty")
.expect("plan iterator returned an error");
insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r"
+----+----------+------------+-------------+
| id | bool_col | date | tinyint_col |
+----+----------+------------+-------------+
| 4 | true | 2021-10-26 | 0 |
| 5 | false | 2021-10-26 | 1 |
| 6 | true | 2021-10-26 | 0 |
| 7 | false | 2021-10-26 | 1 |
| 2 | true | 2021-10-26 | 0 |
| 3 | false | 2021-10-26 | 1 |
| 0 | true | 2021-10-26 | 0 |
| 1 | false | 2021-10-26 | 1 |
+----+----------+------------+-------------+
");}
let batch = results.next().await;
assert!(batch.is_none());
Ok(())
}
}