use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::reader::RowIter;
use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyBool, PyDict, PyList};
use serde_json::Value;
use std::{fs::File, path::Path};

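/// Newtype wrapper around `serde_json::Value` so row values can be passed to
/// PyO3 APIs that expect `ToPyObject` (e.g. `PyDict::set_item`).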
struct PyValue(Value);

impl ToPyObject for PyValue {
    fn to_object(&self, py: Python) -> PyObject {
        match value_to_py_object(py, &self.0) {
            Ok(obj) => obj,
            Err(_) => py.None(),
        }
    }
}

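/// Recursively convert a `serde_json::Value` into the corresponding Python
/// object (None, bool, int, float, str, list, or dict).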
fn value_to_py_object(py: Python, value: &Value) -> PyResult<PyObject> {
    match value {
        Value::Null => Ok(py.None()),
        Value::Bool(b) => Ok(PyBool::new_bound(py, *b).into_py(py)),
        Value::Number(num) => {
            if let Some(i) = num.as_i64() {
                Ok(i.into_py(py))
            } else if let Some(f) = num.as_f64() {
                Ok(f.into_py(py))
            } else {
                Err(PyValueError::new_err("Unsupported number type"))
            }
        }
        Value::String(s) => Ok(s.into_py(py)),
        Value::Array(arr) => {
            let py_list = PyList::empty_bound(py);
            for item in arr {
                py_list.append(value_to_py_object(py, item)?)?;
            }
            Ok(py_list.into_py(py))
        }
        Value::Object(obj) => {
            let py_dict = PyDict::new_bound(py);
            for (k, v) in obj {
                py_dict.set_item(k, value_to_py_object(py, v)?)?;
            }
            Ok(py_dict.into_py(py))
        }
    }
}

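/// Read the Parquet file at `path` and return its contents as a CSV string,
/// with a header row taken from the top-level field names in the file schema.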
#[pyfunction]
fn to_csv_str(path: &str) -> PyResult<String> {
    let file_path = Path::new(path);
    let file = File::open(file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
    let reader = SerializedFileReader::new(file)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let metadata = reader.metadata();
    let schema = metadata.file_metadata().schema();

    // Write the header row from the schema's top-level field names.
    let mut wtr = csv::Writer::from_writer(vec![]);
    let fields = schema.get_fields();
    let headers: Vec<String> = fields.iter().map(|f| f.name().to_string()).collect();
    wtr.write_record(&headers)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;

    // Write one CSV record per row, using the string form of each column value.
    let row_iter = reader
        .get_row_iter(None)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    for row_result in row_iter {
        let row = row_result.map_err(|e| PyValueError::new_err(e.to_string()))?;
        let csv_record: Vec<String> = row
            .get_column_iter()
            .map(|(_name, value)| value.to_string())
            .collect();
        wtr.write_record(&csv_record)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
    }

    wtr.flush().map_err(|e| PyValueError::new_err(e.to_string()))?;
    let csv_data = String::from_utf8(
        wtr.into_inner().map_err(|e| PyValueError::new_err(e.to_string()))?,
    )
    .map_err(|e| PyValueError::new_err(e.to_string()))?;
    Ok(csv_data)
}

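/// Read the Parquet file at `path` and return its rows as a JSON array string.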
#[pyfunction]
fn to_json_str(path: &str) -> PyResult<String> {
    let file_path = Path::new(path);
    let file = File::open(file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
    let reader = SerializedFileReader::new(file)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;

    let row_iter = reader
        .get_row_iter(None)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;

    // Build a JSON array of row objects. Separators are written only between
    // rows, so a file with no rows still produces the valid string "[]".
    let mut json_str = "[".to_string();
    for (i, row) in row_iter.enumerate() {
        let row = row.map_err(|e| PyValueError::new_err(e.to_string()))?;
        if i > 0 {
            json_str.push(',');
        }
        json_str.push_str(&row.to_json_value().to_string());
    }
    json_str.push(']');

    Ok(json_str)
}

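/// Python iterator over the rows of a Parquet file; each row is yielded as a dict.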
#[pyclass]
struct ParquetRowIterator {
    iter: RowIter<'static>,
}

#[pymethods]
impl ParquetRowIterator {
    #[new]
    fn new(path: &str) -> PyResult<Self> {
        let file_path = Path::new(path);
        let file = File::open(file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
        let reader = SerializedFileReader::new(file)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;

        Ok(Self {
            iter: RowIter::from_file_into(Box::new(reader)),
        })
    }

    fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
        slf
    }

    fn __next__(mut slf: PyRefMut<Self>) -> PyResult<PyObject> {
        // Signal exhaustion to Python with StopIteration; surface read errors
        // as ValueError instead of panicking.
        let row = slf
            .iter
            .next()
            .ok_or_else(|| PyStopIteration::new_err("No more rows in parquet file"))?
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        let row_dict = row.to_json_value();
        let dict = PyDict::new_bound(slf.py());
        for (key, value) in row_dict.as_object().unwrap() {
            dict.set_item(key, PyValue(value.clone()))?;
        }
        Ok(dict.into())
    }
}

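/// Open the Parquet file at `path` and return a lazy row iterator for Python.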
#[pyfunction]
fn to_iter(path: &str) -> PyResult<ParquetRowIterator> {
    // Delegate to the iterator's constructor to avoid duplicating the
    // file-opening and reader-setup logic.
    ParquetRowIterator::new(path)
}

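/// Read the Parquet file at `path` eagerly and return a list of row dicts.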
#[pyfunction]
fn to_list(path: &str, py: Python) -> PyResult<PyObject> {
    let file_path = Path::new(path);
    let file = File::open(file_path).map_err(|e| PyIOError::new_err(e.to_string()))?;
    let reader = SerializedFileReader::new(file)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;

    let row_iter = reader
        .get_row_iter(None)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let list = PyList::empty_bound(py);
    for row in row_iter {
        let row = row.map_err(|e| PyValueError::new_err(e.to_string()))?;
        let row_dict = row.to_json_value();
        let dict = PyDict::new_bound(py);
        for (key, value) in row_dict.as_object().unwrap() {
            dict.set_item(key, PyValue(value.clone()))?;
        }
        // Append each row dict once, after all of its columns have been set.
        list.append(&dict)?;
    }

    Ok(list.into())
}

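/// Python module definition; exposes the conversion helpers and the row
/// iterator class. Assuming the extension is built under the module name
/// `lib`, usage from Python might look like the sketch below (the file name
/// is illustrative only):
///
/// ```python
/// import lib
///
/// rows = lib.to_list("example.parquet")      # hypothetical file name
/// for row in lib.to_iter("example.parquet"):
///     print(row)
/// ```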
#[pymodule]
fn lib(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(to_json_str, m)?)?;
    m.add_function(wrap_pyfunction!(to_csv_str, m)?)?;
    m.add_function(wrap_pyfunction!(to_list, m)?)?;
    m.add_function(wrap_pyfunction!(to_iter, m)?)?;
    m.add_class::<ParquetRowIterator>()?;
    Ok(())
}