ssh_commander_pg_parquet/
lib.rs1use std::collections::HashMap;
14use std::fs::File;
15use std::path::Path;
16use std::sync::{Arc, Mutex, OnceLock};
17
18use arrow_array::{ArrayRef, RecordBatch, StringArray};
19use arrow_schema::{DataType, Field, Schema};
20use parquet::arrow::ArrowWriter;
21use parquet::file::properties::WriterProperties;
22
23#[derive(Debug, thiserror::Error)]
24pub enum ParquetExportError {
25 #[error("io error: {0}")]
26 Io(#[from] std::io::Error),
27 #[error("arrow error: {0}")]
28 Arrow(#[from] arrow_schema::ArrowError),
29 #[error("parquet error: {0}")]
30 Parquet(#[from] parquet::errors::ParquetError),
31 #[error("unknown parquet writer id")]
32 UnknownWriter,
33}
34
35struct ParquetExporter {
36 writer: ArrowWriter<File>,
37 schema: Arc<Schema>,
38 column_count: usize,
39}
40
41impl ParquetExporter {
42 fn create(path: &Path, columns: &[String]) -> Result<Self, ParquetExportError> {
43 let fields = columns
44 .iter()
45 .map(|name| Field::new(name, DataType::Utf8, true))
46 .collect::<Vec<_>>();
47 let schema = Arc::new(Schema::new(fields));
48 let file = File::create(path)?;
49 let props = WriterProperties::builder().build();
50 let writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
51 Ok(Self {
52 writer,
53 schema,
54 column_count: columns.len(),
55 })
56 }
57
58 fn append_rows(&mut self, rows: &[Vec<Option<String>>]) -> Result<(), ParquetExportError> {
59 if rows.is_empty() {
60 return Ok(());
61 }
62 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.column_count);
65 for col_idx in 0..self.column_count {
66 let values: Vec<Option<&str>> = rows
67 .iter()
68 .map(|row| row.get(col_idx).and_then(|v| v.as_deref()))
69 .collect();
70 columns.push(Arc::new(StringArray::from(values)));
71 }
72 let batch = RecordBatch::try_new(self.schema.clone(), columns)?;
73 self.writer.write(&batch)?;
74 Ok(())
75 }
76
77 fn close(self) -> Result<(), ParquetExportError> {
78 self.writer.close()?;
79 Ok(())
80 }
81}
82
83pub struct ParquetRegistry {
87 next_id: Mutex<u64>,
88 exporters: Mutex<HashMap<u64, ParquetExporter>>,
89}
90
91impl ParquetRegistry {
92 pub fn global() -> &'static Self {
93 static REGISTRY: OnceLock<ParquetRegistry> = OnceLock::new();
94 REGISTRY.get_or_init(|| ParquetRegistry {
95 next_id: Mutex::new(1),
96 exporters: Mutex::new(HashMap::new()),
97 })
98 }
99
100 pub fn open(&self, path: &Path, columns: &[String]) -> Result<u64, ParquetExportError> {
101 let exporter = ParquetExporter::create(path, columns)?;
102 let id = {
103 let mut next = self.next_id.lock().expect("registry poisoned");
104 let id = *next;
105 *next += 1;
106 id
107 };
108 self.exporters
109 .lock()
110 .expect("registry poisoned")
111 .insert(id, exporter);
112 Ok(id)
113 }
114
115 pub fn append(&self, id: u64, rows: &[Vec<Option<String>>]) -> Result<(), ParquetExportError> {
116 let mut map = self.exporters.lock().expect("registry poisoned");
117 let exporter = map.get_mut(&id).ok_or(ParquetExportError::UnknownWriter)?;
118 exporter.append_rows(rows)
119 }
120
121 pub fn close(&self, id: u64) -> Result<(), ParquetExportError> {
122 let exporter = self
123 .exporters
124 .lock()
125 .expect("registry poisoned")
126 .remove(&id)
127 .ok_or(ParquetExportError::UnknownWriter)?;
128 exporter.close()
129 }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, RecordBatch, StringArray};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;
    use tempfile::tempdir;

    /// Writes two rows through the registry, then reads the file back and
    /// checks the row count, column count, cell values and the NULL cell.
    #[test]
    fn write_round_trip() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test.parquet");
        let registry = ParquetRegistry::global();
        let id = registry
            .open(&path, &["id".into(), "name".into()])
            .expect("open");
        registry
            .append(
                id,
                &[
                    vec![Some("1".into()), Some("alice".into())],
                    vec![Some("2".into()), None],
                ],
            )
            .expect("append");
        registry.close(id).expect("close");

        let metadata = std::fs::metadata(&path).expect("file exists");
        assert!(metadata.len() > 0);

        // A non-empty file does not prove the rows were written (an empty
        // Parquet file still has a footer), so read the contents back.
        let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(&path).expect("reopen"))
            .expect("reader builder")
            .build()
            .expect("reader");
        let batches: Vec<RecordBatch> = reader
            .collect::<Result<Vec<_>, _>>()
            .expect("read batches");
        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 2);

        let batch = &batches[0];
        assert_eq!(batch.num_columns(), 2);
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("id column is utf8");
        let names = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("name column is utf8");
        assert_eq!(ids.value(0), "1");
        assert_eq!(ids.value(1), "2");
        assert_eq!(names.value(0), "alice");
        assert!(names.is_null(1));
    }

    /// Once a writer has been closed its id must no longer be accepted.
    #[test]
    fn append_after_close_errors() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test2.parquet");
        let registry = ParquetRegistry::global();
        let id = registry.open(&path, &["x".into()]).expect("open");
        registry.close(id).expect("close");
        let res = registry.append(id, &[vec![Some("z".into())]]);
        assert!(matches!(res, Err(ParquetExportError::UnknownWriter)));
    }
}