datafusion/datasource/
mod.rs1pub mod dynamic_file;
23pub mod file_format;
24pub mod listing;
25pub mod listing_table_factory;
26mod memory_test;
27pub mod physical_plan;
28pub mod provider;
29mod view_test;
30
31pub use self::default_table_source::{
33 DefaultTableSource, provider_as_source, source_as_provider,
34};
35pub use self::memory::MemTable;
36pub use self::view::ViewTable;
37pub use crate::catalog::TableProvider;
38pub use crate::logical_expr::TableType;
39pub use datafusion_catalog::cte_worktable;
40pub use datafusion_catalog::default_table_source;
41pub use datafusion_catalog::empty;
42pub use datafusion_catalog::memory;
43pub use datafusion_catalog::stream;
44pub use datafusion_catalog::view;
45pub use datafusion_datasource::projection;
46pub use datafusion_datasource::schema_adapter;
47pub use datafusion_datasource::sink;
48pub use datafusion_datasource::source;
49pub use datafusion_datasource::table_schema;
50pub use datafusion_execution::object_store;
51pub use datafusion_physical_expr::create_ordering;
52
#[cfg(all(test, feature = "parquet"))]
mod tests {

    use crate::prelude::SessionContext;
    use ::object_store::{ObjectMeta, path::Path};
    use arrow::{
        array::Int32Array,
        datatypes::{DataType, Field, Schema, SchemaRef},
        record_batch::RecordBatch,
    };
    use datafusion_common::{
        Result, ScalarValue,
        test_util::batches_to_sort_string,
        tree_node::{Transformed, TransformedResult, TreeNode},
    };
    use datafusion_datasource::{
        PartitionedFile, file_scan_config::FileScanConfigBuilder, source::DataSourceExec,
    };
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_physical_expr::expressions::{Column, Literal};
    use datafusion_physical_expr_adapter::{
        PhysicalExprAdapter, PhysicalExprAdapterFactory,
    };
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::collect;
    use std::{fs, sync::Arc};
    use tempfile::TempDir;
    use url::Url;

    /// Column declared in the table schema but absent from the parquet file;
    /// shared by the test and the adapter so the two cannot drift apart.
    const EXTRA_COLUMN: &str = "extra_column";

    /// Verifies that a custom [`PhysicalExprAdapterFactory`] installed through
    /// `FileScanConfigBuilder::with_expr_adapter` is used during a parquet scan.
    ///
    /// The file contains only `id`. The table schema additionally declares
    /// [`EXTRA_COLUMN`], and the test adapter fills that column with `"foo"`.
    #[tokio::test]
    async fn can_override_physical_expr_adapter() {
        use datafusion_execution::object_store::ObjectStoreUrl;

        // Write a single-row parquet file that holds only the `id` column.
        let tmp_dir = TempDir::new().unwrap();
        let table_dir = tmp_dir.path().join("parquet_test");
        fs::create_dir(&table_dir).unwrap();

        let id_field = Field::new("id", DataType::Int32, true);
        let file_schema = Arc::new(Schema::new(vec![id_field.clone()]));
        let path = table_dir.join("part.parquet");
        let file = fs::File::create(&path).unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(file, Arc::clone(&file_schema), None)
                .unwrap();

        let ids = Arc::new(Int32Array::from(vec![1i32]));
        let rec_batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids]).unwrap();

        writer.write(&rec_batch).unwrap();
        writer.close().unwrap();

        // Describe the written file to the scan using its real on-disk metadata.
        let url = Url::from_file_path(path.canonicalize().unwrap()).unwrap();
        let location = Path::from_url_path(url.path()).unwrap();
        let metadata = fs::metadata(&path).expect("Local file metadata");
        let meta = ObjectMeta {
            location,
            last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
            size: metadata.len(),
            e_tag: None,
            version: None,
        };

        let partitioned_file = PartitionedFile::new_from_meta(meta);

        // The table schema is the file schema plus a column the file lacks;
        // the custom adapter is what makes that column readable.
        let table_schema = Arc::new(Schema::new(vec![
            id_field,
            Field::new(EXTRA_COLUMN, DataType::Utf8, true),
        ]));
        let source = Arc::new(ParquetSource::new(table_schema));
        let base_conf =
            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
                .with_file(partitioned_file)
                .with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory)))
                .build();

        let parquet_exec = DataSourceExec::from_data_source(base_conf);

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let read = collect(parquet_exec, task_ctx).await.unwrap();

        insta::assert_snapshot!(batches_to_sort_string(&read),@r"
        +----+--------------+
        | id | extra_column |
        +----+--------------+
        | 1  | foo          |
        +----+--------------+
        ");
    }

    /// Factory that builds one [`TestPhysicalExprAdapter`] per file, remembering
    /// that file's physical schema; the logical schema is not needed.
    #[derive(Debug)]
    struct TestPhysicalExprAdapterFactory;

    impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
        fn create(
            &self,
            _logical_file_schema: SchemaRef,
            physical_file_schema: SchemaRef,
        ) -> Result<Arc<dyn PhysicalExprAdapter>> {
            Ok(Arc::new(TestPhysicalExprAdapter {
                physical_file_schema,
            }))
        }
    }

    /// Rewrites references to [`EXTRA_COLUMN`] into the literal `"foo"` when the
    /// file being read does not contain that column. All other expression
    /// nodes are returned unchanged.
    #[derive(Debug)]
    struct TestPhysicalExprAdapter {
        /// Schema of the file actually being scanned.
        physical_file_schema: SchemaRef,
    }

    impl PhysicalExprAdapter for TestPhysicalExprAdapter {
        fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
            // Whether the file lacks the column does not depend on the node being
            // visited, so look it up once instead of on every `transform` callback.
            let extra_missing_from_file =
                self.physical_file_schema.index_of(EXTRA_COLUMN).is_err();

            expr.transform(|e| {
                if extra_missing_from_file {
                    if let Some(column) = e.downcast_ref::<Column>() {
                        if column.name() == EXTRA_COLUMN {
                            let filler: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(
                                ScalarValue::Utf8(Some("foo".to_string())),
                            ));
                            return Ok(Transformed::yes(filler));
                        }
                    }
                }
                Ok(Transformed::no(e))
            })
            .data()
        }
    }
}