//! Parquet input for `plotlars_core::io` (`plotlars_core/io/parquet.rs`).

use std::path::{Path, PathBuf};

use polars::frame::DataFrame;
use polars::prelude::*;

use super::PlotlarsError;

/// A Parquet file reader.
///
/// Uses a fluent builder pattern: construct with [`ParquetReader::new`], chain
/// optional configuration methods ([`ParquetReader::columns`],
/// [`ParquetReader::n_rows`]), then call [`ParquetReader::finish`] to load the
/// data into a [`DataFrame`].
///
/// # Example
///
/// ```rust,no_run
/// use plotlars_core::io::ParquetReader;
///
/// // Load the whole file.
/// let df = ParquetReader::new("data/sales.parquet").finish().unwrap();
///
/// // Load only two columns of the first 100 rows.
/// let subset = ParquetReader::new("data/sales.parquet")
///     .columns(vec!["region", "revenue"])
///     .n_rows(100)
///     .finish()
///     .unwrap();
/// ```
#[derive(Clone, Debug)]
pub struct ParquetReader {
    /// File to read.
    path: PathBuf,
    /// Columns to keep, in output order; `None` keeps every column.
    columns: Option<Vec<String>>,
    /// Maximum number of rows to read; `None` reads the whole file.
    n_rows: Option<usize>,
}

28impl ParquetReader {
29    /// Create a new Parquet reader for the given file path.
30    pub fn new(path: impl AsRef<Path>) -> Self {
31        Self {
32            path: path.as_ref().to_path_buf(),
33            columns: None,
34            n_rows: None,
35        }
36    }
37
38    /// Select specific columns to load (projection pushdown).
39    pub fn columns(mut self, columns: Vec<&str>) -> Self {
40        self.columns = Some(columns.into_iter().map(|s| s.to_string()).collect());
41        self
42    }
43
44    /// Limit the number of rows to read.
45    pub fn n_rows(mut self, n_rows: usize) -> Self {
46        self.n_rows = Some(n_rows);
47        self
48    }
49
50    /// Execute the read and return a [`DataFrame`].
51    ///
52    /// # Errors
53    ///
54    /// Returns [`PlotlarsError::ParquetParse`] if the file cannot be read or parsed.
55    pub fn finish(self) -> Result<DataFrame, PlotlarsError> {
56        let path_str = self.path.display().to_string();
57
58        let mut args = ScanArgsParquet::default();
59
60        if let Some(n) = self.n_rows {
61            args.n_rows = Some(n);
62        }
63
64        let mut lf = LazyFrame::scan_parquet(PlRefPath::new(&path_str), args).map_err(|e| {
65            PlotlarsError::ParquetParse {
66                path: path_str.clone(),
67                source: Box::new(e),
68            }
69        })?;
70
71        if let Some(cols) = self.columns {
72            let exprs: Vec<Expr> = cols.iter().map(col).collect();
73            lf = lf.select(exprs);
74        }
75
76        lf.collect().map_err(|e| PlotlarsError::ParquetParse {
77            path: path_str,
78            source: Box::new(e),
79        })
80    }
81}
82
#[cfg(test)]
mod tests {
    use super::*;

    /// A fixture file that is deleted when dropped. Test runs therefore do not
    /// leave files behind, even when an assertion panics.
    struct TestFile(PathBuf);

    impl Drop for TestFile {
        fn drop(&mut self) {
            // Best-effort cleanup: a leftover temp file must not fail the test.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Write a 3-row fixture with columns `a` (integers) and `b` (strings).
    ///
    /// Each test passes its own `name` because the test harness runs tests on
    /// parallel threads. With one shared fixture path, one test could truncate
    /// and rewrite the file while another is reading it. The OS temp directory
    /// always exists, unlike a path relative to the manifest directory. The
    /// process id keeps concurrent `cargo test` invocations apart.
    fn create_test_parquet(name: &str) -> TestFile {
        let pid = std::process::id();
        let fixture = TestFile(
            std::env::temp_dir().join(format!("plotlars_parquet_test_{pid}_{name}.parquet")),
        );

        let mut df = df!(
            "a" => [1, 2, 3],
            "b" => ["x", "y", "z"]
        )
        .unwrap();

        let file = std::fs::File::create(&fixture.0).unwrap();
        ParquetWriter::new(file).finish(&mut df).unwrap();
        fixture
    }

    #[test]
    fn read_parquet_default() {
        let fixture = create_test_parquet("default");
        let df = ParquetReader::new(&fixture.0).finish().unwrap();
        assert_eq!(df.height(), 3);
        assert_eq!(df.width(), 2);
    }

    #[test]
    fn read_parquet_select_columns() {
        let fixture = create_test_parquet("select_columns");
        let df = ParquetReader::new(&fixture.0)
            .columns(vec!["a"])
            .finish()
            .unwrap();
        assert_eq!(df.width(), 1);
        assert_eq!(df.height(), 3);
        assert!(df.column("a").is_ok());
        assert!(df.column("b").is_err());
    }

    #[test]
    fn read_parquet_n_rows() {
        let fixture = create_test_parquet("n_rows");
        let df = ParquetReader::new(&fixture.0).n_rows(2).finish().unwrap();
        assert_eq!(df.height(), 2);
        assert_eq!(df.width(), 2);
    }

    #[test]
    fn read_parquet_n_rows_and_columns() {
        let fixture = create_test_parquet("n_rows_and_columns");
        let df = ParquetReader::new(&fixture.0)
            .columns(vec!["b"])
            .n_rows(2)
            .finish()
            .unwrap();
        assert_eq!(df.height(), 2);
        assert_eq!(df.width(), 1);
    }

    #[test]
    fn read_parquet_missing_column() {
        let fixture = create_test_parquet("missing_column");
        let result = ParquetReader::new(&fixture.0)
            .columns(vec!["missing"])
            .finish();
        assert!(result.is_err());
    }

    #[test]
    fn read_parquet_file_not_found() {
        let result = ParquetReader::new("nonexistent.parquet").finish();
        assert!(result.is_err());
    }
}