Skip to main content

ssh_commander_pg_parquet/
lib.rs

1//! Parquet export pipeline. Writer handles live in a process-wide
2//! registry so the FFI can stream rows from Swift across multiple
3//! `pgFetchPage` calls without buffering the full result.
4//!
5//! All columns serialize as Utf8 — the explorer never has typed
6//! values to begin with (everything comes back as the server's text
7//! representation), and a Parquet file with text columns reads
8//! correctly into every analysis tool that supports the format.
9//! Per-OID Arrow type inference would be the polished v2; for v1,
10//! "string Parquet that round-trips through pandas, DuckDB, and
11//! Spark" is the right balance against the dep cost.
12
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, OnceLock, PoisonError};
17
18use arrow_array::{ArrayRef, RecordBatch, StringArray};
19use arrow_schema::{DataType, Field, Schema};
20use parquet::arrow::ArrowWriter;
21use parquet::file::properties::WriterProperties;
22
/// Errors surfaced by the Parquet export pipeline and its handle registry.
#[derive(Debug, thiserror::Error)]
pub enum ParquetExportError {
    /// Filesystem failure, e.g. creating the output file.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// Arrow-level failure, such as assembling a `RecordBatch` from the
    /// appended columns.
    #[error("arrow error: {0}")]
    Arrow(#[from] arrow_schema::ArrowError),
    /// Failure creating, writing to, or closing the underlying Parquet writer.
    #[error("parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),
    /// The handle passed to `append`/`close` is not registered: it was never
    /// issued, or it has already been closed.
    #[error("unknown parquet writer id")]
    UnknownWriter,
}
34
35struct ParquetExporter {
36    writer: ArrowWriter<File>,
37    schema: Arc<Schema>,
38    column_count: usize,
39}
40
41impl ParquetExporter {
42    fn create(path: &Path, columns: &[String]) -> Result<Self, ParquetExportError> {
43        let fields = columns
44            .iter()
45            .map(|name| Field::new(name, DataType::Utf8, true))
46            .collect::<Vec<_>>();
47        let schema = Arc::new(Schema::new(fields));
48        let file = File::create(path)?;
49        let props = WriterProperties::builder().build();
50        let writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
51        Ok(Self {
52            writer,
53            schema,
54            column_count: columns.len(),
55        })
56    }
57
58    fn append_rows(&mut self, rows: &[Vec<Option<String>>]) -> Result<(), ParquetExportError> {
59        if rows.is_empty() {
60            return Ok(());
61        }
62        // One Arrow column per Parquet column; collect the slice of
63        // values for each column index across the batch's rows.
64        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.column_count);
65        for col_idx in 0..self.column_count {
66            let values: Vec<Option<&str>> = rows
67                .iter()
68                .map(|row| row.get(col_idx).and_then(|v| v.as_deref()))
69                .collect();
70            columns.push(Arc::new(StringArray::from(values)));
71        }
72        let batch = RecordBatch::try_new(self.schema.clone(), columns)?;
73        self.writer.write(&batch)?;
74        Ok(())
75    }
76
77    fn close(self) -> Result<(), ParquetExportError> {
78        self.writer.close()?;
79        Ok(())
80    }
81}
82
83/// Registry of in-flight Parquet writers, keyed by an opaque u64
84/// handle. The FFI returns the handle to Swift on `open`; Swift
85/// passes it back on every `append` and `close`.
86pub struct ParquetRegistry {
87    next_id: Mutex<u64>,
88    exporters: Mutex<HashMap<u64, ParquetExporter>>,
89}
90
91impl ParquetRegistry {
92    pub fn global() -> &'static Self {
93        static REGISTRY: OnceLock<ParquetRegistry> = OnceLock::new();
94        REGISTRY.get_or_init(|| ParquetRegistry {
95            next_id: Mutex::new(1),
96            exporters: Mutex::new(HashMap::new()),
97        })
98    }
99
100    pub fn open(&self, path: &Path, columns: &[String]) -> Result<u64, ParquetExportError> {
101        let exporter = ParquetExporter::create(path, columns)?;
102        let id = {
103            let mut next = self.next_id.lock().expect("registry poisoned");
104            let id = *next;
105            *next += 1;
106            id
107        };
108        self.exporters
109            .lock()
110            .expect("registry poisoned")
111            .insert(id, exporter);
112        Ok(id)
113    }
114
115    pub fn append(&self, id: u64, rows: &[Vec<Option<String>>]) -> Result<(), ParquetExportError> {
116        let mut map = self.exporters.lock().expect("registry poisoned");
117        let exporter = map.get_mut(&id).ok_or(ParquetExportError::UnknownWriter)?;
118        exporter.append_rows(rows)
119    }
120
121    pub fn close(&self, id: u64) -> Result<(), ParquetExportError> {
122        let exporter = self
123            .exporters
124            .lock()
125            .expect("registry poisoned")
126            .remove(&id)
127            .ok_or(ParquetExportError::UnknownWriter)?;
128        exporter.close()
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Array;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use tempfile::tempdir;

    /// Reads every record batch back from a finished Parquet file.
    fn read_back(path: &Path) -> Vec<RecordBatch> {
        let file = File::open(path).expect("reopen");
        ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("reader builder")
            .build()
            .expect("reader")
            .collect::<Result<Vec<_>, _>>()
            .expect("read batches")
    }

    #[test]
    fn write_round_trip() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test.parquet");
        let registry = ParquetRegistry::global();
        let id = registry
            .open(&path, &["id".into(), "name".into()])
            .expect("open");
        registry
            .append(
                id,
                &[
                    vec![Some("1".into()), Some("alice".into())],
                    vec![Some("2".into()), None],
                ],
            )
            .expect("append");
        registry.close(id).expect("close");

        // Read the file back and check the data actually survived.
        let batches = read_back(&path);
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 2);

        let schema = batch.schema();
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("id column is Utf8");
        assert_eq!(ids.value(0), "1");
        assert_eq!(ids.value(1), "2");

        let names = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("name column is Utf8");
        assert_eq!(names.value(0), "alice");
        assert!(names.is_null(1));
    }

    #[test]
    fn append_after_close_errors() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test2.parquet");
        let registry = ParquetRegistry::global();
        let id = registry.open(&path, &["x".into()]).expect("open");
        registry.close(id).expect("close");
        let res = registry.append(id, &[vec![Some("z".into())]]);
        assert!(matches!(res, Err(ParquetExportError::UnknownWriter)));
    }
}