Skip to main content

plotlars_core/io/
parquet.rs

1use std::fs::File;
2use std::path::{Path, PathBuf};
3
4use polars::frame::DataFrame;
5use polars::prelude::ParquetReader as PlParquetReader;
6use polars::prelude::*;
7
8use super::PlotlarsError;
9
/// A Parquet file reader.
///
/// Uses a fluent builder pattern: construct with [`ParquetReader::new`], chain
/// optional configuration methods ([`ParquetReader::columns`],
/// [`ParquetReader::n_rows`]), then call [`ParquetReader::finish`] to load
/// the data into a [`DataFrame`].
///
/// The builder derives `Debug` and `Clone`, so a configured reader can be
/// logged or reused as a template for several reads.
///
/// # Example
///
/// ```rust,no_run
/// use plotlars_core::io::ParquetReader;
///
/// let df = ParquetReader::new("data/sales.parquet").finish().unwrap();
/// ```
#[derive(Debug, Clone)]
pub struct ParquetReader {
    /// Location of the Parquet file; it is opened only by `finish`.
    path: PathBuf,
    /// Names of the columns to load; `None` means no column selection is applied.
    columns: Option<Vec<String>>,
    /// Row limit for the read; `None` means no limit is applied.
    n_rows: Option<usize>,
}
29
30impl ParquetReader {
31    /// Create a new Parquet reader for the given file path.
32    pub fn new(path: impl AsRef<Path>) -> Self {
33        Self {
34            path: path.as_ref().to_path_buf(),
35            columns: None,
36            n_rows: None,
37        }
38    }
39
40    /// Select specific columns to load (projection pushdown).
41    pub fn columns(mut self, columns: Vec<&str>) -> Self {
42        self.columns = Some(columns.into_iter().map(|s| s.to_string()).collect());
43        self
44    }
45
46    /// Limit the number of rows to read.
47    pub fn n_rows(mut self, n_rows: usize) -> Self {
48        self.n_rows = Some(n_rows);
49        self
50    }
51
52    /// Execute the read and return a [`DataFrame`].
53    ///
54    /// # Errors
55    ///
56    /// Returns [`PlotlarsError::ParquetParse`] if the file cannot be read or parsed.
57    pub fn finish(self) -> Result<DataFrame, PlotlarsError> {
58        let path_str = self.path.display().to_string();
59
60        let file = File::open(&self.path).map_err(|e| PlotlarsError::ParquetParse {
61            path: path_str.clone(),
62            source: Box::new(e),
63        })?;
64
65        let mut reader = PlParquetReader::new(file);
66
67        if let Some(n) = self.n_rows {
68            reader = reader.with_slice(Some((0, n)));
69        }
70
71        if let Some(cols) = self.columns {
72            reader = reader.with_columns(Some(cols));
73        }
74
75        reader.finish().map_err(|e| PlotlarsError::ParquetParse {
76            path: path_str,
77            source: Box::new(e),
78        })
79    }
80}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// A Parquet fixture file that is removed when the guard is dropped.
    ///
    /// Every test gets its own file. The test harness runs tests in parallel by
    /// default, so a single shared path could be truncated by one test while
    /// another test is still reading it.
    struct TempParquet(PathBuf);

    impl TempParquet {
        /// Write a 3-row frame with columns `a` and `b` to a file unique to `test_name`.
        fn create(test_name: &str) -> Self {
            // The process id keeps concurrent test runs on the same machine apart;
            // the test name keeps parallel tests within one run apart.
            let pid = std::process::id();
            let file_name = format!("plotlars_parquet_{}_{}.parquet", pid, test_name);

            // Build the guard first so the file is cleaned up even if writing panics.
            let fixture = Self(std::env::temp_dir().join(file_name));

            let mut df = df!(
                "a" => [1, 2, 3],
                "b" => ["x", "y", "z"]
            )
            .unwrap();

            let file = std::fs::File::create(&fixture.0).unwrap();
            ParquetWriter::new(file).finish(&mut df).unwrap();
            fixture
        }

        /// Path of the fixture file on disk.
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempParquet {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover temp file must not fail the test.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn read_parquet_default() {
        let fixture = TempParquet::create("default");
        let df = ParquetReader::new(fixture.path()).finish().unwrap();
        assert_eq!(df.height(), 3);
        assert_eq!(df.width(), 2);
    }

    #[test]
    fn read_parquet_select_columns() {
        let fixture = TempParquet::create("select_columns");
        let df = ParquetReader::new(fixture.path())
            .columns(vec!["a"])
            .finish()
            .unwrap();
        assert_eq!(df.width(), 1);
    }

    #[test]
    fn read_parquet_n_rows() {
        let fixture = TempParquet::create("n_rows");
        let df = ParquetReader::new(fixture.path())
            .n_rows(2)
            .finish()
            .unwrap();
        assert_eq!(df.height(), 2);
    }

    #[test]
    fn read_parquet_file_not_found() {
        let result = ParquetReader::new("nonexistent.parquet").finish();
        assert!(result.is_err());
        // A failed open is reported through the documented error variant.
        assert!(matches!(result, Err(PlotlarsError::ParquetParse { .. })));
    }
}