fluss/row/compacted/
compacted_row_writer.rs1use crate::row::Decimal;
19use crate::row::binary::BinaryWriter;
20use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
21use crate::util::varint::{write_unsigned_varint_to_slice, write_unsigned_varint_u64_to_slice};
22use bytes::{Bytes, BytesMut};
23use std::cmp;
24
25#[allow(dead_code)]
29pub struct CompactedRowWriter {
30 header_size_in_bytes: usize,
31 position: usize,
32 buffer: BytesMut,
33}
34
35#[allow(dead_code)]
36impl CompactedRowWriter {
37 pub const MAX_INT_SIZE: usize = 5;
38 pub const MAX_LONG_SIZE: usize = 10;
39
40 pub fn new(field_count: usize) -> Self {
41 let header_size = calculate_bit_set_width_in_bytes(field_count);
42 let cap = cmp::max(64, header_size);
43
44 let mut buffer = BytesMut::with_capacity(cap);
45 buffer.resize(cap, 0);
46
47 Self {
48 header_size_in_bytes: header_size,
49 position: header_size,
50 buffer,
51 }
52 }
53
54 pub fn position(&self) -> usize {
55 self.position
56 }
57
58 pub fn buffer(&self) -> &[u8] {
59 &self.buffer[..self.position]
60 }
61
62 pub fn to_bytes(&self) -> Bytes {
63 Bytes::copy_from_slice(&self.buffer[..self.position])
64 }
65
66 pub fn flush_bytes(&mut self) -> Bytes {
68 let used = self.buffer.split_to(self.position);
69 self.position = self.header_size_in_bytes;
70 if self.buffer.len() < self.header_size_in_bytes {
71 self.buffer.resize(self.header_size_in_bytes.max(64), 0);
72 } else {
73 self.buffer[..self.header_size_in_bytes].fill(0);
74 }
75 used.freeze()
76 }
77
78 fn ensure_capacity(&mut self, need_len: usize) {
79 if (self.buffer.len() - self.position) < need_len {
80 let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len);
81 self.buffer.resize(new_len, 0);
82 }
83 }
84
85 fn write_raw(&mut self, src: &[u8]) {
86 let end = self.position + src.len();
87 self.ensure_capacity(src.len());
88 self.buffer[self.position..end].copy_from_slice(src);
89 self.position = end;
90 }
91}
92
93impl BinaryWriter for CompactedRowWriter {
94 fn reset(&mut self) {
95 self.position = self.header_size_in_bytes;
96 self.buffer[..self.header_size_in_bytes].fill(0);
97 }
98
99 fn set_null_at(&mut self, pos: usize) {
100 let byte_index = pos >> 3;
101 let bit = pos & 7;
102 debug_assert!(byte_index < self.header_size_in_bytes);
103 self.buffer[byte_index] |= 1u8 << bit;
104 }
105
106 fn write_boolean(&mut self, value: bool) {
107 let b = if value { 1u8 } else { 0u8 };
108 self.write_raw(&[b])
109 }
110
111 fn write_byte(&mut self, value: u8) {
112 self.write_raw(&[value])
113 }
114
115 fn write_bytes(&mut self, value: &[u8]) {
116 let len_i32 = i32::try_from(value.len())
117 .expect("Byte slice too large to encode length as i32: exceeds i32::MAX");
118 self.write_int(len_i32);
119 self.write_raw(value)
120 }
121
122 fn write_char(&mut self, value: &str, _length: usize) {
123 self.write_string(value)
126 }
127
128 fn write_string(&mut self, value: &str) {
129 self.write_bytes(value.as_ref())
130 }
131
132 fn write_short(&mut self, value: i16) {
133 self.write_raw(&value.to_ne_bytes())
136 }
137
138 fn write_int(&mut self, value: i32) {
139 self.ensure_capacity(Self::MAX_INT_SIZE);
140 let bytes_written =
141 write_unsigned_varint_to_slice(value as u32, &mut self.buffer[self.position..]);
142 self.position += bytes_written;
143 }
144
145 fn write_long(&mut self, value: i64) {
146 self.ensure_capacity(Self::MAX_LONG_SIZE);
147 let bytes_written =
148 write_unsigned_varint_u64_to_slice(value as u64, &mut self.buffer[self.position..]);
149 self.position += bytes_written;
150 }
151
152 fn write_float(&mut self, value: f32) {
153 self.write_raw(&value.to_ne_bytes())
155 }
156
157 fn write_double(&mut self, value: f64) {
158 self.write_raw(&value.to_ne_bytes())
160 }
161
162 fn write_binary(&mut self, bytes: &[u8], length: usize) {
163 self.write_bytes(&bytes[..length.min(bytes.len())])
166 }
167
168 fn complete(&mut self) {
169 }
171
172 fn write_decimal(&mut self, value: &Decimal, precision: u32) {
173 if Decimal::is_compact_precision(precision) {
176 self.write_long(
177 value
178 .to_unscaled_long()
179 .expect("Decimal should fit in i64 for compact precision"),
180 )
181 } else {
182 self.write_bytes(&value.to_unscaled_bytes())
183 }
184 }
185
186 fn write_time(&mut self, value: i32, _precision: u32) {
187 self.write_int(value)
189 }
190
191 fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, precision: u32) {
192 if crate::row::datum::TimestampNtz::is_compact(precision) {
193 self.write_long(value.get_millisecond());
195 } else {
196 self.write_long(value.get_millisecond());
198 self.write_int(value.get_nano_of_millisecond());
199 }
200 }
201
202 fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32) {
203 if crate::row::datum::TimestampLtz::is_compact(precision) {
204 self.write_long(value.get_epoch_millisecond());
206 } else {
207 self.write_long(value.get_epoch_millisecond());
209 self.write_int(value.get_nano_of_millisecond());
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use bigdecimal::{BigDecimal, num_bigint::BigInt};

    /// Decodes the varint long written immediately after the writer's null-bit header.
    fn read_long_after_header(w: &CompactedRowWriter) -> i64 {
        let (val, _) = crate::util::varint::read_unsigned_varint_u64_at(
            w.buffer(),
            w.header_size_in_bytes,
            CompactedRowWriter::MAX_LONG_SIZE,
        )
        .unwrap();
        val as i64
    }

    #[test]
    fn test_write_decimal_compact() {
        // 123.45 (unscaled 12345 at scale 2) is written as its unscaled long.
        let bd = BigDecimal::new(BigInt::from(12345), 2);
        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();

        let mut w = CompactedRowWriter::new(1);
        w.write_decimal(&decimal, 10);

        assert_eq!(read_long_after_header(&w), 12345);
    }

    #[test]
    fn test_write_decimal_rounding() {
        // 12.345 rescaled to scale 2 becomes 12.35, i.e. unscaled 1235.
        let bd = BigDecimal::new(BigInt::from(12345), 3);
        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();

        let mut w = CompactedRowWriter::new(1);
        w.write_decimal(&decimal, 10);

        assert_eq!(read_long_after_header(&w), 1235);
    }

    #[test]
    fn test_write_decimal_non_compact() {
        // Precision 28 takes the non-compact path: a varint length prefix, then the unscaled bytes.
        let bd = BigDecimal::new(BigInt::from(12345), 0);
        let decimal = Decimal::from_big_decimal(bd, 28, 0).unwrap();

        let mut w = CompactedRowWriter::new(1);
        w.write_decimal(&decimal, 28);

        let unscaled = decimal.to_unscaled_bytes();
        let header = w.header_size_in_bytes;
        // A payload shorter than 128 bytes has a single-byte varint length prefix.
        assert!(unscaled.len() < 128);
        assert_eq!(w.position(), header + 1 + unscaled.len());
        assert_eq!(w.buffer()[header] as usize, unscaled.len());
        assert_eq!(&w.buffer()[header + 1..], &unscaled[..]);
    }

    #[test]
    fn test_set_null_at_and_reset() {
        let mut w = CompactedRowWriter::new(1);
        w.set_null_at(0);
        assert_eq!(w.buffer()[0] & 1, 1);

        w.reset();
        assert_eq!(w.position(), w.header_size_in_bytes);
        assert_eq!(w.buffer()[0], 0);
    }

    #[test]
    fn test_flush_bytes_returns_row_and_rewinds() {
        let mut w = CompactedRowWriter::new(1);
        let header = w.header_size_in_bytes;
        w.write_int(7);

        let flushed = w.flush_bytes();
        assert_eq!(flushed.len(), header + 1);
        assert_eq!(flushed[header], 7);
        assert_eq!(w.position(), header);
        assert!(w.buffer().iter().all(|&b| b == 0));
    }
}