Skip to main content

supertable_core/writer/
delete_writer.rs

1use crate::manifest::{DataFile, FileContent, FileFormat};
2use crate::storage::Storage;
3use arrow::array::{Int64Array, RecordBatch, StringArray};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5use bytes::Bytes;
6use parquet::arrow::ArrowWriter;
7use parquet::file::properties::WriterProperties;
8use std::io::Cursor;
9use std::sync::Arc;
10
11/// Writes position deletes (file_path, pos) to Parquet files.
12pub struct PositionDeleteWriter {
13    storage: Storage,
14    table_location: String,
15}
16
17impl PositionDeleteWriter {
18    pub fn new(storage: Storage, table_location: String) -> Self {
19        Self {
20            storage,
21            table_location,
22        }
23    }
24
25    /// Writes a batch of position deletes for a specific data file.
26    pub async fn write_deletes(
27        &self,
28        target_file_path: &str,
29        positions: Vec<i64>,
30        file_id: &str,
31    ) -> anyhow::Result<DataFile> {
32        let schema = Arc::new(Schema::new(vec![
33            Field::new("file_path", DataType::Utf8, false),
34            Field::new("pos", DataType::Int64, false),
35        ]));
36
37        let len = positions.len();
38        let file_path_array = StringArray::from(vec![target_file_path; len]);
39        let pos_array = Int64Array::from(positions);
40
41        let batch = RecordBatch::try_new(
42            schema.clone(),
43            vec![Arc::new(file_path_array), Arc::new(pos_array)],
44        )?;
45
46        // Write Parquet
47        let props = WriterProperties::builder().build();
48        let mut buffer = Cursor::new(Vec::new());
49        {
50            let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props))?;
51            writer.write(&batch)?;
52            writer.close()?;
53        }
54
55        let data = buffer.into_inner();
56        let file_size = data.len() as i64;
57        let record_count = len as i64;
58        let path = format!("{}/data/{}-delete.parquet", self.table_location, file_id);
59
60        self.storage.write(&path, Bytes::from(data)).await?;
61
62        // Format is currently Data, should be PositionDeletes
63        let data_file = DataFile::new(path, FileFormat::Parquet, record_count, file_size)
64            .with_content(FileContent::PositionDeletes);
65
66        Ok(data_file)
67    }
68}
69
70/// Writes equality deletes to Parquet files.
71pub struct EqualityDeleteWriter {
72    storage: Storage,
73    table_location: String,
74    schema: SchemaRef,
75    equality_ids: Vec<i32>,
76}
77
78impl EqualityDeleteWriter {
79    pub fn new(
80        storage: Storage,
81        table_location: String,
82        schema: SchemaRef,
83        equality_ids: Vec<i32>,
84    ) -> Self {
85        Self {
86            storage,
87            table_location,
88            schema,
89            equality_ids,
90        }
91    }
92
93    /// Writes a batch of equality deletes.
94    pub async fn write_batch(
95        &self,
96        batch: &RecordBatch,
97        file_id: &str,
98    ) -> anyhow::Result<DataFile> {
99        let props = WriterProperties::builder().build();
100        let mut buffer = Cursor::new(Vec::new());
101        {
102            let mut writer = ArrowWriter::try_new(&mut buffer, self.schema.clone(), Some(props))?;
103            writer.write(batch)?;
104            writer.close()?;
105        }
106
107        let data = buffer.into_inner();
108        let file_size = data.len() as i64;
109        let record_count = batch.num_rows() as i64;
110        let path = format!("{}/data/{}-eq-delete.parquet", self.table_location, file_id);
111
112        self.storage.write(&path, Bytes::from(data)).await?;
113
114        // Calculate stats for pruning
115        let stats = crate::statistics::calculate_stats(batch)?;
116
117        let mut data_file = DataFile::new(path, FileFormat::Parquet, record_count, file_size)
118            .with_content(FileContent::EqualityDeletes)
119            .with_equality_ids(self.equality_ids.clone());
120
121        data_file.statistics = stats;
122
123        Ok(data_file)
124    }
125}