Skip to main content

supertable_core/
writer.rs

1use arrow::array::RecordBatch;
2use arrow::datatypes::SchemaRef;
3use bytes::Bytes;
4use parquet::arrow::ArrowWriter;
5use parquet::file::properties::WriterProperties;
6use std::io::Cursor;
7
8use crate::storage::Storage;
9
10/// Writes Arrow RecordBatches to Parquet files and commits them to storage.
11pub struct TableWriter {
12    storage: Storage,
13    table_location: String,
14    schema: SchemaRef,
15}
16
17impl TableWriter {
18    pub fn new(storage: Storage, table_location: String, schema: SchemaRef) -> Self {
19        Self {
20            storage,
21            table_location,
22            schema,
23        }
24    }
25
26    /// Writes a batch of records to a new Parquet file.
27    /// Returns the DataFile metadata for the written file.
28    pub async fn write_batch(
29        &self,
30        batch: &RecordBatch,
31        file_id: &str,
32    ) -> anyhow::Result<crate::manifest::DataFile> {
33        // Validate schema before writing
34        let validator = crate::validation::SchemaValidator::new(self.schema.clone());
35        validator.validate(batch)?;
36
37        let props = WriterProperties::builder().build();
38
39        // Write to an in-memory buffer first
40        let mut buffer = Cursor::new(Vec::new());
41        {
42            let mut writer = ArrowWriter::try_new(&mut buffer, self.schema.clone(), Some(props))?;
43            writer.write(batch)?;
44            writer.close()?;
45        }
46
47        let data = buffer.into_inner();
48        let file_size = data.len() as i64;
49        let record_count = batch.num_rows() as i64;
50        let path = format!("{}/data/{}.parquet", self.table_location, file_id);
51
52        self.storage.write(&path, Bytes::from(data)).await?;
53
54        let stats = crate::statistics::calculate_stats(batch)?;
55
56        let mut data_file = crate::manifest::DataFile::new(
57            path,
58            crate::manifest::FileFormat::Parquet,
59            record_count,
60            file_size,
61        );
62        data_file.statistics = stats;
63
64        Ok(data_file)
65    }
66}