// xls_rs/columnar/parquet.rs

use std::fs::File;
use std::sync::Arc;

use anyhow::{Context, Result};
use arrow_array::{
    ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
    Int8Array, LargeStringArray, RecordBatch, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use crate::csv_handler::CellRange;
use crate::helpers::{default_column_names, filter_by_range, max_column_count};
use crate::traits::{DataReader, DataWriteOptions, DataWriter, FileHandler, SchemaProvider};

/// Stateless handler that reads and writes Apache Parquet files,
/// exchanging tabular data as `Vec<Vec<String>>` rows of string cells.
#[derive(Default)]
pub struct ParquetHandler;
20
21impl ParquetHandler {
22 pub fn new() -> Self {
23 Self::default()
24 }
25
26 pub fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
28 let file =
29 File::open(path).with_context(|| format!("Failed to open Parquet file: {path}"))?;
30
31 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
32 let reader = builder.build()?;
33
34 let mut all_rows: Vec<Vec<String>> = Vec::new();
35
36 for batch_result in reader {
37 let batch = batch_result?;
38 let num_rows = batch.num_rows();
39 let num_cols = batch.num_columns();
40
41 for row_idx in 0..num_rows {
42 let mut row: Vec<String> = Vec::with_capacity(num_cols);
43 for col_idx in 0..num_cols {
44 let col = batch.column(col_idx);
45 let value = self.array_value_to_string(col, row_idx);
46 row.push(value);
47 }
48 all_rows.push(row);
49 }
50 }
51
52 Ok(all_rows)
53 }
54
55 pub fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
57 let file =
58 File::open(path).with_context(|| format!("Failed to open Parquet file: {path}"))?;
59
60 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
61 let schema = builder.schema().clone();
62 let reader = builder.build()?;
63
64 let mut all_rows: Vec<Vec<String>> = Vec::new();
65
66 let headers: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
68 all_rows.push(headers);
69
70 for batch_result in reader {
71 let batch = batch_result?;
72 let num_rows = batch.num_rows();
73 let num_cols = batch.num_columns();
74
75 for row_idx in 0..num_rows {
76 let mut row: Vec<String> = Vec::with_capacity(num_cols);
77 for col_idx in 0..num_cols {
78 let col = batch.column(col_idx);
79 let value = self.array_value_to_string(col, row_idx);
80 row.push(value);
81 }
82 all_rows.push(row);
83 }
84 }
85
86 Ok(all_rows)
87 }
88
89 pub fn write(
91 &self,
92 path: &str,
93 data: &[Vec<String>],
94 column_names: Option<&[String]>,
95 ) -> Result<()> {
96 let num_cols = match column_names {
97 Some(names) if names.is_empty() => {
98 anyhow::bail!("Column names cannot be empty");
99 }
100 Some(names) => names.len(),
101 None if data.is_empty() => {
102 anyhow::bail!("Cannot write empty data to Parquet");
103 }
104 None => max_column_count(data),
105 };
106
107 if num_cols == 0 {
108 anyhow::bail!("Cannot write Parquet with zero columns");
109 }
110
111 let col_names: Vec<String> = column_names
113 .map(|names| names.to_vec())
114 .unwrap_or_else(|| default_column_names(num_cols, "col"));
115
116 let fields: Vec<Field> = col_names
118 .iter()
119 .map(|name| Field::new(name, DataType::Utf8, true))
120 .collect();
121 let schema = Arc::new(Schema::new(fields));
122
123 let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_cols);
125 for col_idx in 0..num_cols {
126 let values: Vec<Option<&str>> = data
127 .iter()
128 .map(|row| row.get(col_idx).map(|s| s.as_str()))
129 .collect();
130 let array = StringArray::from(values);
131 columns.push(Arc::new(array));
132 }
133
134 let batch = RecordBatch::try_new(schema.clone(), columns)?;
135
136 let file =
137 File::create(path).with_context(|| format!("Failed to create Parquet file: {path}"))?;
138
139 let props = WriterProperties::builder().build();
140 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
141 writer.write(&batch)?;
142 writer.close()?;
143
144 Ok(())
145 }
146
147 pub fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
149 let file =
150 File::open(path).with_context(|| format!("Failed to open Parquet file: {path}"))?;
151
152 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
153 let schema = builder.schema();
154
155 let fields: Vec<(String, String)> = schema
156 .fields()
157 .iter()
158 .map(|f| (f.name().clone(), format!("{:?}", f.data_type())))
159 .collect();
160
161 Ok(fields)
162 }
163
164 fn array_value_to_string(&self, array: &ArrayRef, idx: usize) -> String {
165 if array.is_null(idx) {
166 return String::new();
167 }
168
169 match array.data_type() {
170 DataType::Utf8 => array
171 .as_any()
172 .downcast_ref::<StringArray>()
173 .map(|arr| arr.value(idx).to_string())
174 .unwrap_or_else(|| format!("{:?}", array)),
175 DataType::LargeUtf8 => array
176 .as_any()
177 .downcast_ref::<arrow_array::LargeStringArray>()
178 .map(|arr| arr.value(idx).to_string())
179 .unwrap_or_else(|| format!("{:?}", array)),
180 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
181 array
182 .as_any()
183 .downcast_ref::<Int64Array>()
184 .map(|a| a.value(idx).to_string())
185 .unwrap_or_else(|| format!("{:?}", array))
186 }
187 DataType::Float32 | DataType::Float64 => array
188 .as_any()
189 .downcast_ref::<Float64Array>()
190 .map(|a| a.value(idx).to_string())
191 .unwrap_or_else(|| format!("{:?}", array)),
192 DataType::Boolean => array
193 .as_any()
194 .downcast_ref::<BooleanArray>()
195 .map(|arr| arr.value(idx).to_string())
196 .unwrap_or_else(|| format!("{:?}", array)),
197 _ => format!("{:?}", array.data_type()),
198 }
199 }
200}
201
202impl DataReader for ParquetHandler {
203 fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
204 self.read(path)
205 }
206
207 fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
208 self.read_with_headers(path)
209 }
210
211 fn read_range(&self, path: &str, range: &CellRange) -> Result<Vec<Vec<String>>> {
212 let all_data = self.read(path)?;
213 Ok(filter_by_range(&all_data, range))
214 }
215
216 fn read_as_json(&self, path: &str) -> Result<String> {
217 let data = self.read(path)?;
218 serde_json::to_string_pretty(&data).with_context(|| "Failed to serialize to JSON")
219 }
220
221 fn supports_format(&self, path: &str) -> bool {
222 path.to_lowercase().ends_with(".parquet")
223 }
224}
225
226impl DataWriter for ParquetHandler {
227 fn write(&self, path: &str, data: &[Vec<String>], options: DataWriteOptions) -> Result<()> {
228 if let Some(ref names) = options.column_names {
229 return self.write(path, data, Some(names.as_slice()));
230 }
231 if options.include_headers && !data.is_empty() {
232 let body = data.get(1..).unwrap_or(&[]);
233 return self.write(path, body, Some(&data[0]));
234 }
235 self.write(path, data, None)
236 }
237
238 fn write_range(
239 &self,
240 path: &str,
241 data: &[Vec<String>],
242 _start_row: usize,
243 _start_col: usize,
244 ) -> Result<()> {
245 self.write(path, data, None)
247 }
248
249 fn append(&self, _path: &str, _data: &[Vec<String>]) -> Result<()> {
250 anyhow::bail!("Append operation not supported for Parquet files")
251 }
252
253 fn supports_format(&self, path: &str) -> bool {
254 path.to_lowercase().ends_with(".parquet")
255 }
256}
257
impl FileHandler for ParquetHandler {
    /// Short identifier for this format.
    fn format_name(&self) -> &'static str {
        "parquet"
    }

    /// File extensions (without the leading dot) this handler claims.
    fn supported_extensions(&self) -> &'static [&'static str] {
        &["parquet"]
    }
}
267
268impl SchemaProvider for ParquetHandler {
269 fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
270 self.get_schema(path)
271 }
272
273 fn get_column_names(&self, path: &str) -> Result<Vec<String>> {
274 let schema = self.get_schema(path)?;
275 Ok(schema.into_iter().map(|(name, _)| name).collect())
276 }
277
278 fn get_row_count(&self, path: &str) -> Result<usize> {
279 let data = self.read(path)?;
280 Ok(data.len())
281 }
282
283 fn get_column_count(&self, path: &str) -> Result<usize> {
284 let data = self.read(path)?;
285 Ok(data.first().map(|r| r.len()).unwrap_or(0))
286 }
287}