1use crate::metadata::TableBucket;
19use crate::row::ColumnarRow;
20use ::arrow::array::RecordBatch;
21use core::fmt;
22use std::collections::HashMap;
23
24mod arrow;
25mod error;
26pub mod kv;
27
28pub use arrow::*;
29
/// The kind of change carried by a scanned record.
///
/// Every variant has a two-character code ([`ChangeType::short_string`]) and a
/// one-byte encoding ([`ChangeType::to_byte_value`] / [`ChangeType::from_byte_value`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeType {
    /// Append-only change (`+A`, byte `0`).
    AppendOnly = 0,
    /// Insert change (`+I`, byte `1`).
    Insert = 1,
    /// Update-before change (`-U`, byte `2`).
    UpdateBefore = 2,
    /// Update-after change (`+U`, byte `3`).
    UpdateAfter = 3,
    /// Delete change (`-D`, byte `4`).
    Delete = 4,
}

impl ChangeType {
    /// Variants indexed by their byte encoding; position `i` holds the variant whose
    /// discriminant is `i`.
    const BY_BYTE: [ChangeType; 5] = [
        ChangeType::AppendOnly,
        ChangeType::Insert,
        ChangeType::UpdateBefore,
        ChangeType::UpdateAfter,
        ChangeType::Delete,
    ];

    /// Returns the two-character code for this change type, e.g. `"+I"` for
    /// [`ChangeType::Insert`].
    pub fn short_string(&self) -> &'static str {
        match *self {
            Self::AppendOnly => "+A",
            Self::Insert => "+I",
            Self::UpdateBefore => "-U",
            Self::UpdateAfter => "+U",
            Self::Delete => "-D",
        }
    }

    /// Encodes this change type as a single byte in `0..=4`.
    pub fn to_byte_value(&self) -> u8 {
        // The discriminants are declared explicitly on the enum and all fit in a u8.
        *self as u8
    }

    /// Decodes a byte previously produced by [`ChangeType::to_byte_value`].
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when `value` is outside `0..=4`.
    pub fn from_byte_value(value: u8) -> Result<Self, String> {
        Self::BY_BYTE
            .get(usize::from(value))
            .copied()
            .ok_or_else(|| format!("Unsupported byte value '{value}' for change type"))
    }
}

impl fmt::Display for ChangeType {
    /// Writes the short code (see [`ChangeType::short_string`]).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.short_string())
    }
}
88
89#[derive(Clone)]
90pub struct ScanRecord {
91 pub row: ColumnarRow,
92 offset: i64,
93 timestamp: i64,
94 change_type: ChangeType,
95}
96
97impl ScanRecord {
98 const INVALID: i64 = -1;
99
100 pub fn new_default(row: ColumnarRow) -> Self {
101 ScanRecord {
102 row,
103 offset: Self::INVALID,
104 timestamp: Self::INVALID,
105 change_type: ChangeType::Insert,
106 }
107 }
108
109 pub fn new(row: ColumnarRow, offset: i64, timestamp: i64, change_type: ChangeType) -> Self {
110 ScanRecord {
111 row,
112 offset,
113 timestamp,
114 change_type,
115 }
116 }
117
118 pub fn row(&self) -> &ColumnarRow {
119 &self.row
120 }
121
122 pub fn offset(&self) -> i64 {
124 self.offset
125 }
126
127 pub fn timestamp(&self) -> i64 {
129 self.timestamp
130 }
131
132 pub fn change_type(&self) -> &ChangeType {
134 &self.change_type
135 }
136}
137
138pub struct ScanRecords {
139 records: HashMap<TableBucket, Vec<ScanRecord>>,
140}
141
142impl ScanRecords {
143 pub fn empty() -> Self {
144 Self {
145 records: HashMap::new(),
146 }
147 }
148
149 pub fn new(records: HashMap<TableBucket, Vec<ScanRecord>>) -> Self {
150 Self { records }
151 }
152
153 pub fn records(&self, scan_bucket: &TableBucket) -> &[ScanRecord] {
154 self.records.get(scan_bucket).map_or(&[], |records| records)
155 }
156
157 pub fn count(&self) -> usize {
158 self.records.values().map(|v| v.len()).sum()
159 }
160
161 pub fn is_empty(&self) -> bool {
162 self.records.is_empty()
163 }
164
165 pub fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>> {
166 &self.records
167 }
168
169 pub fn into_records_by_buckets(self) -> HashMap<TableBucket, Vec<ScanRecord>> {
170 self.records
171 }
172}
173
174#[derive(Debug, Clone)]
180pub struct ScanBatch {
181 bucket: TableBucket,
183 batch: RecordBatch,
185 base_offset: i64,
187}
188
189impl ScanBatch {
190 pub fn new(bucket: TableBucket, batch: RecordBatch, base_offset: i64) -> Self {
191 Self {
192 bucket,
193 batch,
194 base_offset,
195 }
196 }
197
198 pub fn bucket(&self) -> &TableBucket {
199 &self.bucket
200 }
201
202 pub fn batch(&self) -> &RecordBatch {
203 &self.batch
204 }
205
206 pub fn into_batch(self) -> RecordBatch {
207 self.batch
208 }
209
210 pub fn base_offset(&self) -> i64 {
211 self.base_offset
212 }
213
214 pub fn num_records(&self) -> usize {
215 self.batch.num_rows()
216 }
217
218 pub fn last_offset(&self) -> i64 {
220 if self.batch.num_rows() == 0 {
221 self.base_offset - 1
222 } else {
223 self.base_offset + self.batch.num_rows() as i64 - 1
224 }
225 }
226}
227
228impl IntoIterator for ScanRecords {
229 type Item = ScanRecord;
230 type IntoIter = std::vec::IntoIter<ScanRecord>;
231
232 fn into_iter(self) -> Self::IntoIter {
233 self.records
234 .into_values()
235 .flatten()
236 .collect::<Vec<_>>()
237 .into_iter()
238 }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use ::arrow::array::{Int32Array, RecordBatch};
    use ::arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `Int32` column named `v`.
    fn int_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]))
    }

    /// Builds a row view over row `row_id` of a one-column batch holding `values`.
    fn make_row(values: Vec<i32>, row_id: usize) -> ColumnarRow {
        let batch = RecordBatch::try_new(int_schema(), vec![Arc::new(Int32Array::from(values))])
            .expect("record batch");
        ColumnarRow::new_with_row_id(Arc::new(batch), row_id)
    }

    #[test]
    fn change_type_round_trip() {
        let expectations = [
            (ChangeType::AppendOnly, "+A", 0u8),
            (ChangeType::Insert, "+I", 1u8),
            (ChangeType::UpdateBefore, "-U", 2u8),
            (ChangeType::UpdateAfter, "+U", 3u8),
            (ChangeType::Delete, "-D", 4u8),
        ];

        for &(change_type, short, byte) in &expectations {
            assert_eq!(change_type.short_string(), short);
            assert_eq!(change_type.to_byte_value(), byte);
            assert_eq!(ChangeType::from_byte_value(byte), Ok(change_type));
        }

        let err = ChangeType::from_byte_value(9).expect_err("9 is not a valid change type");
        assert!(err.contains("Unsupported byte value"));
    }

    #[test]
    fn scan_records_counts_and_iterates() {
        let populated = TableBucket::new(1, 0);
        let absent = TableBucket::new(1, 1);
        let inserted = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, ChangeType::Insert);
        let deleted = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, ChangeType::Delete);

        let scan_records =
            ScanRecords::new(HashMap::from([(populated.clone(), vec![inserted, deleted])]));

        assert_eq!(scan_records.records(&populated).len(), 2);
        assert!(scan_records.records(&absent).is_empty());
        assert_eq!(scan_records.count(), 2);
        assert_eq!(scan_records.into_iter().count(), 2);
    }

    #[test]
    fn scan_record_default_values() {
        let record = ScanRecord::new_default(make_row(vec![1], 0));
        assert_eq!(record.offset(), -1);
        assert_eq!(record.timestamp(), -1);
        assert_eq!(record.change_type(), &ChangeType::Insert);
    }

    #[test]
    fn scan_batch_last_offset() {
        let bucket = TableBucket::new(1, 0);
        let schema = int_schema();

        let filled = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let scan_batch = ScanBatch::new(bucket.clone(), filled, 100);
        assert_eq!(scan_batch.num_records(), 3);
        assert_eq!(scan_batch.last_offset(), 102);

        let empty_scan_batch = ScanBatch::new(bucket, RecordBatch::new_empty(schema), 100);
        assert_eq!(empty_scan_batch.num_records(), 0);
        assert_eq!(empty_scan_batch.last_offset(), 99);
    }
}