gmt_lom/
table.rs

use arrow::compute::concat_batches;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
8
9#[derive(Debug, thiserror::Error)]
10pub enum TableError {
11    #[cfg(feature = "object_store")]
12    #[error("failed to load a parquet table from an S3 bucket")]
13    S3Table(#[from] store::StoredTableError),
14    #[error("Parqutet file: {1:?} not found")]
15    ParquetFile(#[source] std::io::Error, PathBuf),
16    #[error("parquet read failed")]
17    Parquet(#[from] parquet::errors::ParquetError),
18    #[error("arrow record get failed")]
19    Arrow(#[from] arrow::error::ArrowError),
20}
21
/// An in-memory table backed by a single Arrow
/// [`RecordBatch`](https://docs.rs/arrow/latest/arrow/array/struct.RecordBatch.html).
pub struct Table {
    // All rows of the source, concatenated into one batch (see `Table::from_parquet`).
    record: RecordBatch,
}
25
26impl Table {
27    /// Loads a table from a [parquet](https://docs.rs/parquet/latest/parquet/index.html) file
28    pub fn from_parquet<P>(path: P) -> Result<Self, TableError>
29    where
30        P: AsRef<Path>,
31    {
32        let file = File::open(&path)
33            .map_err(|e| TableError::ParquetFile(e, path.as_ref().to_path_buf()))?;
34        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
35            .with_batch_size(2048)
36            .build()?;
37        let schema = parquet_reader.schema();
38        let records: std::result::Result<Vec<_>, arrow::error::ArrowError> =
39            parquet_reader.collect();
40        let record = concat_batches(&schema, records?.as_slice())?;
41        Ok(Self { record })
42    }
43    /// Returns a reference to the [record](https://docs.rs/arrow/latest/arrow/array/struct.RecordBatch.html)
44    pub fn table(&self) -> &RecordBatch {
45        &self.record
46    }
47    /// Saves a table to a [parquet](https://docs.rs/parquet/latest/parquet/index.html) file
48    pub fn to_parquet(&self, path: impl AsRef<Path>) -> Result<(), TableError> {
49        let file = File::create(&path)
50            .map_err(|e| TableError::ParquetFile(e, path.as_ref().to_path_buf()))?;
51        let mut buffer = BufWriter::new(file);
52        let mut writer = ArrowWriter::try_new(&mut buffer, self.record.schema(), None)?;
53        writer.write(&self.record)?;
54        writer.close()?;
55        Ok(())
56    }
57    pub fn to_mem(&self) -> Result<Vec<u8>, TableError> {
58        let mut buffer = Vec::new();
59        let mut writer = ArrowWriter::try_new(&mut buffer, self.record.schema(), None)?;
60        writer.write(&self.record)?;
61        writer.close()?;
62        Ok(buffer)
63    }
64}
65
66impl From<RecordBatch> for Table {
67    fn from(record: RecordBatch) -> Self {
68        Self { record }
69    }
70}
71
72#[cfg(feature = "object_store")]
73pub mod store;