// supertable_core/writer.rs
use std::io::Cursor;

use anyhow::Context;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use crate::storage::Storage;
9
10pub struct TableWriter {
12 storage: Storage,
13 table_location: String,
14 schema: SchemaRef,
15}
16
17impl TableWriter {
18 pub fn new(storage: Storage, table_location: String, schema: SchemaRef) -> Self {
19 Self {
20 storage,
21 table_location,
22 schema,
23 }
24 }
25
26 pub async fn write_batch(
29 &self,
30 batch: &RecordBatch,
31 file_id: &str,
32 ) -> anyhow::Result<crate::manifest::DataFile> {
33 let validator = crate::validation::SchemaValidator::new(self.schema.clone());
35 validator.validate(batch)?;
36
37 let props = WriterProperties::builder().build();
38
39 let mut buffer = Cursor::new(Vec::new());
41 {
42 let mut writer = ArrowWriter::try_new(&mut buffer, self.schema.clone(), Some(props))?;
43 writer.write(batch)?;
44 writer.close()?;
45 }
46
47 let data = buffer.into_inner();
48 let file_size = data.len() as i64;
49 let record_count = batch.num_rows() as i64;
50 let path = format!("{}/data/{}.parquet", self.table_location, file_id);
51
52 self.storage.write(&path, Bytes::from(data)).await?;
53
54 let stats = crate::statistics::calculate_stats(batch)?;
55
56 let mut data_file = crate::manifest::DataFile::new(
57 path,
58 crate::manifest::FileFormat::Parquet,
59 record_count,
60 file_size,
61 );
62 data_file.statistics = stats;
63
64 Ok(data_file)
65 }
66}