plotlars_core/io/
parquet.rs1use std::path::{Path, PathBuf};
2
3use polars::frame::DataFrame;
4use polars::prelude::*;
5
6use super::PlotlarsError;
7
/// Builder that describes how a Parquet file should be loaded into a
/// [`DataFrame`].
///
/// A reader is created with [`ParquetReader::new`], optionally narrowed with
/// [`ParquetReader::columns`] and [`ParquetReader::n_rows`], and consumed by
/// [`ParquetReader::finish`].
#[derive(Clone, Debug)]
pub struct ParquetReader {
    /// Location of the Parquet file to read.
    path: PathBuf,
    /// Column names to keep; `None` keeps every column.
    columns: Option<Vec<String>>,
    /// Row limit handed to the scan; `None` applies no limit.
    n_rows: Option<usize>,
}
27
28impl ParquetReader {
29 pub fn new(path: impl AsRef<Path>) -> Self {
31 Self {
32 path: path.as_ref().to_path_buf(),
33 columns: None,
34 n_rows: None,
35 }
36 }
37
38 pub fn columns(mut self, columns: Vec<&str>) -> Self {
40 self.columns = Some(columns.into_iter().map(|s| s.to_string()).collect());
41 self
42 }
43
44 pub fn n_rows(mut self, n_rows: usize) -> Self {
46 self.n_rows = Some(n_rows);
47 self
48 }
49
50 pub fn finish(self) -> Result<DataFrame, PlotlarsError> {
56 let path_str = self.path.display().to_string();
57
58 let mut args = ScanArgsParquet::default();
59
60 if let Some(n) = self.n_rows {
61 args.n_rows = Some(n);
62 }
63
64 let mut lf = LazyFrame::scan_parquet(PlRefPath::new(&path_str), args).map_err(|e| {
65 PlotlarsError::ParquetParse {
66 path: path_str.clone(),
67 source: Box::new(e),
68 }
69 })?;
70
71 if let Some(cols) = self.columns {
72 let exprs: Vec<Expr> = cols.iter().map(col).collect();
73 lf = lf.select(exprs);
74 }
75
76 lf.collect().map_err(|e| PlotlarsError::ParquetParse {
77 path: path_str,
78 source: Box::new(e),
79 })
80 }
81}
82
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes a two-column, three-row Parquet fixture and returns its path.
    ///
    /// `name` is embedded in the file name. The test harness runs tests in
    /// parallel, so every test must pass a distinct `name`; otherwise one test
    /// could truncate the fixture while another is still reading it.
    fn create_test_parquet(name: &str) -> PathBuf {
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join(format!("../../target/test_data_{name}.parquet"));

        let mut df = df!(
            "a" => [1, 2, 3],
            "b" => ["x", "y", "z"]
        )
        .unwrap();

        let file = std::fs::File::create(&path).unwrap();
        ParquetWriter::new(file).finish(&mut df).unwrap();
        path
    }

    #[test]
    fn read_parquet_default() {
        let path = create_test_parquet("default");
        let df = ParquetReader::new(&path).finish().unwrap();
        assert_eq!(df.height(), 3);
        assert_eq!(df.width(), 2);
    }

    #[test]
    fn read_parquet_select_columns() {
        let path = create_test_parquet("select_columns");
        let df = ParquetReader::new(&path)
            .columns(vec!["a"])
            .finish()
            .unwrap();
        assert_eq!(df.width(), 1);
        assert_eq!(df.height(), 3);
    }

    #[test]
    fn read_parquet_n_rows() {
        let path = create_test_parquet("n_rows");
        let df = ParquetReader::new(&path).n_rows(2).finish().unwrap();
        assert_eq!(df.height(), 2);
        assert_eq!(df.width(), 2);
    }

    #[test]
    fn read_parquet_file_not_found() {
        let result = ParquetReader::new("nonexistent.parquet").finish();
        assert!(matches!(result, Err(PlotlarsError::ParquetParse { .. })));
    }
}