Skip to main content

xls_rs/columnar/
parquet.rs

1//! Parquet file handling
2
3use anyhow::{Context, Result};
4use std::fs::File;
5use std::sync::Arc;
6
7use arrow_array::{ArrayRef, BooleanArray, Float64Array, Int64Array, RecordBatch, StringArray};
8use arrow_schema::{DataType, Field, Schema};
9use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
10use parquet::arrow::ArrowWriter;
11use parquet::file::properties::WriterProperties;
12
13use crate::csv_handler::CellRange;
14use crate::helpers::{default_column_names, filter_by_range, max_column_count};
15use crate::traits::{DataReader, DataWriteOptions, DataWriter, FileHandler, SchemaProvider};
16
/// Handler for Parquet files.
///
/// Stateless: every method takes the file path explicitly, so one instance
/// can be reused across any number of files.
#[derive(Default)]
pub struct ParquetHandler;
20
21impl ParquetHandler {
22    pub fn new() -> Self {
23        Self::default()
24    }
25
26    /// Read Parquet file into `Vec<Vec<String>>`
27    pub fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
28        let file =
29            File::open(path).with_context(|| format!("Failed to open Parquet file: {path}"))?;
30
31        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
32        let reader = builder.build()?;
33
34        let mut all_rows: Vec<Vec<String>> = Vec::new();
35
36        for batch_result in reader {
37            let batch = batch_result?;
38            let num_rows = batch.num_rows();
39            let num_cols = batch.num_columns();
40
41            for row_idx in 0..num_rows {
42                let mut row: Vec<String> = Vec::with_capacity(num_cols);
43                for col_idx in 0..num_cols {
44                    let col = batch.column(col_idx);
45                    let value = self.array_value_to_string(col, row_idx);
46                    row.push(value);
47                }
48                all_rows.push(row);
49            }
50        }
51
52        Ok(all_rows)
53    }
54
55    /// Read Parquet file with column names as first row
56    pub fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
57        let file =
58            File::open(path).with_context(|| format!("Failed to open Parquet file: {path}"))?;
59
60        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
61        let schema = builder.schema().clone();
62        let reader = builder.build()?;
63
64        let mut all_rows: Vec<Vec<String>> = Vec::new();
65
66        // Add header row
67        let headers: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
68        all_rows.push(headers);
69
70        for batch_result in reader {
71            let batch = batch_result?;
72            let num_rows = batch.num_rows();
73            let num_cols = batch.num_columns();
74
75            for row_idx in 0..num_rows {
76                let mut row: Vec<String> = Vec::with_capacity(num_cols);
77                for col_idx in 0..num_cols {
78                    let col = batch.column(col_idx);
79                    let value = self.array_value_to_string(col, row_idx);
80                    row.push(value);
81                }
82                all_rows.push(row);
83            }
84        }
85
86        Ok(all_rows)
87    }
88
89    /// Write data to Parquet file (all columns as strings)
90    pub fn write(
91        &self,
92        path: &str,
93        data: &[Vec<String>],
94        column_names: Option<&[String]>,
95    ) -> Result<()> {
96        let num_cols = match column_names {
97            Some(names) if names.is_empty() => {
98                anyhow::bail!("Column names cannot be empty");
99            }
100            Some(names) => names.len(),
101            None if data.is_empty() => {
102                anyhow::bail!("Cannot write empty data to Parquet");
103            }
104            None => max_column_count(data),
105        };
106
107        if num_cols == 0 {
108            anyhow::bail!("Cannot write Parquet with zero columns");
109        }
110
111        // Generate column names if not provided
112        let col_names: Vec<String> = column_names
113            .map(|names| names.to_vec())
114            .unwrap_or_else(|| default_column_names(num_cols, "col"));
115
116        // Create schema with string columns
117        let fields: Vec<Field> = col_names
118            .iter()
119            .map(|name| Field::new(name, DataType::Utf8, true))
120            .collect();
121        let schema = Arc::new(Schema::new(fields));
122
123        // Build column arrays
124        let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_cols);
125        for col_idx in 0..num_cols {
126            let values: Vec<Option<&str>> = data
127                .iter()
128                .map(|row| row.get(col_idx).map(|s| s.as_str()))
129                .collect();
130            let array = StringArray::from(values);
131            columns.push(Arc::new(array));
132        }
133
134        let batch = RecordBatch::try_new(schema.clone(), columns)?;
135
136        let file =
137            File::create(path).with_context(|| format!("Failed to create Parquet file: {path}"))?;
138
139        let props = WriterProperties::builder().build();
140        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
141        writer.write(&batch)?;
142        writer.close()?;
143
144        Ok(())
145    }
146
147    /// Get schema information from Parquet file
148    pub fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
149        let file =
150            File::open(path).with_context(|| format!("Failed to open Parquet file: {path}"))?;
151
152        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
153        let schema = builder.schema();
154
155        let fields: Vec<(String, String)> = schema
156            .fields()
157            .iter()
158            .map(|f| (f.name().clone(), format!("{:?}", f.data_type())))
159            .collect();
160
161        Ok(fields)
162    }
163
164    fn array_value_to_string(&self, array: &ArrayRef, idx: usize) -> String {
165        if array.is_null(idx) {
166            return String::new();
167        }
168
169        match array.data_type() {
170            DataType::Utf8 => array
171                .as_any()
172                .downcast_ref::<StringArray>()
173                .map(|arr| arr.value(idx).to_string())
174                .unwrap_or_else(|| format!("{:?}", array)),
175            DataType::LargeUtf8 => array
176                .as_any()
177                .downcast_ref::<arrow_array::LargeStringArray>()
178                .map(|arr| arr.value(idx).to_string())
179                .unwrap_or_else(|| format!("{:?}", array)),
180            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
181                array
182                    .as_any()
183                    .downcast_ref::<Int64Array>()
184                    .map(|a| a.value(idx).to_string())
185                    .unwrap_or_else(|| format!("{:?}", array))
186            }
187            DataType::Float32 | DataType::Float64 => array
188                .as_any()
189                .downcast_ref::<Float64Array>()
190                .map(|a| a.value(idx).to_string())
191                .unwrap_or_else(|| format!("{:?}", array)),
192            DataType::Boolean => array
193                .as_any()
194                .downcast_ref::<BooleanArray>()
195                .map(|arr| arr.value(idx).to_string())
196                .unwrap_or_else(|| format!("{:?}", array)),
197            _ => format!("{:?}", array.data_type()),
198        }
199    }
200}
201
202impl DataReader for ParquetHandler {
203    fn read(&self, path: &str) -> Result<Vec<Vec<String>>> {
204        self.read(path)
205    }
206
207    fn read_with_headers(&self, path: &str) -> Result<Vec<Vec<String>>> {
208        self.read_with_headers(path)
209    }
210
211    fn read_range(&self, path: &str, range: &CellRange) -> Result<Vec<Vec<String>>> {
212        let all_data = self.read(path)?;
213        Ok(filter_by_range(&all_data, range))
214    }
215
216    fn read_as_json(&self, path: &str) -> Result<String> {
217        let data = self.read(path)?;
218        serde_json::to_string_pretty(&data).with_context(|| "Failed to serialize to JSON")
219    }
220
221    fn supports_format(&self, path: &str) -> bool {
222        path.to_lowercase().ends_with(".parquet")
223    }
224}
225
226impl DataWriter for ParquetHandler {
227    fn write(&self, path: &str, data: &[Vec<String>], options: DataWriteOptions) -> Result<()> {
228        if let Some(ref names) = options.column_names {
229            return self.write(path, data, Some(names.as_slice()));
230        }
231        if options.include_headers && !data.is_empty() {
232            let body = data.get(1..).unwrap_or(&[]);
233            return self.write(path, body, Some(&data[0]));
234        }
235        self.write(path, data, None)
236    }
237
238    fn write_range(
239        &self,
240        path: &str,
241        data: &[Vec<String>],
242        _start_row: usize,
243        _start_col: usize,
244    ) -> Result<()> {
245        // For Parquet, we write the entire dataset
246        self.write(path, data, None)
247    }
248
249    fn append(&self, _path: &str, _data: &[Vec<String>]) -> Result<()> {
250        anyhow::bail!("Append operation not supported for Parquet files")
251    }
252
253    fn supports_format(&self, path: &str) -> bool {
254        path.to_lowercase().ends_with(".parquet")
255    }
256}
257
impl FileHandler for ParquetHandler {
    /// Short lowercase identifier for this format.
    fn format_name(&self) -> &'static str {
        "parquet"
    }

    /// File extensions handled by this handler (without the leading dot).
    fn supported_extensions(&self) -> &'static [&'static str] {
        &["parquet"]
    }
}
267
268impl SchemaProvider for ParquetHandler {
269    fn get_schema(&self, path: &str) -> Result<Vec<(String, String)>> {
270        self.get_schema(path)
271    }
272
273    fn get_column_names(&self, path: &str) -> Result<Vec<String>> {
274        let schema = self.get_schema(path)?;
275        Ok(schema.into_iter().map(|(name, _)| name).collect())
276    }
277
278    fn get_row_count(&self, path: &str) -> Result<usize> {
279        let data = self.read(path)?;
280        Ok(data.len())
281    }
282
283    fn get_column_count(&self, path: &str) -> Result<usize> {
284        let data = self.read(path)?;
285        Ok(data.first().map(|r| r.len()).unwrap_or(0))
286    }
287}