use super::{
Column,
ColumnRef,
ColumnTyped,
};
use crate::{
types::{
ToType,
Type,
},
Error,
Result,
};
use bytes::{
Buf,
BufMut,
BytesMut,
};
use std::sync::Arc;
pub trait FixedSize: Sized + Clone + Send + Sync + 'static {
fn read_from(buffer: &mut &[u8]) -> Result<Self>;
fn write_to(&self, buffer: &mut BytesMut);
}
macro_rules! impl_fixed_size {
($type:ty, $get:ident, $put:ident) => {
impl FixedSize for $type {
fn read_from(buffer: &mut &[u8]) -> Result<Self> {
if buffer.len() < std::mem::size_of::<$type>() {
return Err(Error::Protocol(
"Buffer underflow".to_string(),
));
}
Ok(buffer.$get())
}
fn write_to(&self, buffer: &mut BytesMut) {
buffer.$put(*self);
}
}
};
}
impl_fixed_size!(u8, get_u8, put_u8);
impl_fixed_size!(u16, get_u16_le, put_u16_le);
impl_fixed_size!(u32, get_u32_le, put_u32_le);
impl_fixed_size!(u64, get_u64_le, put_u64_le);
impl_fixed_size!(i8, get_i8, put_i8);
impl_fixed_size!(i16, get_i16_le, put_i16_le);
impl_fixed_size!(i32, get_i32_le, put_i32_le);
impl_fixed_size!(i64, get_i64_le, put_i64_le);
impl_fixed_size!(f32, get_f32_le, put_f32_le);
impl_fixed_size!(f64, get_f64_le, put_f64_le);
impl FixedSize for i128 {
fn read_from(buffer: &mut &[u8]) -> Result<Self> {
if buffer.len() < 16 {
return Err(Error::Protocol("Buffer underflow".to_string()));
}
Ok(buffer.get_i128_le())
}
fn write_to(&self, buffer: &mut BytesMut) {
buffer.put_i128_le(*self);
}
}
impl FixedSize for u128 {
fn read_from(buffer: &mut &[u8]) -> Result<Self> {
if buffer.len() < 16 {
return Err(Error::Protocol("Buffer underflow".to_string()));
}
Ok(buffer.get_u128_le())
}
fn write_to(&self, buffer: &mut BytesMut) {
buffer.put_u128_le(*self);
}
}
/// A column of fixed-width values stored contiguously, one element per row.
///
/// `new`/`with_capacity` take the column type from `T::to_type()`, while
/// `from_vec` accepts an explicit type.
pub struct ColumnVector<T>
where
    T: FixedSize,
{
    /// Column type reported to callers.
    type_: Type,
    /// Row values, in row order.
    data: Vec<T>,
}
impl<T: FixedSize + Clone + Send + Sync + 'static> ColumnVector<T> {
pub fn from_vec(type_: Type, data: Vec<T>) -> Self {
Self { type_, data }
}
pub fn with_data(mut self, data: Vec<T>) -> Self {
self.data = data;
self
}
pub fn reserve(&mut self, additional: usize) {
self.data.reserve(additional);
}
pub fn clear(&mut self) {
self.data.clear();
}
pub fn get(&self, index: usize) -> Option<&T> {
self.data.get(index)
}
pub fn at(&self, index: usize) -> T {
self.data[index].clone()
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn append(&mut self, value: T) {
self.data.push(value);
}
pub fn iter(&self) -> impl Iterator<Item = &T> {
self.data.iter()
}
pub fn data(&self) -> &[T] {
&self.data
}
pub fn data_mut(&mut self) -> &mut Vec<T> {
&mut self.data
}
}
impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> ColumnVector<T> {
pub fn new() -> Self {
Self { type_: T::to_type(), data: Vec::new() }
}
pub fn with_capacity(capacity: usize) -> Self {
Self { type_: T::to_type(), data: Vec::with_capacity(capacity) }
}
}
impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> Default
for ColumnVector<T>
{
fn default() -> Self {
Self::new()
}
}
impl<T: FixedSize + ToType> Column for ColumnVector<T> {
fn column_type(&self) -> &Type {
&self.type_
}
fn size(&self) -> usize {
self.data.len()
}
fn clear(&mut self) {
self.data.clear()
}
fn reserve(&mut self, new_cap: usize) {
self.data.reserve(new_cap);
}
fn append_column(&mut self, other: ColumnRef) -> Result<()> {
let other = other
.as_any()
.downcast_ref::<ColumnVector<T>>()
.ok_or_else(|| Error::TypeMismatch {
expected: self.type_.name(),
actual: other.column_type().name(),
})?;
self.data.extend_from_slice(&other.data);
Ok(())
}
fn load_from_buffer(
&mut self,
buffer: &mut &[u8],
rows: usize,
) -> Result<()> {
let bytes_needed = rows * std::mem::size_of::<T>();
if buffer.len() < bytes_needed {
return Err(Error::Protocol(format!(
"Buffer underflow: need {} bytes, have {}",
bytes_needed,
buffer.len()
)));
}
self.data.clear();
self.data.reserve(rows);
unsafe {
let dest_ptr = self.data.as_mut_ptr() as *mut u8;
std::ptr::copy_nonoverlapping(
buffer.as_ptr(),
dest_ptr,
bytes_needed,
);
self.data.set_len(rows);
}
*buffer = &buffer[bytes_needed..];
Ok(())
}
fn save_to_buffer(&self, buffer: &mut BytesMut) -> Result<()> {
if !self.data.is_empty() {
let byte_slice = unsafe {
std::slice::from_raw_parts(
self.data.as_ptr() as *const u8,
self.data.len() * std::mem::size_of::<T>(),
)
};
buffer.extend_from_slice(byte_slice);
}
Ok(())
}
fn clone_empty(&self) -> ColumnRef {
Arc::new(ColumnVector::<T>::new())
}
fn slice(&self, begin: usize, len: usize) -> Result<ColumnRef> {
if begin + len > self.data.len() {
return Err(Error::InvalidArgument(format!(
"Slice range out of bounds: begin={}, len={}, size={}",
begin,
len,
self.data.len()
)));
}
let sliced_data = self.data[begin..begin + len].to_vec();
Ok(Arc::new(ColumnVector::<T>::from_vec(
self.type_.clone(),
sliced_data,
)))
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}
impl<T> ColumnTyped<T> for ColumnVector<T>
where
    T: FixedSize + ToType + Clone + Send + Sync + 'static,
{
    /// Returns an owned copy of the value at `index`, or `None` when out of
    /// range.
    fn get(&self, index: usize) -> Option<T> {
        let slot = self.data.get(index)?;
        Some(slot.clone())
    }

    /// Appends `value` as a new row.
    fn append(&mut self, value: T) {
        Vec::push(&mut self.data, value);
    }
}
/// Column of `u8` values.
pub type ColumnUInt8 = ColumnVector<u8>;
/// Column of `u16` values.
pub type ColumnUInt16 = ColumnVector<u16>;
/// Column of `u32` values.
pub type ColumnUInt32 = ColumnVector<u32>;
/// Column of `u64` values.
pub type ColumnUInt64 = ColumnVector<u64>;
/// Column of `u128` values.
pub type ColumnUInt128 = ColumnVector<u128>;
/// Column of `i8` values.
pub type ColumnInt8 = ColumnVector<i8>;
/// Column of `i16` values.
pub type ColumnInt16 = ColumnVector<i16>;
/// Column of `i32` values.
pub type ColumnInt32 = ColumnVector<i32>;
/// Column of `i64` values.
pub type ColumnInt64 = ColumnVector<i64>;
/// Column of `i128` values.
pub type ColumnInt128 = ColumnVector<i128>;
/// Column of `f32` values.
pub type ColumnFloat32 = ColumnVector<f32>;
/// Column of `f64` values.
pub type ColumnFloat64 = ColumnVector<f64>;
/// Column of raw `u16` values, presumably day numbers for a Date column (the
/// name is the only hint; the element type is plain `u16`).
///
/// NOTE(review): this is the *same type* as `ColumnUInt16`. Therefore
/// `ColumnDate::new()` reports `u16::to_type()` (presumably `UInt16`, not
/// `Date`; confirm). A downcast to `ColumnUInt16` also succeeds for a Date
/// column. Build with `ColumnVector::from_vec` and the Date type when the
/// reported column type matters.
pub type ColumnDate = ColumnVector<u16>;
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    #[test]
    fn test_column_creation() {
        let col = ColumnUInt32::new();
        assert_eq!(col.size(), 0);
        assert_eq!(col.column_type().name(), "UInt32");
        let col2 = ColumnUInt32::with_capacity(100);
        assert_eq!(col2.size(), 0);
        assert_eq!(col2.column_type().name(), "UInt32");
    }

    #[test]
    fn test_column_append() {
        let mut col = ColumnUInt32::new();
        col.append(42);
        col.append(100);
        assert_eq!(col.size(), 2);
        assert_eq!(col.get(0), Some(&42));
        assert_eq!(col.get(1), Some(&100));
    }

    #[test]
    fn test_column_clear() {
        let mut col = ColumnInt64::new();
        col.append(-123);
        col.append(456);
        assert_eq!(col.size(), 2);
        col.clear();
        assert_eq!(col.size(), 0);
    }

    #[test]
    fn test_column_slice() {
        let mut col = ColumnUInt64::new();
        for i in 0..10 {
            col.append(i);
        }
        let sliced = col.slice(2, 5).unwrap();
        assert_eq!(sliced.size(), 5);
        let sliced_concrete =
            sliced.as_any().downcast_ref::<ColumnUInt64>().unwrap();
        assert_eq!(sliced_concrete.get(0), Some(&2));
        assert_eq!(sliced_concrete.get(4), Some(&6));
    }

    #[test]
    fn test_column_slice_out_of_range() {
        let mut col = ColumnInt16::new();
        for value in 0..4i16 {
            col.append(value);
        }
        assert!(matches!(col.slice(2, 3), Err(Error::InvalidArgument(_))));
        assert!(col.slice(4, 0).is_ok());
    }

    #[test]
    fn test_column_clone_empty() {
        let mut col = ColumnUInt32::new();
        col.append(5);
        let empty = col.clone_empty();
        assert_eq!(empty.size(), 0);
        assert_eq!(empty.column_type().name(), "UInt32");
    }

    #[test]
    fn test_column_save_load() {
        let mut col = ColumnInt32::new();
        col.append(1);
        col.append(-2);
        col.append(3);
        let mut buf = BytesMut::new();
        col.save_to_buffer(&mut buf).unwrap();
        let mut col2 = ColumnInt32::new();
        let mut reader = &buf[..];
        col2.load_from_buffer(&mut reader, 3).unwrap();
        assert_eq!(col2.size(), 3);
        assert_eq!(col2.get(0), Some(&1));
        assert_eq!(col2.get(1), Some(&-2));
        assert_eq!(col2.get(2), Some(&3));
    }

    #[test]
    fn test_save_uses_little_endian() {
        let mut col = ColumnUInt32::new();
        col.append(0x0102_0304);
        let mut buf = BytesMut::new();
        col.save_to_buffer(&mut buf).unwrap();
        assert_eq!(&buf[..], &[0x04u8, 0x03, 0x02, 0x01][..]);
    }

    #[test]
    fn test_load_underflow_leaves_state_untouched() {
        let mut col = ColumnUInt32::new();
        col.append(7);
        let mut buf = BytesMut::new();
        buf.put_u32_le(1);
        buf.put_u8(0);
        let mut reader = &buf[..];
        // Two rows need 8 bytes; only 5 are available.
        let result = col.load_from_buffer(&mut reader, 2);
        assert!(matches!(result, Err(Error::Protocol(_))));
        assert_eq!(col.size(), 1);
        assert_eq!(col.get(0), Some(&7));
        assert_eq!(reader.len(), 5);
    }

    #[test]
    fn test_column_append_column() {
        let mut col1 = ColumnFloat64::new();
        col1.append(1.5);
        col1.append(2.5);
        let mut col2 = ColumnFloat64::new();
        col2.append(3.5);
        col2.append(4.5);
        col1.append_column(Arc::new(col2)).unwrap();
        assert_eq!(col1.size(), 4);
        assert_eq!(col1.get(0), Some(&1.5));
        assert_eq!(col1.get(3), Some(&4.5));
    }

    #[test]
    fn test_bulk_load_large_dataset() {
        let mut col = ColumnUInt64::new();
        let data: Vec<u64> = (0..10_000).collect();
        let mut buf = BytesMut::new();
        for &val in &data {
            buf.put_u64_le(val);
        }
        let mut reader = &buf[..];
        col.load_from_buffer(&mut reader, 10_000).unwrap();
        assert_eq!(col.size(), 10_000);
        assert_eq!(col.get(0), Some(&0));
        assert_eq!(col.get(5_000), Some(&5_000));
        assert_eq!(col.get(9_999), Some(&9_999));
    }

    #[test]
    fn test_bulk_load_multiple_sequential() {
        let mut col = ColumnInt32::new();
        let mut buf1 = BytesMut::new();
        for i in 0..1_000 {
            buf1.put_i32_le(i);
        }
        let mut reader1 = &buf1[..];
        col.load_from_buffer(&mut reader1, 1_000).unwrap();
        assert_eq!(col.size(), 1_000);
        assert_eq!(col.get(0), Some(&0));
        assert_eq!(col.get(999), Some(&999));
        let mut buf2 = BytesMut::new();
        for i in 1_000..2_000 {
            buf2.put_i32_le(i);
        }
        let mut reader2 = &buf2[..];
        col.load_from_buffer(&mut reader2, 1_000).unwrap();
        // A second load replaces, rather than appends to, the contents.
        assert_eq!(col.size(), 1_000);
        assert_eq!(col.get(0), Some(&1_000));
        assert_eq!(col.get(999), Some(&1_999));
    }

    #[test]
    fn test_bulk_load_empty() {
        let mut col = ColumnUInt32::new();
        let buf = BytesMut::new();
        let mut reader = &buf[..];
        col.load_from_buffer(&mut reader, 0).unwrap();
        assert_eq!(col.size(), 0);
    }

    #[test]
    fn test_bulk_load_single_element() {
        let mut col = ColumnInt64::new();
        let mut buf = BytesMut::new();
        buf.put_i64_le(42);
        let mut reader = &buf[..];
        col.load_from_buffer(&mut reader, 1).unwrap();
        assert_eq!(col.size(), 1);
        assert_eq!(col.get(0), Some(&42));
    }

    #[test]
    fn test_bulk_load_roundtrip_large() {
        let mut col1 = ColumnFloat32::new();
        for i in 0..5_000 {
            col1.append(i as f32 * 1.5);
        }
        let mut buf = BytesMut::new();
        col1.save_to_buffer(&mut buf).unwrap();
        let mut col2 = ColumnFloat32::new();
        let mut reader = &buf[..];
        col2.load_from_buffer(&mut reader, 5_000).unwrap();
        assert_eq!(col2.size(), 5_000);
        for i in 0..5_000 {
            assert_eq!(col2.get(i), Some(&(i as f32 * 1.5)));
        }
    }

    #[test]
    fn test_bulk_load_all_numeric_types() {
        let mut col_u8 = ColumnUInt8::new();
        let mut buf = BytesMut::new();
        for i in 0..255u8 {
            buf.put_u8(i);
        }
        let mut reader = &buf[..];
        col_u8.load_from_buffer(&mut reader, 255).unwrap();
        assert_eq!(col_u8.size(), 255);

        let mut col_u16 = ColumnUInt16::new();
        let mut buf = BytesMut::new();
        for i in 0..1000u16 {
            buf.put_u16_le(i);
        }
        let mut reader = &buf[..];
        col_u16.load_from_buffer(&mut reader, 1000).unwrap();
        assert_eq!(col_u16.size(), 1000);

        let mut col_i8 = ColumnInt8::new();
        let mut buf = BytesMut::new();
        for i in -127..127i8 {
            buf.put_i8(i);
        }
        let mut reader = &buf[..];
        col_i8.load_from_buffer(&mut reader, 254).unwrap();
        assert_eq!(col_i8.size(), 254);

        let mut col_i16 = ColumnInt16::new();
        let mut buf = BytesMut::new();
        for i in 0..1000i16 {
            buf.put_i16_le(i);
        }
        let mut reader = &buf[..];
        col_i16.load_from_buffer(&mut reader, 1000).unwrap();
        assert_eq!(col_i16.size(), 1000);

        let mut col_i128 = ColumnInt128::new();
        let mut buf = BytesMut::new();
        for i in 0..100i128 {
            buf.put_i128_le(i);
        }
        let mut reader = &buf[..];
        col_i128.load_from_buffer(&mut reader, 100).unwrap();
        assert_eq!(col_i128.size(), 100);

        let mut col_u128 = ColumnUInt128::new();
        let mut buf = BytesMut::new();
        for i in 0..100u128 {
            buf.put_u128_le(i);
        }
        let mut reader = &buf[..];
        col_u128.load_from_buffer(&mut reader, 100).unwrap();
        assert_eq!(col_u128.size(), 100);
    }

    #[test]
    fn test_bulk_load_memory_safety() {
        let mut col = ColumnInt32::new();
        let mut buf = BytesMut::new();
        let test_values: Vec<i32> =
            vec![i32::MIN, -1_000_000, -1, 0, 1, 1_000_000, i32::MAX];
        for &val in &test_values {
            buf.put_i32_le(val);
        }
        let mut reader = &buf[..];
        col.load_from_buffer(&mut reader, test_values.len()).unwrap();
        assert_eq!(col.size(), test_values.len());
        for (i, &expected) in test_values.iter().enumerate() {
            assert_eq!(
                col.get(i),
                Some(&expected),
                "Value mismatch at index {}",
                i
            );
        }
    }
}