pub mod parquet;
use std::any::Any;
use std::collections::HashMap;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, error::Error, path::PathBuf, sync::Arc};
use crate::datasource::provider::TableProviderFactory;
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::options::ReadOptions;
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::{Statistics, TableReference};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
/// Asserts that the pretty-printed form of `$CHUNKS` matches `$EXPECTED_LINES`
/// line by line, in order.
///
/// * `$EXPECTED_LINES` - a collection of `&str` lines (anything with `.iter()`
///   yielding `&&str`), one entry per expected output line.
/// * `$CHUNKS` - the value handed to
///   `arrow::util::pretty::pretty_format_batches`.
///
/// The formatted output is trimmed before being split into lines, so leading
/// and trailing whitespace in the rendered table is ignored. On mismatch the
/// assertion message prints both line lists in `{:#?}` form.
///
/// # Panics
///
/// Panics if formatting fails or if the line lists differ.
#[macro_export]
macro_rules! assert_batches_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr) => {
        let want: Vec<String> = $EXPECTED_LINES
            .iter()
            .map(|&line| String::from(line))
            .collect();

        let table = arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
        let rendered = table.to_string();
        let got: Vec<&str> = rendered.trim().lines().collect();

        assert_eq!(
            want, got,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            want, got
        );
    };
}
/// Asserts that the pretty-printed form of `$CHUNKS` matches `$EXPECTED_LINES`
/// after sorting the interior lines of both sides.
///
/// * `$EXPECTED_LINES` - a collection of `&str` lines (anything with `.iter()`
///   yielding `&&str`).
/// * `$CHUNKS` - the value handed to
///   `arrow::util::pretty::pretty_format_batches`.
///
/// When a side has more than three lines, its lines from index 2 up to (but
/// not including) the last line are sorted with `sort_unstable`; the first two
/// lines and the final line stay where they are. Sides with three lines or
/// fewer are compared as-is.
///
/// # Panics
///
/// Panics if formatting fails or if the (partially sorted) line lists differ.
#[macro_export]
macro_rules! assert_batches_sorted_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr) => {
        let mut want: Vec<String> = $EXPECTED_LINES
            .iter()
            .map(|&line| String::from(line))
            .collect();

        // Leave the first two lines and the last line untouched.
        let want_len = want.len();
        if want_len > 3 {
            want[2..want_len - 1].sort_unstable();
        }

        let table = arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
        let rendered = table.to_string();
        let mut got: Vec<&str> = rendered.trim().lines().collect();

        // Same treatment for the actual output.
        let got_len = got.len();
        if got_len > 3 {
            got[2..got_len - 1].sort_unstable();
        }

        assert_eq!(
            want, got,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            want, got
        );
    };
}
/// Returns the arrow test data directory as a displayable string.
///
/// The directory is resolved by [`get_data_dir`] using the `ARROW_TEST_DATA`
/// environment variable, falling back to `../../testing/data` relative to the
/// crate manifest directory.
///
/// # Panics
///
/// Panics if [`get_data_dir`] cannot resolve an existing directory.
pub fn arrow_test_data() -> String {
    get_data_dir("ARROW_TEST_DATA", "../../testing/data")
        .map(|dir| dir.display().to_string())
        .unwrap_or_else(|err| panic!("failed to get arrow data dir: {err}"))
}
/// Returns the parquet test data directory as a displayable string.
///
/// The directory is resolved by [`get_data_dir`] using the `PARQUET_TEST_DATA`
/// environment variable, falling back to `../../parquet-testing/data` relative
/// to the crate manifest directory.
///
/// # Panics
///
/// Panics if [`get_data_dir`] cannot resolve an existing directory.
pub fn parquet_test_data() -> String {
    get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data")
        .map(|dir| dir.display().to_string())
        .unwrap_or_else(|err| panic!("failed to get parquet data dir: {err}"))
}
/// Resolves a test data directory.
///
/// Resolution order:
///
/// 1. If the environment variable `udf_env` is set to a value that is not
///    blank after trimming, that (trimmed) value is used. It must name an
///    existing directory, otherwise an error is returned; there is no fallback
///    in this case.
/// 2. Otherwise `submodule_data` is joined onto the crate's
///    `CARGO_MANIFEST_DIR` (captured at compile time) and must be an existing
///    directory.
///
/// # Arguments
///
/// * `udf_env` - name of the user-defined environment variable to consult.
/// * `submodule_data` - fallback path, relative to the crate manifest dir.
///
/// # Errors
///
/// Returns a boxed error message when the selected candidate is not a
/// directory.
pub fn get_data_dir(
    udf_env: &str,
    submodule_data: &str,
) -> Result<PathBuf, Box<dyn Error>> {
    // A non-blank user-supplied value is authoritative: accept it or fail.
    match env::var(udf_env) {
        Ok(value) if !value.trim().is_empty() => {
            let candidate = PathBuf::from(value.trim());
            return if candidate.is_dir() {
                Ok(candidate)
            } else {
                Err(format!(
                    "the data dir `{}` defined by env {} not found",
                    candidate.display(),
                    udf_env
                )
                .into())
            };
        }
        // Unset, unreadable, or blank: fall through to the built-in location.
        _ => {}
    }

    let fallback = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(submodule_data);
    if !fallback.is_dir() {
        return Err(format!(
            "env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
             HINT: try running `git submodule update --init`",
            udf_env,
            fallback.display(),
        )
        .into());
    }
    Ok(fallback)
}
/// Builds a logical scan over an [`EmptyTable`] with the given schema.
///
/// # Arguments
///
/// * `name` - table name; `UNNAMED_TABLE` is used when `None`.
/// * `table_schema` - schema of the empty table (cloned into the provider).
/// * `projection` - optional column indices forwarded to
///   [`LogicalPlanBuilder::scan`].
pub fn scan_empty(
    name: Option<&str>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
    let table_name = name.unwrap_or(UNNAMED_TABLE).to_string();
    let schema_ref = Arc::new(table_schema.clone());
    let provider = Arc::new(EmptyTable::new(schema_ref));

    LogicalPlanBuilder::scan(
        TableReference::bare(table_name),
        provider_as_source(provider),
        projection,
    )
}
/// Builds a logical scan over an [`EmptyTable`] configured with a given
/// number of partitions.
///
/// # Arguments
///
/// * `name` - table name; `UNNAMED_TABLE` is used when `None`.
/// * `table_schema` - schema of the empty table (cloned into the provider).
/// * `projection` - optional column indices forwarded to
///   [`LogicalPlanBuilder::scan`].
/// * `partitions` - value passed to `EmptyTable::with_partitions`.
pub fn scan_empty_with_partitions(
    name: Option<&str>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
    partitions: usize,
) -> Result<LogicalPlanBuilder> {
    let table_name = name.unwrap_or(UNNAMED_TABLE).to_string();
    let schema_ref = Arc::new(table_schema.clone());
    let empty_table = EmptyTable::new(schema_ref).with_partitions(partitions);
    let provider = Arc::new(empty_table);

    LogicalPlanBuilder::scan(
        TableReference::bare(table_name),
        provider_as_source(provider),
        projection,
    )
}
/// Returns the schema used by the aggregate test data.
///
/// The schema has thirteen non-nullable columns, `c1` through `c13`. Column
/// `c1` additionally carries the metadata entry `testing = test`.
pub fn aggr_test_schema() -> SchemaRef {
    let mut c1 = Field::new("c1", DataType::Utf8, false);
    c1.set_metadata(HashMap::from([(
        "testing".to_string(),
        "test".to_string(),
    )]));

    // Remaining columns, all non-nullable, in declaration order.
    let remaining = vec![
        ("c2", DataType::UInt32),
        ("c3", DataType::Int8),
        ("c4", DataType::Int16),
        ("c5", DataType::Int32),
        ("c6", DataType::Int64),
        ("c7", DataType::UInt8),
        ("c8", DataType::UInt16),
        ("c9", DataType::UInt32),
        ("c10", DataType::UInt64),
        ("c11", DataType::Float32),
        ("c12", DataType::Float64),
        ("c13", DataType::Utf8),
    ];

    let mut fields = vec![c1];
    fields.extend(
        remaining
            .into_iter()
            .map(|(name, data_type)| Field::new(name, data_type, false)),
    );

    Arc::new(Schema::new(fields))
}
/// Returns the aggregate test schema extended with one extra column.
///
/// The first thirteen columns (`c1` through `c13`) are non-nullable, and `c1`
/// carries the metadata entry `testing = test`. The final column,
/// `missing_col`, is a nullable `Int64`.
pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
    let mut c1 = Field::new("c1", DataType::Utf8, false);
    c1.set_metadata(HashMap::from([(
        "testing".to_string(),
        "test".to_string(),
    )]));

    // Remaining regular columns, all non-nullable, in declaration order.
    let remaining = vec![
        ("c2", DataType::UInt32),
        ("c3", DataType::Int8),
        ("c4", DataType::Int16),
        ("c5", DataType::Int32),
        ("c6", DataType::Int64),
        ("c7", DataType::UInt8),
        ("c8", DataType::UInt16),
        ("c9", DataType::UInt32),
        ("c10", DataType::UInt64),
        ("c11", DataType::Float32),
        ("c12", DataType::Float64),
        ("c13", DataType::Utf8),
    ];

    let mut fields = vec![c1];
    fields.extend(
        remaining
            .into_iter()
            .map(|(name, data_type)| Field::new(name, data_type, false)),
    );
    // The extra column is the only nullable one.
    fields.push(Field::new("missing_col", DataType::Int64, true));

    Arc::new(Schema::new(fields))
}
/// A [`TableProviderFactory`] for tests that produces [`TestTableProvider`]
/// instances.
pub struct TestTableFactory {}

#[async_trait]
impl TableProviderFactory for TestTableFactory {
    /// Creates a [`TestTableProvider`] from the command's `location` and
    /// `schema`; the session state is ignored.
    async fn create(
        &self,
        _: &SessionState,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        let url = cmd.location.to_string();
        // Convert the command's schema into the schema type stored by the provider.
        let schema = Arc::new(cmd.schema.as_ref().into());

        Ok(Arc::new(TestTableProvider { url, schema }))
    }
}
/// A stub [`TableProvider`] for tests.
///
/// Only [`TableProvider::as_any`] and [`TableProvider::schema`] are usable;
/// `table_type` and `scan` panic with `unimplemented!`.
pub struct TestTableProvider {
    /// Location string of the table.
    pub url: String,
    /// Schema reported by [`TableProvider::schema`].
    pub schema: SchemaRef,
}

impl TestTableProvider {}

#[async_trait]
impl TableProvider for TestTableProvider {
    /// Returns `self` as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns a new handle to the stored schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Not implemented for this stub.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn table_type(&self) -> TableType {
        unimplemented!("TestTableProvider is a stub for testing.")
    }

    /// Not implemented for this stub.
    ///
    /// # Panics
    ///
    /// Always panics.
    async fn scan(
        &self,
        _state: &SessionState,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!("TestTableProvider is a stub for testing.")
    }
}
/// A test execution plan whose streams repeatedly yield clones of one batch.
///
/// With `batch_produce` set to `Some(n)`, each stream yields `n` batches and
/// then ends; with `None`, the streams never end.
#[derive(Debug, Clone)]
pub struct UnboundedExec {
    /// Number of batches each stream yields; `None` means no limit.
    batch_produce: Option<usize>,
    /// The batch that is cloned for every emitted item.
    batch: RecordBatch,
    /// Partition count reported by `output_partitioning`.
    partitions: usize,
}

impl UnboundedExec {
    /// Creates a new `UnboundedExec`.
    ///
    /// * `batch_produce` - per-stream batch limit, or `None` for no limit.
    /// * `batch` - the batch to emit repeatedly.
    /// * `partitions` - the partition count to report.
    pub fn new(
        batch_produce: Option<usize>,
        batch: RecordBatch,
        partitions: usize,
    ) -> Self {
        UnboundedExec {
            partitions,
            batch,
            batch_produce,
        }
    }
}
impl ExecutionPlan for UnboundedExec {
    /// Returns `self` as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The schema is that of the batch being replayed.
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    /// Reports unknown partitioning over the configured partition count.
    fn output_partitioning(&self) -> Partitioning {
        let partition_count = self.partitions;
        Partitioning::UnknownPartitioning(partition_count)
    }

    /// The output is unbounded exactly when no batch limit was configured.
    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        let is_unbounded = self.batch_produce.is_none();
        Ok(is_unbounded)
    }

    /// No output ordering is reported.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// This node is a leaf: it has no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Ignores the supplied children and returns this plan unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Starts a fresh stream (counter at zero) regardless of the requested
    /// partition; the task context is unused.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = UnboundedStream {
            batch_produce: self.batch_produce,
            count: 0,
            batch: self.batch.clone(),
        };
        Ok(Box::pin(stream))
    }

    /// Writes the same one-line description for every display format.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
                f,
                "UnboundableExec: unbounded={}",
                self.batch_produce.is_none(),
            ),
        }
    }

    /// Returns default statistics.
    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}
/// Stream that yields clones of a single batch, optionally up to a limit.
#[derive(Debug)]
struct UnboundedStream {
    /// Maximum number of batches to yield; `None` means no limit.
    batch_produce: Option<usize>,
    /// Number of batches yielded so far.
    count: usize,
    /// The batch cloned for every yielded item.
    batch: RecordBatch,
}

impl Stream for UnboundedStream {
    type Item = Result<RecordBatch>;

    /// Always completes immediately: yields `None` once the configured limit
    /// has been reached, otherwise bumps the counter and yields a clone of the
    /// batch. The task context is not used.
    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let exhausted =
            matches!(self.batch_produce, Some(limit) if limit <= self.count);
        if exhausted {
            return Poll::Ready(None);
        }

        self.count += 1;
        Poll::Ready(Some(Ok(self.batch.clone())))
    }
}

impl RecordBatchStream for UnboundedStream {
    /// The schema is that of the batch being replayed.
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }
}
// Unit tests for the data-directory helpers defined in this module.
#[cfg(test)]
mod tests {
use super::*;
use std::env;
// Walks `get_data_dir` through its resolution cases. The steps mutate one
// process environment variable in sequence, so their order matters.
#[test]
fn test_data_dir() {
// Name of the environment variable used only by this test.
let udf_env = "get_data_dir";
let cwd = env::current_dir().unwrap();
// A directory expected to exist: the parent of the current directory.
let existing_pb = cwd.join("..");
let existing = existing_pb.display().to_string();
let existing_str = existing.as_str();
let non_existing = cwd.join("non-existing-dir").display().to_string();
let non_existing_str = non_existing.as_str();
// Case 1: variable set to a missing directory -> error, no fallback.
env::set_var(udf_env, non_existing_str);
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_err());
// Case 2: variable set but empty -> the fallback path is used.
// NOTE(review): the equality below relies on `existing_str` being absolute,
// so that joining it onto the manifest dir yields it unchanged -- confirm.
env::set_var(udf_env, "");
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
// Case 3: whitespace-only value is treated like an empty one.
env::set_var(udf_env, " ");
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
// Case 4: variable set to an existing directory -> that directory is returned.
env::set_var(udf_env, existing_str);
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
// Case 5: variable unset -> the result depends only on the fallback path.
env::remove_var(udf_env);
let res = get_data_dir(udf_env, non_existing_str);
assert!(res.is_err());
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
}
// Checks that both public helpers resolve to existing directories. Each one
// panics if neither its environment variable nor its fallback directory
// yields a directory.
#[test]
fn test_happy() {
let res = arrow_test_data();
assert!(PathBuf::from(res).is_dir());
let res = parquet_test_data();
assert!(PathBuf::from(res).is_dir());
}
}
/// Registers a CSV file as a listing table with a declared file sort order,
/// optionally marked as infinite.
///
/// # Arguments
///
/// * `ctx` - session context in which the table is registered.
/// * `schema` - schema given to the CSV read options and passed again as the
///   provided schema on registration.
/// * `file_path` - path of the file to register.
/// * `table_name` - name under which the table is registered.
/// * `file_sort_order` - sort order passed to `with_file_sort_order`.
/// * `with_unbounded_execution` - value passed to `mark_infinite`.
///
/// # Errors
///
/// Propagates any error from `register_listing_table`.
///
/// # Panics
///
/// Panics if `file_path` is not valid UTF-8.
pub async fn register_unbounded_file_with_ordering(
    ctx: &SessionContext,
    schema: SchemaRef,
    file_path: &Path,
    table_name: &str,
    file_sort_order: Vec<Vec<Expr>>,
    with_unbounded_execution: bool,
) -> Result<()> {
    // Build the listing options: CSV read options first, then the sort order.
    let listing_options = CsvReadOptions::new()
        .schema(schema.as_ref())
        .mark_infinite(with_unbounded_execution)
        .to_listing_options(&ctx.copied_config())
        .with_file_sort_order(file_sort_order);

    let table_path = file_path.to_str().unwrap();

    ctx.register_listing_table(
        table_name,
        table_path,
        listing_options,
        Some(schema),
        None,
    )
    .await?;
    Ok(())
}