use arrow::compute::concat_batches;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
8
9#[derive(Debug, thiserror::Error)]
10pub enum TableError {
11 #[cfg(feature = "object_store")]
12 #[error("failed to load a parquet table from an S3 bucket")]
13 S3Table(#[from] store::StoredTableError),
14 #[error("Parqutet file: {1:?} not found")]
15 ParquetFile(#[source] std::io::Error, PathBuf),
16 #[error("parquet read failed")]
17 Parquet(#[from] parquet::errors::ParquetError),
18 #[error("arrow record get failed")]
19 Arrow(#[from] arrow::error::ArrowError),
20}
21
/// An in-memory table backed by a single Arrow [`RecordBatch`],
/// convertible to and from Parquet (on disk or in memory).
pub struct Table {
    // All rows concatenated into one batch (see `from_parquet`).
    record: RecordBatch,
}
25
26impl Table {
27 pub fn from_parquet<P>(path: P) -> Result<Self, TableError>
29 where
30 P: AsRef<Path>,
31 {
32 let file = File::open(&path)
33 .map_err(|e| TableError::ParquetFile(e, path.as_ref().to_path_buf()))?;
34 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
35 .with_batch_size(2048)
36 .build()?;
37 let schema = parquet_reader.schema();
38 let records: std::result::Result<Vec<_>, arrow::error::ArrowError> =
39 parquet_reader.collect();
40 let record = concat_batches(&schema, records?.as_slice())?;
41 Ok(Self { record })
42 }
43 pub fn table(&self) -> &RecordBatch {
45 &self.record
46 }
47 pub fn to_parquet(&self, path: impl AsRef<Path>) -> Result<(), TableError> {
49 let file = File::create(&path)
50 .map_err(|e| TableError::ParquetFile(e, path.as_ref().to_path_buf()))?;
51 let mut buffer = BufWriter::new(file);
52 let mut writer = ArrowWriter::try_new(&mut buffer, self.record.schema(), None)?;
53 writer.write(&self.record)?;
54 writer.close()?;
55 Ok(())
56 }
57 pub fn to_mem(&self) -> Result<Vec<u8>, TableError> {
58 let mut buffer = Vec::new();
59 let mut writer = ArrowWriter::try_new(&mut buffer, self.record.schema(), None)?;
60 writer.write(&self.record)?;
61 writer.close()?;
62 Ok(buffer)
63 }
64}
65
66impl From<RecordBatch> for Table {
67 fn from(record: RecordBatch) -> Self {
68 Self { record }
69 }
70}
71
72#[cfg(feature = "object_store")]
73pub mod store;