Skip to main content

fluss/record/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::metadata::TableBucket;
19use crate::row::ColumnarRow;
20use ::arrow::array::RecordBatch;
21use core::fmt;
22use std::collections::HashMap;
23
24mod arrow;
25mod error;
26pub mod kv;
27
28pub use arrow::*;
29
/// Kind of change a record represents.
///
/// Every variant has a two-character textual tag (see
/// [`ChangeType::short_string`]) and a one-byte serialized form (see
/// [`ChangeType::to_byte_value`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeType {
    /// Append-only operation (`+A`, byte `0`)
    AppendOnly,
    /// Insert operation (`+I`, byte `1`)
    Insert,
    /// Update operation carrying the previous content of the updated row (`-U`, byte `2`)
    UpdateBefore,
    /// Update operation carrying the new content of the updated row (`+U`, byte `3`)
    UpdateAfter,
    /// Delete operation (`-D`, byte `4`)
    Delete,
}

impl ChangeType {
    /// Returns the two-character tag of this change type, e.g. `"+I"` for
    /// [`ChangeType::Insert`].
    pub fn short_string(&self) -> &'static str {
        match *self {
            Self::AppendOnly => "+A",
            Self::Insert => "+I",
            Self::UpdateBefore => "-U",
            Self::UpdateAfter => "+U",
            Self::Delete => "-D",
        }
    }

    /// Returns the byte this change type is serialized as.
    ///
    /// This is the inverse of [`ChangeType::from_byte_value`].
    pub fn to_byte_value(&self) -> u8 {
        match *self {
            Self::AppendOnly => 0,
            Self::Insert => 1,
            Self::UpdateBefore => 2,
            Self::UpdateAfter => 3,
            Self::Delete => 4,
        }
    }

    /// Decodes a change type from its serialized byte.
    ///
    /// # Errors
    /// Returns a descriptive message when `value` is not one of the bytes
    /// produced by [`ChangeType::to_byte_value`] (`0..=4`).
    pub fn from_byte_value(value: u8) -> Result<Self, String> {
        let decoded = match value {
            0 => Self::AppendOnly,
            1 => Self::Insert,
            2 => Self::UpdateBefore,
            3 => Self::UpdateAfter,
            4 => Self::Delete,
            _ => return Err(format!("Unsupported byte value '{value}' for change type")),
        };
        Ok(decoded)
    }
}

impl fmt::Display for ChangeType {
    /// Writes the short tag returned by [`ChangeType::short_string`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.short_string())
    }
}
88
89#[derive(Clone)]
90pub struct ScanRecord {
91    pub row: ColumnarRow,
92    offset: i64,
93    timestamp: i64,
94    change_type: ChangeType,
95}
96
97impl ScanRecord {
98    const INVALID: i64 = -1;
99
100    pub fn new_default(row: ColumnarRow) -> Self {
101        ScanRecord {
102            row,
103            offset: Self::INVALID,
104            timestamp: Self::INVALID,
105            change_type: ChangeType::Insert,
106        }
107    }
108
109    pub fn new(row: ColumnarRow, offset: i64, timestamp: i64, change_type: ChangeType) -> Self {
110        ScanRecord {
111            row,
112            offset,
113            timestamp,
114            change_type,
115        }
116    }
117
118    pub fn row(&self) -> &ColumnarRow {
119        &self.row
120    }
121
122    /// Returns the position in the log
123    pub fn offset(&self) -> i64 {
124        self.offset
125    }
126
127    /// Returns the timestamp
128    pub fn timestamp(&self) -> i64 {
129        self.timestamp
130    }
131
132    /// Returns the change type
133    pub fn change_type(&self) -> &ChangeType {
134        &self.change_type
135    }
136}
137
138pub struct ScanRecords {
139    records: HashMap<TableBucket, Vec<ScanRecord>>,
140}
141
142impl ScanRecords {
143    pub fn empty() -> Self {
144        Self {
145            records: HashMap::new(),
146        }
147    }
148
149    pub fn new(records: HashMap<TableBucket, Vec<ScanRecord>>) -> Self {
150        Self { records }
151    }
152
153    pub fn records(&self, scan_bucket: &TableBucket) -> &[ScanRecord] {
154        self.records.get(scan_bucket).map_or(&[], |records| records)
155    }
156
157    pub fn count(&self) -> usize {
158        self.records.values().map(|v| v.len()).sum()
159    }
160
161    pub fn is_empty(&self) -> bool {
162        self.records.is_empty()
163    }
164
165    pub fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>> {
166        &self.records
167    }
168
169    pub fn into_records_by_buckets(self) -> HashMap<TableBucket, Vec<ScanRecord>> {
170        self.records
171    }
172}
173
174/// A batch of records with metadata about bucket and offsets.
175///
176/// This is the batch-level equivalent of [`ScanRecord`], providing efficient
177/// access to Arrow RecordBatches while preserving the bucket and offset information
178/// needed for tracking consumption progress.
179#[derive(Debug, Clone)]
180pub struct ScanBatch {
181    /// The bucket this batch belongs to
182    bucket: TableBucket,
183    /// The Arrow RecordBatch containing the data
184    batch: RecordBatch,
185    /// Offset of the first record in this batch
186    base_offset: i64,
187}
188
189impl ScanBatch {
190    pub fn new(bucket: TableBucket, batch: RecordBatch, base_offset: i64) -> Self {
191        Self {
192            bucket,
193            batch,
194            base_offset,
195        }
196    }
197
198    pub fn bucket(&self) -> &TableBucket {
199        &self.bucket
200    }
201
202    pub fn batch(&self) -> &RecordBatch {
203        &self.batch
204    }
205
206    pub fn into_batch(self) -> RecordBatch {
207        self.batch
208    }
209
210    pub fn base_offset(&self) -> i64 {
211        self.base_offset
212    }
213
214    pub fn num_records(&self) -> usize {
215        self.batch.num_rows()
216    }
217
218    /// Returns the offset of the last record in this batch.
219    pub fn last_offset(&self) -> i64 {
220        if self.batch.num_rows() == 0 {
221            self.base_offset - 1
222        } else {
223            self.base_offset + self.batch.num_rows() as i64 - 1
224        }
225    }
226}
227
228impl IntoIterator for ScanRecords {
229    type Item = ScanRecord;
230    type IntoIter = std::vec::IntoIter<ScanRecord>;
231
232    fn into_iter(self) -> Self::IntoIter {
233        self.records
234            .into_values()
235            .flatten()
236            .collect::<Vec<_>>()
237            .into_iter()
238    }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use ::arrow::array::{Int32Array, RecordBatch};
    use ::arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema used by every fixture: one non-nullable `Int32` column named `v`.
    fn single_int_schema() -> Arc<Schema> {
        let column = Field::new("v", DataType::Int32, false);
        Arc::new(Schema::new(vec![column]))
    }

    /// Builds a one-column batch holding `values` and returns the row at `row_id`.
    fn make_row(values: Vec<i32>, row_id: usize) -> ColumnarRow {
        let batch = RecordBatch::try_new(
            single_int_schema(),
            vec![Arc::new(Int32Array::from(values))],
        )
        .expect("record batch");
        ColumnarRow::new_with_row_id(Arc::new(batch), row_id)
    }

    /// Every variant maps to its tag and byte, and decodes back from that byte;
    /// an unknown byte is rejected.
    #[test]
    fn change_type_round_trip() {
        let expectations = [
            (ChangeType::AppendOnly, "+A", 0),
            (ChangeType::Insert, "+I", 1),
            (ChangeType::UpdateBefore, "-U", 2),
            (ChangeType::UpdateAfter, "+U", 3),
            (ChangeType::Delete, "-D", 4),
        ];

        for (variant, tag, encoded) in expectations {
            assert_eq!(variant.short_string(), tag);
            assert_eq!(variant.to_byte_value(), encoded);
            assert_eq!(ChangeType::from_byte_value(encoded), Ok(variant));
        }

        let message = ChangeType::from_byte_value(9).unwrap_err();
        assert!(message.contains("Unsupported byte value"));
    }

    /// Lookup by bucket, total count and consuming iteration agree on the
    /// number of stored records.
    #[test]
    fn scan_records_counts_and_iterates() {
        let populated_bucket = TableBucket::new(1, 0);
        let absent_bucket = TableBucket::new(1, 1);

        let stored = vec![
            ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, ChangeType::Insert),
            ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, ChangeType::Delete),
        ];
        let mut by_bucket = HashMap::new();
        by_bucket.insert(populated_bucket.clone(), stored);

        let scan_records = ScanRecords::new(by_bucket);
        assert_eq!(scan_records.records(&populated_bucket).len(), 2);
        assert!(scan_records.records(&absent_bucket).is_empty());
        assert_eq!(scan_records.count(), 2);

        let drained: Vec<ScanRecord> = scan_records.into_iter().collect();
        assert_eq!(drained.len(), 2);
    }

    /// `new_default` fills in `-1` for offset and timestamp and `Insert` for
    /// the change type.
    #[test]
    fn scan_record_default_values() {
        let record = ScanRecord::new_default(make_row(vec![1], 0));
        assert_eq!(record.offset(), -1);
        assert_eq!(record.timestamp(), -1);
        assert_eq!(*record.change_type(), ChangeType::Insert);
    }

    /// `last_offset` is `base_offset + rows - 1`, including for an empty batch.
    #[test]
    fn scan_batch_last_offset() {
        let schema = single_int_schema();
        let bucket = TableBucket::new(1, 0);

        // Three records starting at offset 100 end at offset 102.
        let three_rows = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let filled = ScanBatch::new(bucket.clone(), three_rows, 100);
        assert_eq!(filled.num_records(), 3);
        assert_eq!(filled.last_offset(), 102);

        // An empty batch ends one offset before its base offset.
        let drained = ScanBatch::new(bucket, RecordBatch::new_empty(schema), 100);
        assert_eq!(drained.num_records(), 0);
        assert_eq!(drained.last_offset(), 99);
    }
}