parq/
lib.rs

1use parquet::file::reader::{FileReader, SerializedFileReader};
2use parquet::record::reader::RowIter;
3use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
4use pyo3::prelude::*;
5use pyo3::types::{PyBool, PyDict, PyList};
6use serde_json::Value;
7use std::{fs::File, path::Path};
8
9struct PyValue(Value);
10
11impl ToPyObject for PyValue {
12    fn to_object(&self, py: Python) -> PyObject {
13        match value_to_py_object(py, &self.0) {
14            Ok(obj) => obj,
15            Err(_) => py.None(), // Fallback to None in case of error, adjust as needed
16        }
17    }
18}
19
20/// Converts a serde_json `Value` to a PyObject.
21fn value_to_py_object(py: Python, value: &Value) -> PyResult<PyObject> {
22    match value {
23        Value::Null => Ok(py.None()),
24        Value::Bool(b) => Ok(PyBool::new_bound(py, *b).into_py(py)), // Adjusted for PyBool
25        Value::Number(num) => {
26            if let Some(i) = num.as_i64() {
27                Ok(i.into_py(py))
28            } else if let Some(f) = num.as_f64() {
29                Ok(f.into_py(py))
30            } else {
31                Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
32                    "Unsupported number type",
33                ))
34            }
35        }
36        Value::String(s) => Ok(s.into_py(py)),
37        Value::Array(arr) => {
38            let py_list = PyList::empty_bound(py);
39            for item in arr {
40                py_list.append(value_to_py_object(py, item)?)?;
41            }
42            Ok(py_list.into_py(py))
43        }
44        Value::Object(obj) => {
45            let py_dict = PyDict::new_bound(py); // Correct usage of PyDict
46            for (k, v) in obj {
47                py_dict.set_item(k, value_to_py_object(py, v)?)?;
48            }
49            Ok(py_dict.into_py(py))
50        }
51    }
52}
53
54/// to_csv_str(path: str) -> str
55/// --
56///
57/// Read parquet file and convert to csv string.
58#[pyfunction]
59fn to_csv_str(path: &str) -> PyResult<String> {
60    let file_path = Path::new(path);
61    let file = File::open(&file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
62    let reader = SerializedFileReader::new(file).map_err(|e| PyValueError::new_err(e.to_string()))?;
63    let metadata = reader.metadata();
64    let schema = metadata.file_metadata().schema();
65
66    let mut wtr = csv::Writer::from_writer(vec![]);
67    let fields = schema.get_fields();
68    let headers: Vec<String> = fields.iter().map(|f| f.name().to_string()).collect();
69    wtr.write_record(&headers).map_err(|e| PyValueError::new_err(e.to_string()))?;
70
71    let row_iter = reader.get_row_iter(None).map_err(|e| PyValueError::new_err(e.to_string()))?;
72    for row_result in row_iter {
73        let row = row_result.map_err(|e| PyValueError::new_err(e.to_string()))?;
74        let csv_record: Vec<String> = row.get_column_iter().map(|(_col_idx, col)| col.to_string()).collect();
75        wtr.write_record(&csv_record).map_err(|e| PyValueError::new_err(e.to_string()))?;
76    }
77
78    wtr.flush().map_err(|e| PyValueError::new_err(e.to_string()))?;
79    let csv_data = String::from_utf8(wtr.into_inner().map_err(|e| PyValueError::new_err(e.to_string()))?).map_err(|e| PyValueError::new_err(e.to_string()))?;
80    Ok(csv_data)
81}
82
83/// to_json_str(path: str) -> str
84/// --
85///
86/// Read parquet file and convert to JSON string.
87#[pyfunction]
88fn to_json_str(path: &str) -> PyResult<String> {
89    let file_path = Path::new(path);
90    let file =
91        File::open(&file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
92    let reader = SerializedFileReader::new(file)
93        .map_err(|e| PyValueError::new_err(e.to_string()))?;
94
95    // iterate through reader and add to json list string
96    let mut json_str = "[".to_string();
97    for row in reader.get_row_iter(None).unwrap() {
98        json_str.push_str(&row.unwrap().to_json_value().to_string());
99        json_str.push_str(",");
100    }
101
102    json_str.pop();
103    json_str.push_str("]");
104
105    // return json string
106    return Ok(json_str);
107}
108
109/// ParquetRowIterator
110/// --
111///
112/// Iterator over rows in parquet file.
113#[pyclass]
114struct ParquetRowIterator {
115    iter: RowIter<'static>,
116}
117
118#[pymethods]
119impl ParquetRowIterator {
120    #[new]
121    fn new(path: &str) -> PyResult<Self> {
122        let file_path = Path::new(path);
123        let file = File::open(&file_path)
124            .map_err(|e| PyIOError::new_err(e.to_string()))?;
125        let reader = SerializedFileReader::new(file)
126            .map_err(|e| PyValueError::new_err(e.to_string()))?;
127
128        Ok(Self {
129            iter: RowIter::from_file_into(Box::new(reader)),
130        })
131    }
132
133    fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
134        slf
135    }
136
137    fn __next__(mut slf: PyRefMut<Self>) -> PyResult<PyObject> {
138        let row = slf
139            .iter
140            .next()
141            .ok_or_else(|| PyErr::new::<PyStopIteration, _>("No more rows in parquet file"))?;
142        let row_dict = row.unwrap().to_json_value();
143        let dict = PyDict::new_bound(slf.py());
144        for (key, value) in row_dict.as_object().unwrap() {
145            dict.set_item(key, PyValue(value.clone()))?;
146        }
147        Ok(dict.into())
148    }
149}
150
151/// to_iter(path: str) -> ParquetRowIterator
152/// --
153///
154/// Return iterator over rows in parquet file.
155#[pyfunction]
156fn to_iter(path: &str) -> PyResult<ParquetRowIterator> {
157    let file_path = Path::new(path);
158    let file =
159        File::open(&file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
160    let reader = SerializedFileReader::new(file)
161        .map_err(|e| PyValueError::new_err(e.to_string()))?;
162
163    Ok(ParquetRowIterator {
164        iter: RowIter::from_file_into(Box::new(reader)),
165    })
166}
167
168/// to_list(path: str) -> List[Dict[str, Any]]
169/// --
170///
171/// Read parquet file and convert to list of dictionaries.
172#[pyfunction]
173fn to_list(path: &str, py: Python) -> PyResult<PyObject> {
174    let file_path = Path::new(path);
175    let file =
176        File::open(&file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
177    let reader = SerializedFileReader::new(file)
178        .map_err(|e| PyValueError::new_err(e.to_string()))?;
179    let list = PyList::empty_bound(py);
180    for row in reader.get_row_iter(None).unwrap() {
181        let row_dict = row.unwrap().to_json_value();
182        let dict = PyDict::new_bound(py);
183        for (key, value) in row_dict.as_object().unwrap() {
184            dict.set_item(key, PyValue(value.clone()))?;
185            list.append(&dict)?;
186        }
187    }
188
189    Ok(list.into())
190}
191
192/// A Parquet file reader and converter, written in Rust.
193#[pymodule]
194fn lib(m: &Bound<'_, PyModule>) -> PyResult<()> {
195    m.add_function(wrap_pyfunction!(to_json_str, m)?)?;
196    m.add_function(wrap_pyfunction!(to_csv_str, m)?)?;
197    m.add_function(wrap_pyfunction!(to_list, m)?)?;
198    m.add_function(wrap_pyfunction!(to_iter, m)?)?;
199    m.add_class::<ParquetRowIterator>()?;
200    Ok(())
201}