// supertable_core/writer/table_writer.rs
use std::io::Cursor;

use anyhow::Context;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use crate::storage::Storage;

10pub struct TableWriter {
12 storage: Storage,
13 table_location: String,
14 schema: SchemaRef,
15}
16
17impl TableWriter {
18 pub fn new(storage: Storage, table_location: String, schema: SchemaRef) -> Self {
19 Self {
20 storage,
21 table_location,
22 schema,
23 }
24 }
25
26 pub async fn write_batch(
29 &self,
30 batch: &RecordBatch,
31 file_id: &str,
32 ) -> anyhow::Result<crate::manifest::DataFile> {
33 let validator = crate::validation::SchemaValidator::new(self.schema.clone());
35 if let Err(e) = validator.validate(batch) {
36 return Err(e.into());
37 }
38
39 let props = WriterProperties::builder().build();
40
41 let mut buffer = Cursor::new(Vec::new());
43 {
44 let mut writer = ArrowWriter::try_new(&mut buffer, self.schema.clone(), Some(props))?;
45 writer.write(&batch)?;
46 writer.close()?;
47 }
48
49 let data = buffer.into_inner();
50 let file_size = data.len() as i64;
51 let record_count = batch.num_rows() as i64;
52 let path = format!("{}/data/{}.parquet", self.table_location, file_id);
53
54 self.storage.write(&path, Bytes::from(data)).await?;
55
56 let stats = crate::statistics::calculate_stats(batch)?;
61
62 let mut data_file = crate::manifest::DataFile::new(
63 path,
64 crate::manifest::FileFormat::Parquet,
65 record_count,
66 file_size,
67 );
68 data_file.statistics = stats;
69
70 Ok(data_file)
71 }
72}