term_guard/sources/parquet.rs

use super::DataSource;
use crate::prelude::*;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;
use std::sync::Arc;
use tracing::instrument;

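/// Tuning options for [`ParquetSource`].
///
/// `ParquetOptions::new()` and `Default::default()` agree on the defaults:
/// pruning enabled, 8192-row batches, no schema override, no thread cap.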
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Explicit schema to use instead of inferring one from file metadata.
    pub schema: Option<Arc<Schema>>,
    /// Whether to enable row-group pruning based on Parquet statistics.
    pub enable_pruning: bool,
    /// Target number of rows per record batch.
    pub batch_size: usize,
    /// Optional upper bound on reader threads.
    pub max_threads: Option<usize>,
}

impl ParquetOptions {
    pub fn new() -> Self {
        Self {
            schema: None,
            enable_pruning: true,
            batch_size: 8192,
            max_threads: None,
        }
    }
}

impl Default for ParquetOptions {
    fn default() -> Self {
        // Delegate to `new()` so `..Default::default()` picks up the real
        // defaults; a derived `Default` would zero `batch_size` and turn
        // pruning off.
        Self::new()
    }
}

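/// A [`DataSource`] that registers one or more Parquet files with a
/// DataFusion [`SessionContext`]. A single file is registered directly;
/// multiple files are exposed through a [`ListingTable`] over their parent
/// directory.
///
/// Example (a sketch, not run as a doc test; it assumes this type and the
/// `DataSource` trait are re-exported as `term_guard::sources::*` and that
/// the caller is inside a Tokio runtime):
///
/// ```rust,ignore
/// use datafusion::prelude::SessionContext;
/// use term_guard::sources::{DataSource, ParquetSource}; // assumed re-export
///
/// let ctx = SessionContext::new();
/// let source = ParquetSource::new("data/events.parquet")?;
/// source.register(&ctx, "events").await?;
/// let df = ctx.sql("SELECT COUNT(*) FROM events").await?;
/// df.show().await?;
/// ```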
#[derive(Debug, Clone)]
pub struct ParquetSource {
    paths: Vec<String>,
    options: ParquetOptions,
    /// Schema inferred from file metadata, cached after the first read.
    metadata_schema: Option<Arc<Schema>>,
}

impl ParquetSource {
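    /// Creates a source for a single Parquet file with default options.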
    pub fn new(path: impl Into<String>) -> Result<Self> {
        Ok(Self {
            paths: vec![path.into()],
            options: ParquetOptions::new(),
            metadata_schema: None,
        })
    }

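    /// Creates a source for a single Parquet file with explicit options.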
    pub fn with_options(path: impl Into<String>, options: ParquetOptions) -> Result<Self> {
        Ok(Self {
            paths: vec![path.into()],
            options,
            metadata_schema: None,
        })
    }

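    /// Creates a source from an explicit list of paths; at least one is required.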
    pub fn from_paths(paths: Vec<String>) -> Result<Self> {
        if paths.is_empty() {
            return Err(TermError::Configuration(
                "At least one path must be provided".to_string(),
            ));
        }
        Ok(Self {
            paths,
            options: ParquetOptions::new(),
            metadata_schema: None,
        })
    }

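    /// Creates a source from a single glob pattern (e.g. `data/*.parquet`).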
    pub async fn from_glob(pattern: impl Into<String>) -> Result<Self> {
        let patterns = vec![pattern.into()];
        let paths = super::expand_globs(&patterns).await?;
        Self::from_paths(paths)
    }

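    /// Creates a source from several glob patterns, expanded and merged.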
    pub async fn from_globs(patterns: Vec<String>) -> Result<Self> {
        let paths = super::expand_globs(&patterns).await?;
        Self::from_paths(paths)
    }

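    /// Replaces the options on an already constructed source (builder style).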
    pub fn with_custom_options(mut self, options: ParquetOptions) -> Self {
        self.options = options;
        self
    }

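    /// Resolves the schema, preferring an explicit override, then the cached
    /// inference, and finally the metadata of the first file.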
    #[instrument(skip(self))]
    #[allow(dead_code)]
    async fn read_metadata_schema(&mut self) -> Result<Arc<Schema>> {
        if let Some(schema) = &self.options.schema {
            return Ok(schema.clone());
        }

        if let Some(schema) = &self.metadata_schema {
            return Ok(schema.clone());
        }

        // No schema yet: open the first file in a throwaway context and take
        // its schema as representative of the whole set.
        let ctx = SessionContext::new();

        let first_path = &self.paths[0];
        let options = ParquetReadOptions::default();
        let df = ctx.read_parquet(first_path, options).await?;
        let schema = df.schema().inner().clone();

        self.metadata_schema = Some(schema.clone());
        Ok(schema)
    }
}

#[async_trait]
impl DataSource for ParquetSource {
    #[instrument(skip(self, ctx, telemetry), fields(table_name = %table_name, source_type = "parquet", file_count = self.paths.len()))]
    async fn register_with_telemetry(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        telemetry: Option<&Arc<TermTelemetry>>,
    ) -> Result<()> {
        // Hold the span guard for the duration of registration; it is never
        // used directly, hence the leading underscore.
        let _datasource_span = if let Some(tel) = telemetry {
            tel.start_datasource_span("parquet", table_name)
        } else {
            TermSpan::noop()
        };
        if self.paths.len() == 1 {
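            // Single file: register it directly with the session context.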
            let mut options = ParquetReadOptions::default();
            if let Some(schema) = &self.options.schema {
                options = options.schema(schema);
            }

            ctx.register_parquet(table_name, &self.paths[0], options)
                .await?;
        } else {
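            // Multiple files: build a ListingTable over the parent directory
            // of the first path. Note that this picks up every `.parquet`
            // file in that directory, not only the paths that were supplied.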
            let first_path = std::path::Path::new(&self.paths[0]);
            let dir_path = first_path
                .parent()
                .ok_or_else(|| TermError::Configuration("Invalid file path".to_string()))?;
            let dir_path_str = dir_path.to_str().ok_or_else(|| {
                TermError::Configuration("Path contains invalid UTF-8".to_string())
            })?;
            let table_path = ListingTableUrl::parse(dir_path_str)?;

            let format = ParquetFormat::new().with_enable_pruning(self.options.enable_pruning);

            let listing_options =
                ListingOptions::new(Arc::new(format)).with_file_extension(".parquet");

            let schema = if let Some(schema) = &self.options.schema {
                schema.clone()
            } else {
                // `read_metadata_schema` needs `&mut self`, so infer on a
                // clone to keep this method `&self`.
                let mut source_clone = self.clone();
                source_clone.read_metadata_schema().await?
            };

            let config = ListingTableConfig::new(table_path)
                .with_listing_options(listing_options)
                .with_schema(schema);

            let table = ListingTable::try_new(config)?;
            ctx.register_table(table_name, Arc::new(table))?;
        }

        Ok(())
    }

    fn schema(&self) -> Option<&Arc<Schema>> {
        self.options
            .schema
            .as_ref()
            .or(self.metadata_schema.as_ref())
    }

    fn description(&self) -> String {
        if self.paths.len() == 1 {
            let path = &self.paths[0];
            format!("Parquet file: {path}")
        } else {
            let count = self.paths.len();
            format!("Parquet files: {count} files")
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::fs::File;
    use tempfile::NamedTempFile;

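    /// Writes a tiny two-column (`id`, `name`) Parquet file to a temp path.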
    fn create_test_parquet() -> NamedTempFile {
        let file = NamedTempFile::with_suffix(".parquet").unwrap();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap();

        // `None` writer properties: accept the Parquet writer's defaults.
        let props = None;
        let file_handle = File::create(file.path()).unwrap();
        let mut writer = ArrowWriter::try_new(file_handle, schema, props).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        file
    }

    #[tokio::test]
    async fn test_parquet_source_single_file() {
        let file = create_test_parquet();
        let source = ParquetSource::new(file.path().to_str().unwrap()).unwrap();

        assert_eq!(source.paths.len(), 1);
        assert!(source.description().contains("Parquet file"));
    }

    #[tokio::test]
    async fn test_parquet_source_with_options() {
        let file = create_test_parquet();
        let options = ParquetOptions {
            enable_pruning: false,
            batch_size: 16384,
            ..Default::default()
        };

        let source = ParquetSource::with_options(file.path().to_str().unwrap(), options).unwrap();
        assert!(!source.options.enable_pruning);
        assert_eq!(source.options.batch_size, 16384);
    }

    #[tokio::test]
    async fn test_parquet_source_multiple_files() {
        let file1 = create_test_parquet();
        let file2 = create_test_parquet();

        let paths = vec![
            file1.path().to_str().unwrap().to_string(),
            file2.path().to_str().unwrap().to_string(),
        ];

        let source = ParquetSource::from_paths(paths).unwrap();
        assert_eq!(source.paths.len(), 2);
        assert!(source.description().contains("2 files"));
    }

    #[tokio::test]
    async fn test_parquet_metadata_reading() {
        let file = create_test_parquet();
        let mut source = ParquetSource::new(file.path().to_str().unwrap()).unwrap();

        let schema = source.read_metadata_schema().await.unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_parquet_registration() {
        let file = create_test_parquet();
        let source = ParquetSource::new(file.path().to_str().unwrap()).unwrap();

        let ctx = SessionContext::new();
        source.register(&ctx, "test_table").await.unwrap();

        let df = ctx
            .sql("SELECT COUNT(*) as count FROM test_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert!(!batches.is_empty());
    }
}