use crate::row::Decimal;
use crate::row::binary::BinaryWriter;
use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
use crate::util::varint::{write_unsigned_varint_to_slice, write_unsigned_varint_u64_to_slice};
use bytes::{Bytes, BytesMut};
use std::cmp;
#[allow(dead_code)]
pub struct CompactedRowWriter {
header_size_in_bytes: usize,
position: usize,
buffer: BytesMut,
}
#[allow(dead_code)]
impl CompactedRowWriter {
pub const MAX_INT_SIZE: usize = 5;
pub const MAX_LONG_SIZE: usize = 10;
pub fn new(field_count: usize) -> Self {
let header_size = calculate_bit_set_width_in_bytes(field_count);
let cap = cmp::max(64, header_size);
let mut buffer = BytesMut::with_capacity(cap);
buffer.resize(cap, 0);
Self {
header_size_in_bytes: header_size,
position: header_size,
buffer,
}
}
pub fn position(&self) -> usize {
self.position
}
pub fn buffer(&self) -> &[u8] {
&self.buffer[..self.position]
}
pub fn to_bytes(&self) -> Bytes {
Bytes::copy_from_slice(&self.buffer[..self.position])
}
pub fn flush_bytes(&mut self) -> Bytes {
let used = self.buffer.split_to(self.position);
self.position = self.header_size_in_bytes;
if self.buffer.len() < self.header_size_in_bytes {
self.buffer.resize(self.header_size_in_bytes.max(64), 0);
} else {
self.buffer[..self.header_size_in_bytes].fill(0);
}
used.freeze()
}
fn ensure_capacity(&mut self, need_len: usize) {
if (self.buffer.len() - self.position) < need_len {
let new_len = cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len);
self.buffer.resize(new_len, 0);
}
}
fn write_raw(&mut self, src: &[u8]) {
let end = self.position + src.len();
self.ensure_capacity(src.len());
self.buffer[self.position..end].copy_from_slice(src);
self.position = end;
}
}
impl BinaryWriter for CompactedRowWriter {
fn reset(&mut self) {
self.position = self.header_size_in_bytes;
self.buffer[..self.header_size_in_bytes].fill(0);
}
fn set_null_at(&mut self, pos: usize) {
let byte_index = pos >> 3;
let bit = pos & 7;
debug_assert!(byte_index < self.header_size_in_bytes);
self.buffer[byte_index] |= 1u8 << bit;
}
fn write_boolean(&mut self, value: bool) {
let b = if value { 1u8 } else { 0u8 };
self.write_raw(&[b])
}
fn write_byte(&mut self, value: u8) {
self.write_raw(&[value])
}
fn write_bytes(&mut self, value: &[u8]) {
let len_i32 = i32::try_from(value.len())
.expect("Byte slice too large to encode length as i32: exceeds i32::MAX");
self.write_int(len_i32);
self.write_raw(value)
}
fn write_char(&mut self, value: &str, _length: usize) {
self.write_string(value)
}
fn write_string(&mut self, value: &str) {
self.write_bytes(value.as_ref())
}
fn write_short(&mut self, value: i16) {
self.write_raw(&value.to_ne_bytes())
}
fn write_int(&mut self, value: i32) {
self.ensure_capacity(Self::MAX_INT_SIZE);
let bytes_written =
write_unsigned_varint_to_slice(value as u32, &mut self.buffer[self.position..]);
self.position += bytes_written;
}
fn write_long(&mut self, value: i64) {
self.ensure_capacity(Self::MAX_LONG_SIZE);
let bytes_written =
write_unsigned_varint_u64_to_slice(value as u64, &mut self.buffer[self.position..]);
self.position += bytes_written;
}
fn write_float(&mut self, value: f32) {
self.write_raw(&value.to_ne_bytes())
}
fn write_double(&mut self, value: f64) {
self.write_raw(&value.to_ne_bytes())
}
fn write_binary(&mut self, bytes: &[u8], length: usize) {
self.write_bytes(&bytes[..length.min(bytes.len())])
}
fn complete(&mut self) {
}
fn write_decimal(&mut self, value: &Decimal, precision: u32) {
if Decimal::is_compact_precision(precision) {
self.write_long(
value
.to_unscaled_long()
.expect("Decimal should fit in i64 for compact precision"),
)
} else {
self.write_bytes(&value.to_unscaled_bytes())
}
}
fn write_time(&mut self, value: i32, _precision: u32) {
self.write_int(value)
}
fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, precision: u32) {
if crate::row::datum::TimestampNtz::is_compact(precision) {
self.write_long(value.get_millisecond());
} else {
self.write_long(value.get_millisecond());
self.write_int(value.get_nano_of_millisecond());
}
}
fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32) {
if crate::row::datum::TimestampLtz::is_compact(precision) {
self.write_long(value.get_epoch_millisecond());
} else {
self.write_long(value.get_epoch_millisecond());
self.write_int(value.get_nano_of_millisecond());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bigdecimal::{BigDecimal, num_bigint::BigInt};
#[test]
fn test_write_decimal_compact() {
let bd = BigDecimal::new(BigInt::from(12345), 2); let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();
let mut w = CompactedRowWriter::new(1);
w.write_decimal(&decimal, 10);
let (val, _) = crate::util::varint::read_unsigned_varint_u64_at(
w.buffer(),
w.header_size_in_bytes,
CompactedRowWriter::MAX_LONG_SIZE,
)
.unwrap();
assert_eq!(val as i64, 12345);
}
#[test]
fn test_write_decimal_rounding() {
let bd = BigDecimal::new(BigInt::from(12345), 3);
let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();
let mut w = CompactedRowWriter::new(1);
w.write_decimal(&decimal, 10);
let (val, _) = crate::util::varint::read_unsigned_varint_u64_at(
w.buffer(),
w.header_size_in_bytes,
CompactedRowWriter::MAX_LONG_SIZE,
)
.unwrap();
assert_eq!(val as i64, 1235); }
#[test]
fn test_write_decimal_non_compact() {
let bd = BigDecimal::new(BigInt::from(12345), 0);
let decimal = Decimal::from_big_decimal(bd, 28, 0).unwrap();
let mut w = CompactedRowWriter::new(1);
w.write_decimal(&decimal, 28);
assert!(w.position() > w.header_size_in_bytes);
}
}