Skip to main content

fluss/row/compacted/
compacted_row_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::row::Decimal;
19use crate::row::binary::BinaryWriter;
20use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
21use crate::util::varint::{write_unsigned_varint_to_slice, write_unsigned_varint_u64_to_slice};
22use bytes::{Bytes, BytesMut};
23use std::cmp;
24
25// Writer for CompactedRow
26// Reference implementation:
27// https://github.com/apache/fluss/blob/d4a72fad240d4b81563aaf83fa3b09b5058674ed/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java#L71
28#[allow(dead_code)]
29pub struct CompactedRowWriter {
30    header_size_in_bytes: usize,
31    position: usize,
32    buffer: BytesMut,
33}
34
35#[allow(dead_code)]
36impl CompactedRowWriter {
37    pub const MAX_INT_SIZE: usize = 5;
38    pub const MAX_LONG_SIZE: usize = 10;
39
40    pub fn new(field_count: usize) -> Self {
41        let header_size = calculate_bit_set_width_in_bytes(field_count);
42        let cap = cmp::max(64, header_size);
43
44        let mut buffer = BytesMut::with_capacity(cap);
45        buffer.resize(cap, 0);
46
47        Self {
48            header_size_in_bytes: header_size,
49            position: header_size,
50            buffer,
51        }
52    }
53
54    pub fn position(&self) -> usize {
55        self.position
56    }
57
58    pub fn buffer(&self) -> &[u8] {
59        &self.buffer[..self.position]
60    }
61
62    pub fn to_bytes(&self) -> Bytes {
63        Bytes::copy_from_slice(&self.buffer[..self.position])
64    }
65
66    /// Flushes writer's ByteMut, resetting writer's inner state and returns Byte of flushed state
67    pub fn flush_bytes(&mut self) -> Bytes {
68        let used = self.buffer.split_to(self.position);
69        self.position = self.header_size_in_bytes;
70        if self.buffer.len() < self.header_size_in_bytes {
71            self.buffer.resize(self.header_size_in_bytes.max(64), 0);
72        } else {
73            self.buffer[..self.header_size_in_bytes].fill(0);
74        }
75        used.freeze()
76    }
77
78    fn ensure_capacity(&mut self, need_len: usize) {
79        if (self.buffer.len() - self.position) < need_len {
80            let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len);
81            self.buffer.resize(new_len, 0);
82        }
83    }
84
85    fn write_raw(&mut self, src: &[u8]) {
86        let end = self.position + src.len();
87        self.ensure_capacity(src.len());
88        self.buffer[self.position..end].copy_from_slice(src);
89        self.position = end;
90    }
91}
92
93impl BinaryWriter for CompactedRowWriter {
94    fn reset(&mut self) {
95        self.position = self.header_size_in_bytes;
96        self.buffer[..self.header_size_in_bytes].fill(0);
97    }
98
99    fn set_null_at(&mut self, pos: usize) {
100        let byte_index = pos >> 3;
101        let bit = pos & 7;
102        debug_assert!(byte_index < self.header_size_in_bytes);
103        self.buffer[byte_index] |= 1u8 << bit;
104    }
105
106    fn write_boolean(&mut self, value: bool) {
107        let b = if value { 1u8 } else { 0u8 };
108        self.write_raw(&[b])
109    }
110
111    fn write_byte(&mut self, value: u8) {
112        self.write_raw(&[value])
113    }
114
115    fn write_bytes(&mut self, value: &[u8]) {
116        let len_i32 = i32::try_from(value.len())
117            .expect("Byte slice too large to encode length as i32: exceeds i32::MAX");
118        self.write_int(len_i32);
119        self.write_raw(value)
120    }
121
122    fn write_char(&mut self, value: &str, _length: usize) {
123        // TODO: currently, we encoding CHAR(length) as the same with STRING, the length info can be
124        //  omitted and the bytes length should be enforced in the future.
125        self.write_string(value)
126    }
127
128    fn write_string(&mut self, value: &str) {
129        self.write_bytes(value.as_ref())
130    }
131
132    fn write_short(&mut self, value: i16) {
133        // Use native endianness to match Java's UnsafeUtils.putShort behavior
134        // Java uses sun.misc.Unsafe which writes in native byte order (typically LE on x86/ARM)
135        self.write_raw(&value.to_ne_bytes())
136    }
137
138    fn write_int(&mut self, value: i32) {
139        self.ensure_capacity(Self::MAX_INT_SIZE);
140        let bytes_written =
141            write_unsigned_varint_to_slice(value as u32, &mut self.buffer[self.position..]);
142        self.position += bytes_written;
143    }
144
145    fn write_long(&mut self, value: i64) {
146        self.ensure_capacity(Self::MAX_LONG_SIZE);
147        let bytes_written =
148            write_unsigned_varint_u64_to_slice(value as u64, &mut self.buffer[self.position..]);
149        self.position += bytes_written;
150    }
151
152    fn write_float(&mut self, value: f32) {
153        // Use native endianness to match Java's UnsafeUtils.putFloat behavior
154        self.write_raw(&value.to_ne_bytes())
155    }
156
157    fn write_double(&mut self, value: f64) {
158        // Use native endianness to match Java's UnsafeUtils.putDouble behavior
159        self.write_raw(&value.to_ne_bytes())
160    }
161
162    fn write_binary(&mut self, bytes: &[u8], length: usize) {
163        // TODO: currently, we encoding BINARY(length) as the same with BYTES, the length info can
164        //  be omitted and the bytes length should be enforced in the future.
165        self.write_bytes(&bytes[..length.min(bytes.len())])
166    }
167
168    fn complete(&mut self) {
169        // do nothing
170    }
171
172    fn write_decimal(&mut self, value: &Decimal, precision: u32) {
173        // Decimal is already validated and rescaled during construction.
174        // Just serialize the precomputed unscaled representation.
175        if Decimal::is_compact_precision(precision) {
176            self.write_long(
177                value
178                    .to_unscaled_long()
179                    .expect("Decimal should fit in i64 for compact precision"),
180            )
181        } else {
182            self.write_bytes(&value.to_unscaled_bytes())
183        }
184    }
185
186    fn write_time(&mut self, value: i32, _precision: u32) {
187        // TIME is always encoded as i32 (milliseconds since midnight) regardless of precision
188        self.write_int(value)
189    }
190
191    fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, precision: u32) {
192        if crate::row::datum::TimestampNtz::is_compact(precision) {
193            // Compact: write only milliseconds
194            self.write_long(value.get_millisecond());
195        } else {
196            // Non-compact: write milliseconds + nanoOfMillisecond
197            self.write_long(value.get_millisecond());
198            self.write_int(value.get_nano_of_millisecond());
199        }
200    }
201
202    fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32) {
203        if crate::row::datum::TimestampLtz::is_compact(precision) {
204            // Compact: write only epoch milliseconds
205            self.write_long(value.get_epoch_millisecond());
206        } else {
207            // Non-compact: write epoch milliseconds + nanoOfMillisecond
208            self.write_long(value.get_epoch_millisecond());
209            self.write_int(value.get_nano_of_millisecond());
210        }
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use bigdecimal::{BigDecimal, num_bigint::BigInt};

    /// Decodes the varint `i64` written immediately after the null-bit header.
    fn read_long_after_header(w: &CompactedRowWriter) -> i64 {
        let (val, _) = crate::util::varint::read_unsigned_varint_u64_at(
            w.buffer(),
            w.header_size_in_bytes,
            CompactedRowWriter::MAX_LONG_SIZE,
        )
        .unwrap();
        val as i64
    }

    #[test]
    fn test_write_decimal_compact() {
        // Compact decimal (precision <= 18)
        let bd = BigDecimal::new(BigInt::from(12345), 2); // 123.45
        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();

        let mut w = CompactedRowWriter::new(1);
        w.write_decimal(&decimal, 10);

        assert_eq!(read_long_after_header(&w), 12345);
    }

    #[test]
    fn test_write_decimal_rounding() {
        // Test HALF_UP rounding: 12.345 → 12.35
        let bd = BigDecimal::new(BigInt::from(12345), 3);
        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();

        let mut w = CompactedRowWriter::new(1);
        w.write_decimal(&decimal, 10);

        assert_eq!(read_long_after_header(&w), 1235); // 12.35 with scale 2
    }

    #[test]
    fn test_write_decimal_non_compact() {
        // Non-compact (precision > 18): written as a length-prefixed byte array.
        let bd = BigDecimal::new(BigInt::from(12345), 0);
        let decimal = Decimal::from_big_decimal(bd, 28, 0).unwrap();

        let mut w = CompactedRowWriter::new(1);
        w.write_decimal(&decimal, 28);

        // Layout after the header: [varint length][unscaled bytes]. The payload is short
        // enough that its length occupies exactly one varint byte.
        let expected = decimal.to_unscaled_bytes();
        let header = w.header_size_in_bytes;
        let buf = w.buffer();
        assert!(expected.len() < 0x80);
        assert_eq!(buf[header] as usize, expected.len());
        assert_eq!(&buf[header + 1..], &expected[..]);
    }

    #[test]
    fn test_set_null_at_and_reset() {
        let mut w = CompactedRowWriter::new(8);
        w.set_null_at(0);
        w.set_null_at(3);
        assert_eq!(w.buffer()[0], 0b0000_1001);

        w.write_int(7);
        w.reset();
        assert_eq!(w.position(), w.header_size_in_bytes);
        assert!(w.buffer().iter().all(|&b| b == 0));
    }

    #[test]
    fn test_write_bytes_grows_buffer() {
        let mut w = CompactedRowWriter::new(1);
        let payload: Vec<u8> = (0..200u8).collect();
        w.write_bytes(&payload);

        let header = w.header_size_in_bytes;
        let buf = w.buffer();
        // 200 needs a two-byte varint: 0xC8 (low 7 bits | continuation), then 0x01.
        assert_eq!(&buf[header..header + 2], &[0xC8, 0x01]);
        assert_eq!(&buf[header + 2..], &payload[..]);
    }

    #[test]
    fn test_flush_bytes_clears_stale_header_after_reset() {
        let mut w = CompactedRowWriter::new(16);
        let header = w.header_size_in_bytes;
        let len = w.buffer.len();
        assert!(
            header >= 2 && header < len,
            "test needs a multi-byte header smaller than the buffer"
        );

        // An abandoned row fills the whole buffer with non-zero bytes...
        for _ in header..len {
            w.write_byte(0xFF);
        }
        assert_eq!(w.position(), len);
        w.reset();

        // ...then a real row stops one byte short, so the tail left after flushing is a single
        // stale 0xFF byte, shorter than the header.
        for _ in header..len - 1 {
            w.write_byte(0x01);
        }
        let row = w.flush_bytes();
        assert_eq!(row.len(), len - 1);

        // The next row must start with an all-clear null bitmap.
        assert_eq!(w.position(), header);
        assert!(w.buffer().iter().all(|&b| b == 0));
    }
}
267}