//! Parquet data-file writer for tables (`supertable_core/writer/table_writer.rs`).

use std::io::Cursor;

use anyhow::Context as _;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use crate::storage::Storage;
9
10/// Writes Arrow RecordBatches to Parquet files and commits them to storage.
11pub struct TableWriter {
12    storage: Storage,
13    table_location: String,
14    schema: SchemaRef,
15}
16
17impl TableWriter {
18    pub fn new(storage: Storage, table_location: String, schema: SchemaRef) -> Self {
19        Self {
20            storage,
21            table_location,
22            schema,
23        }
24    }
25
26    /// Writes a batch of records to a new Parquet file.
27    /// Returns the DataFile metadata for the written file.
28    pub async fn write_batch(
29        &self,
30        batch: &RecordBatch,
31        file_id: &str,
32    ) -> anyhow::Result<crate::manifest::DataFile> {
33        // Validate schema before writing
34        let validator = crate::validation::SchemaValidator::new(self.schema.clone());
35        if let Err(e) = validator.validate(batch) {
36            return Err(e.into());
37        }
38
39        let props = WriterProperties::builder().build();
40
41        // Write to an in-memory buffer first
42        let mut buffer = Cursor::new(Vec::new());
43        {
44            let mut writer = ArrowWriter::try_new(&mut buffer, self.schema.clone(), Some(props))?;
45            writer.write(&batch)?;
46            writer.close()?;
47        }
48
49        let data = buffer.into_inner();
50        let file_size = data.len() as i64;
51        let record_count = batch.num_rows() as i64;
52        let path = format!("{}/data/{}.parquet", self.table_location, file_id);
53
54        self.storage.write(&path, Bytes::from(data)).await?;
55
56        // Calculate stats (assuming stats module exists and works similarly)
57        // Note: crate::statistics::calculate_stats might need adjustment if I changed imports,
58        // but here I'm just copying code.
59        // Wait, original code usage: crate::statistics::calculate_stats(batch)?
60        let stats = crate::statistics::calculate_stats(batch)?;
61
62        let mut data_file = crate::manifest::DataFile::new(
63            path,
64            crate::manifest::FileFormat::Parquet,
65            record_count,
66            file_size,
67        );
68        data_file.statistics = stats;
69
70        Ok(data_file)
71    }
72}