supertable_core/writer/
delete_writer.rs1use crate::manifest::{DataFile, FileContent, FileFormat};
2use crate::storage::Storage;
3use arrow::array::{Int64Array, RecordBatch, StringArray};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5use bytes::Bytes;
6use parquet::arrow::ArrowWriter;
7use parquet::file::properties::WriterProperties;
8use std::io::Cursor;
9use std::sync::Arc;
10
11pub struct PositionDeleteWriter {
13 storage: Storage,
14 table_location: String,
15}
16
17impl PositionDeleteWriter {
18 pub fn new(storage: Storage, table_location: String) -> Self {
19 Self {
20 storage,
21 table_location,
22 }
23 }
24
25 pub async fn write_deletes(
27 &self,
28 target_file_path: &str,
29 positions: Vec<i64>,
30 file_id: &str,
31 ) -> anyhow::Result<DataFile> {
32 let schema = Arc::new(Schema::new(vec![
33 Field::new("file_path", DataType::Utf8, false),
34 Field::new("pos", DataType::Int64, false),
35 ]));
36
37 let len = positions.len();
38 let file_path_array = StringArray::from(vec![target_file_path; len]);
39 let pos_array = Int64Array::from(positions);
40
41 let batch = RecordBatch::try_new(
42 schema.clone(),
43 vec![Arc::new(file_path_array), Arc::new(pos_array)],
44 )?;
45
46 let props = WriterProperties::builder().build();
48 let mut buffer = Cursor::new(Vec::new());
49 {
50 let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props))?;
51 writer.write(&batch)?;
52 writer.close()?;
53 }
54
55 let data = buffer.into_inner();
56 let file_size = data.len() as i64;
57 let record_count = len as i64;
58 let path = format!("{}/data/{}-delete.parquet", self.table_location, file_id);
59
60 self.storage.write(&path, Bytes::from(data)).await?;
61
62 let data_file = DataFile::new(path, FileFormat::Parquet, record_count, file_size)
64 .with_content(FileContent::PositionDeletes);
65
66 Ok(data_file)
67 }
68}
69
70pub struct EqualityDeleteWriter {
72 storage: Storage,
73 table_location: String,
74 schema: SchemaRef,
75 equality_ids: Vec<i32>,
76}
77
78impl EqualityDeleteWriter {
79 pub fn new(
80 storage: Storage,
81 table_location: String,
82 schema: SchemaRef,
83 equality_ids: Vec<i32>,
84 ) -> Self {
85 Self {
86 storage,
87 table_location,
88 schema,
89 equality_ids,
90 }
91 }
92
93 pub async fn write_batch(
95 &self,
96 batch: &RecordBatch,
97 file_id: &str,
98 ) -> anyhow::Result<DataFile> {
99 let props = WriterProperties::builder().build();
100 let mut buffer = Cursor::new(Vec::new());
101 {
102 let mut writer = ArrowWriter::try_new(&mut buffer, self.schema.clone(), Some(props))?;
103 writer.write(batch)?;
104 writer.close()?;
105 }
106
107 let data = buffer.into_inner();
108 let file_size = data.len() as i64;
109 let record_count = batch.num_rows() as i64;
110 let path = format!("{}/data/{}-eq-delete.parquet", self.table_location, file_id);
111
112 self.storage.write(&path, Bytes::from(data)).await?;
113
114 let stats = crate::statistics::calculate_stats(batch)?;
116
117 let mut data_file = DataFile::new(path, FileFormat::Parquet, record_count, file_size)
118 .with_content(FileContent::EqualityDeletes)
119 .with_equality_ids(self.equality_ids.clone());
120
121 data_file.statistics = stats;
122
123 Ok(data_file)
124 }
125}