datafusion/datasource/
mod.rs1pub mod dynamic_file;
23pub mod empty;
24pub mod file_format;
25pub mod listing;
26pub mod listing_table_factory;
27mod memory_test;
28pub mod physical_plan;
29pub mod provider;
30mod view_test;
31
32pub use self::default_table_source::{
34 DefaultTableSource, provider_as_source, source_as_provider,
35};
36pub use self::memory::MemTable;
37pub use self::view::ViewTable;
38pub use crate::catalog::TableProvider;
39pub use crate::logical_expr::TableType;
40pub use datafusion_catalog::cte_worktable;
41pub use datafusion_catalog::default_table_source;
42pub use datafusion_catalog::memory;
43pub use datafusion_catalog::stream;
44pub use datafusion_catalog::view;
45pub use datafusion_datasource::schema_adapter;
46pub use datafusion_datasource::sink;
47pub use datafusion_datasource::source;
48pub use datafusion_datasource::table_schema;
49pub use datafusion_execution::object_store;
50pub use datafusion_physical_expr::create_ordering;
51
#[cfg(all(test, feature = "parquet"))]
mod tests {

    use crate::prelude::SessionContext;
    use ::object_store::{ObjectMeta, path::Path};
    use arrow::{
        array::Int32Array,
        datatypes::{DataType, Field, Schema, SchemaRef},
        record_batch::RecordBatch,
    };
    use datafusion_common::{
        Result, ScalarValue,
        test_util::batches_to_sort_string,
        tree_node::{Transformed, TransformedResult, TreeNode},
    };
    use datafusion_datasource::{
        PartitionedFile, file_scan_config::FileScanConfigBuilder, source::DataSourceExec,
    };
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_physical_expr::expressions::{Column, Literal};
    use datafusion_physical_expr_adapter::{
        PhysicalExprAdapter, PhysicalExprAdapterFactory,
    };
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::collect;
    use std::{fs, sync::Arc};
    use tempfile::TempDir;

    /// Verifies that a user-supplied `PhysicalExprAdapterFactory` registered via
    /// `FileScanConfigBuilder::with_expr_adapter` is honored by the parquet scan:
    /// the table schema declares an `extra_column` that is absent from the
    /// physical parquet file, and the custom adapter below rewrites references
    /// to it into the Utf8 literal `"foo"`, which must appear in the output.
    #[tokio::test]
    async fn can_override_physical_expr_adapter() {
        use datafusion_execution::object_store::ObjectStoreUrl;

        // Write a single-row parquet file that contains only the `id` column.
        let tmp_dir = TempDir::new().unwrap();
        let table_dir = tmp_dir.path().join("parquet_test");
        fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
        let f1 = Field::new("id", DataType::Int32, true);

        let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
        let filename = "part.parquet".to_string();
        let path = table_dir.as_path().join(filename.clone());
        let file = fs::File::create(path.clone()).unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(file, file_schema.clone(), None)
                .unwrap();

        let ids = Arc::new(Int32Array::from(vec![1i32]));
        let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();

        writer.write(&rec_batch).unwrap();
        writer.close().unwrap();

        // Build a `PartitionedFile` from the on-disk metadata of the file we
        // just wrote.
        let location = Path::parse(path.to_str().unwrap()).unwrap();
        let metadata = fs::metadata(path.as_path()).expect("Local file metadata");
        let meta = ObjectMeta {
            location,
            last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
            size: metadata.len(),
            e_tag: None,
            version: None,
        };

        let partitioned_file = PartitionedFile::new_from_meta(meta);

        // The logical table schema has one column more (`extra_column`) than
        // the physical file; the adapter fills the gap with a constant.
        let f1 = Field::new("id", DataType::Int32, true);
        let f2 = Field::new("extra_column", DataType::Utf8, true);

        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
        let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
        let base_conf =
            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
                .with_file(partitioned_file)
                .with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory)))
                .build();

        let parquet_exec = DataSourceExec::from_data_source(base_conf);

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let read = collect(parquet_exec, task_ctx).await.unwrap();

        insta::assert_snapshot!(batches_to_sort_string(&read),@r"
        +----+--------------+
        | id | extra_column |
        +----+--------------+
        | 1  | foo          |
        +----+--------------+
        ");
    }

    /// Factory that hands the physical (file) schema to [`TestPhysicalExprAdapter`].
    #[derive(Debug)]
    struct TestPhysicalExprAdapterFactory;

    impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
        fn create(
            &self,
            _logical_file_schema: SchemaRef,
            physical_file_schema: SchemaRef,
        ) -> Result<Arc<dyn PhysicalExprAdapter>> {
            Ok(Arc::new(TestPhysicalExprAdapter {
                physical_file_schema,
            }))
        }
    }

    /// Adapter that rewrites references to `extra_column` — when that column
    /// is missing from the physical file schema — into the Utf8 literal `"foo"`.
    #[derive(Debug)]
    struct TestPhysicalExprAdapter {
        physical_file_schema: SchemaRef,
    }

    impl PhysicalExprAdapter for TestPhysicalExprAdapter {
        fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
            expr.transform(|e| {
                if let Some(column) = e.as_any().downcast_ref::<Column>() {
                    if column.name() == "extra_column"
                        && self.physical_file_schema.index_of("extra_column").is_err()
                    {
                        // Substitute the missing column with a constant value.
                        return Ok(Transformed::yes(Arc::new(Literal::new(
                            ScalarValue::Utf8(Some("foo".to_string())),
                        ))
                            as Arc<dyn PhysicalExpr>));
                    }
                }
                Ok(Transformed::no(e))
            })
            .data()
        }
    }
}