term_guard/sources/parquet.rs

//! Parquet file source implementation.

use super::DataSource;
use crate::prelude::*;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;
use std::sync::Arc;
use tracing::instrument;

/// Options for configuring Parquet file reading.
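///
/// # Examples
///
/// ```rust,ignore
/// // A minimal sketch: override one field and keep the remaining defaults.
/// let options = ParquetOptions {
///     batch_size: 16384,
///     ..Default::default()
/// };
/// ```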
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Schema to use (if `None`, it is read from the file metadata)
    pub schema: Option<Arc<Schema>>,
    /// Whether to prune row groups based on Parquet statistics
    pub enable_pruning: bool,
    /// Batch size for reading
    pub batch_size: usize,
    /// Maximum number of threads to use for reading
    pub max_threads: Option<usize>,
}

impl ParquetOptions {
    /// Creates default options with pruning enabled.
    pub fn new() -> Self {
        Self {
            schema: None,
            enable_pruning: true,
            batch_size: 8192,
            max_threads: None,
        }
    }
}
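
// `Default` is implemented by hand so that `..Default::default()` yields the
// same values as `new()`; a derived impl would silently set `batch_size` to 0
// and disable pruning.
impl Default for ParquetOptions {
    fn default() -> Self {
        Self::new()
    }
}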

/// A Parquet file data source with metadata reading and efficient querying.
///
/// # Examples
///
/// ```rust,ignore
/// use term_guard::sources::{ParquetSource, ParquetOptions};
///
/// # async fn example() -> Result<()> {
/// // Simple Parquet file
/// let source = ParquetSource::new("data/events.parquet")?;
///
/// // Parquet with custom options
/// let options = ParquetOptions {
///     enable_pruning: false,
///     batch_size: 16384,
///     ..Default::default()
/// };
/// let source = ParquetSource::with_options("data/events.parquet", options)?;
///
/// // Multiple files with glob pattern
/// let source = ParquetSource::from_glob("data/year=2023/*.parquet").await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ParquetSource {
    paths: Vec<String>,
    options: ParquetOptions,
    metadata_schema: Option<Arc<Schema>>,
}

impl ParquetSource {
    /// Creates a new Parquet source from a single file path.
    pub fn new(path: impl Into<String>) -> Result<Self> {
        Ok(Self {
            paths: vec![path.into()],
            options: ParquetOptions::new(),
            metadata_schema: None,
        })
    }

    /// Creates a new Parquet source with custom options.
    pub fn with_options(path: impl Into<String>, options: ParquetOptions) -> Result<Self> {
        Ok(Self {
            paths: vec![path.into()],
            options,
            metadata_schema: None,
        })
    }

    /// Creates a Parquet source from multiple file paths.
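    ///
    /// Returns an error if `paths` is empty.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// // A sketch with placeholder file names.
    /// let source = ParquetSource::from_paths(vec![
    ///     "data/part-0.parquet".to_string(),
    ///     "data/part-1.parquet".to_string(),
    /// ])?;
    /// ```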
    pub fn from_paths(paths: Vec<String>) -> Result<Self> {
        if paths.is_empty() {
            return Err(TermError::Configuration(
                "At least one path must be provided".to_string(),
            ));
        }
        Ok(Self {
            paths,
            options: ParquetOptions::new(),
            metadata_schema: None,
        })
    }

    /// Creates a Parquet source from a glob pattern.
    pub async fn from_glob(pattern: impl Into<String>) -> Result<Self> {
        let patterns = vec![pattern.into()];
        let paths = super::expand_globs(&patterns).await?;
        Self::from_paths(paths)
    }

    /// Creates a Parquet source from multiple glob patterns.
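    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// // A sketch with hypothetical partition directories.
    /// let source = ParquetSource::from_globs(vec![
    ///     "data/year=2022/*.parquet".to_string(),
    ///     "data/year=2023/*.parquet".to_string(),
    /// ]).await?;
    /// ```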
    pub async fn from_globs(patterns: Vec<String>) -> Result<Self> {
        let paths = super::expand_globs(&patterns).await?;
        Self::from_paths(paths)
    }

    /// Sets custom options for this Parquet source.
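    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// // Builder-style sketch; equivalent to calling `with_options`.
    /// let source = ParquetSource::new("data/events.parquet")?
    ///     .with_custom_options(ParquetOptions {
    ///         enable_pruning: false,
    ///         ..Default::default()
    ///     });
    /// ```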
    pub fn with_custom_options(mut self, options: ParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Reads the schema from Parquet file metadata, caching the result.
    #[instrument(skip(self))]
    async fn read_metadata_schema(&mut self) -> Result<Arc<Schema>> {
        if let Some(schema) = &self.options.schema {
            return Ok(schema.clone());
        }

        if let Some(schema) = &self.metadata_schema {
            return Ok(schema.clone());
        }

        // Create a temporary context for schema inference
        let ctx = SessionContext::new();

        // For Parquet, the schema can be read directly from the first file's metadata
        let first_path = &self.paths[0];
        let options = ParquetReadOptions::default();
        let df = ctx.read_parquet(first_path, options).await?;
        let schema = df.schema().inner().clone();

        self.metadata_schema = Some(schema.clone());
        Ok(schema)
    }
}

#[async_trait]
impl DataSource for ParquetSource {
    #[instrument(skip(self, ctx, telemetry), fields(table_name = %table_name, source_type = "parquet", file_count = self.paths.len()))]
    async fn register_with_telemetry(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        telemetry: Option<&Arc<TermTelemetry>>,
    ) -> Result<()> {
        // Create a telemetry span for data source loading. The binding is
        // underscore-prefixed (not `_`) so the span stays alive for the whole
        // registration rather than being dropped immediately.
        let _datasource_span = if let Some(tel) = telemetry {
            tel.start_datasource_span("parquet", table_name)
        } else {
            TermSpan::noop()
        };

        // Handle multiple paths
        if self.paths.len() == 1 {
            // Single file - use register_parquet for simplicity
            let mut options = ParquetReadOptions::default();
            if let Some(schema) = &self.options.schema {
                options = options.schema(schema);
            }

            ctx.register_parquet(table_name, &self.paths[0], options)
                .await?;
        } else {
            // Multiple files - register the exact paths as a ListingTable.
            // Listing each file directly (instead of the parent directory of
            // the first file) avoids sweeping in unrelated Parquet files and
            // supports inputs spread across several directories.
            let table_paths = self
                .paths
                .iter()
                .map(|path| ListingTableUrl::parse(path))
                .collect::<std::result::Result<Vec<_>, _>>()?;

            let format = ParquetFormat::new().with_enable_pruning(self.options.enable_pruning);

            let listing_options =
                ListingOptions::new(Arc::new(format)).with_file_extension(".parquet");

            // Infer the schema from the first file's metadata if none was provided
            let schema = if let Some(schema) = &self.options.schema {
                schema.clone()
            } else {
                // `read_metadata_schema` needs `&mut self`, so work on a clone
                let mut source_clone = self.clone();
                source_clone.read_metadata_schema().await?
            };

            let config = ListingTableConfig::new_with_multi_paths(table_paths)
                .with_listing_options(listing_options)
                .with_schema(schema);

            let table = ListingTable::try_new(config)?;
            ctx.register_table(table_name, Arc::new(table))?;
        }

        Ok(())
    }

    fn schema(&self) -> Option<&Arc<Schema>> {
        self.options
            .schema
            .as_ref()
            .or(self.metadata_schema.as_ref())
    }

    fn description(&self) -> String {
        if self.paths.len() == 1 {
            let path = &self.paths[0];
            format!("Parquet file: {path}")
        } else {
            let count = self.paths.len();
            format!("Parquet files: {count} files")
        }
    }
}
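
// End-to-end sketch (non-normative; assumes `DataSource::register` delegates
// to `register_with_telemetry` with no telemetry, as the tests below rely on):
//
//     let ctx = SessionContext::new();
//     let source = ParquetSource::new("data/events.parquet")?;
//     source.register(&ctx, "events").await?;
//     let df = ctx.sql("SELECT COUNT(*) FROM events").await?;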

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::fs::File;
    use tempfile::NamedTempFile;

    fn create_test_parquet() -> NamedTempFile {
        let file = NamedTempFile::with_suffix(".parquet").unwrap();

        // Create schema
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        // Create data
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap();

        // Write to Parquet with default writer properties
        let props = None;
        let file_handle = File::create(file.path()).unwrap();
        let mut writer = ArrowWriter::try_new(file_handle, schema, props).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        file
    }

    #[tokio::test]
    async fn test_parquet_source_single_file() {
        let file = create_test_parquet();
        let source = ParquetSource::new(file.path().to_str().unwrap()).unwrap();

        assert_eq!(source.paths.len(), 1);
        assert!(source.description().contains("Parquet file"));
    }

    #[tokio::test]
    async fn test_parquet_source_with_options() {
        let file = create_test_parquet();
        let options = ParquetOptions {
            enable_pruning: false,
            batch_size: 16384,
            ..Default::default()
        };

        let source = ParquetSource::with_options(file.path().to_str().unwrap(), options).unwrap();
        assert!(!source.options.enable_pruning);
        assert_eq!(source.options.batch_size, 16384);
    }

    #[tokio::test]
    async fn test_parquet_source_multiple_files() {
        let file1 = create_test_parquet();
        let file2 = create_test_parquet();

        let paths = vec![
            file1.path().to_str().unwrap().to_string(),
            file2.path().to_str().unwrap().to_string(),
        ];

        let source = ParquetSource::from_paths(paths).unwrap();
        assert_eq!(source.paths.len(), 2);
        assert!(source.description().contains("2 files"));
    }

    #[tokio::test]
    async fn test_parquet_metadata_reading() {
        let file = create_test_parquet();
        let mut source = ParquetSource::new(file.path().to_str().unwrap()).unwrap();

        let schema = source.read_metadata_schema().await.unwrap();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
    }

    #[tokio::test]
    async fn test_parquet_registration() {
        let file = create_test_parquet();
        let source = ParquetSource::new(file.path().to_str().unwrap()).unwrap();

        let ctx = SessionContext::new();
        source.register(&ctx, "test_table").await.unwrap();

        // Verify table is registered
        let df = ctx
            .sql("SELECT COUNT(*) as count FROM test_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert!(!batches.is_empty());
    }
}