use super::{
Column,
ColumnRef,
};
use crate::{
types::Type,
Error,
Result,
};
use bytes::{
Buf,
BufMut,
BytesMut,
};
use std::{
collections::HashMap,
sync::Arc,
};
use super::column_value::{
append_column_item,
compute_hash_key,
get_column_item,
ColumnValue,
};
/// Column for ClickHouse `LowCardinality(T)` values.
///
/// Distinct values are stored once in `dictionary`; every row stores only the
/// position of its value in that dictionary.
pub struct ColumnLowCardinality {
    /// The full `LowCardinality(...)` type this column was created with.
    type_: Type,
    /// Column of distinct values, created for the nested type `T`.
    dictionary: ColumnRef,
    /// One dictionary position per row.
    indices: Vec<u64>,
    /// Maps the `compute_hash_key` of a dictionary value to its position,
    /// used to deduplicate appends.
    unique_map: HashMap<(u64, u64), u64>,
}
impl ColumnLowCardinality {
    /// Creates an empty column for a `LowCardinality(T)` type.
    ///
    /// # Panics
    ///
    /// Panics if `type_` is not `Type::LowCardinality`, or if no column can be
    /// created for the nested type `T`.
    pub fn new(type_: Type) -> Self {
        let nested = if let Type::LowCardinality { nested_type } = &type_ {
            (**nested_type).clone()
        } else {
            panic!("ColumnLowCardinality requires LowCardinality type")
        };
        let dictionary = crate::io::block_stream::create_column(&nested)
            .expect("Failed to create dictionary column");
        Self {
            type_,
            dictionary,
            indices: Vec::new(),
            unique_map: HashMap::new(),
        }
    }

    /// Borrows the dictionary as its concrete column type `T`.
    ///
    /// # Panics
    ///
    /// Panics if the dictionary is not a `T`.
    pub fn dictionary<T: Column + 'static>(&self) -> &T {
        let any = self.dictionary.as_any();
        any.downcast_ref::<T>()
            .expect("Failed to downcast dictionary column to requested type")
    }

    /// Mutably borrows the dictionary as its concrete column type `T`.
    ///
    /// # Panics
    ///
    /// Panics if the dictionary `Arc` is shared (e.g. a handle obtained from
    /// [`Self::dictionary_ref`] is still alive) or if it is not a `T`.
    pub fn dictionary_mut<T: Column + 'static>(&mut self) -> &mut T {
        let column = Arc::get_mut(&mut self.dictionary)
            .expect("Cannot get mutable reference to shared dictionary column");
        column
            .as_any_mut()
            .downcast_mut::<T>()
            .expect("Failed to downcast dictionary column to requested type")
    }

    /// Returns a new shared handle to the dictionary.
    ///
    /// While the handle is alive the dictionary is shared, so appending a value
    /// that is not yet in the dictionary fails.
    pub fn dictionary_ref(&self) -> ColumnRef {
        Arc::clone(&self.dictionary)
    }

    /// Number of entries in the dictionary (not the number of rows).
    pub fn dictionary_size(&self) -> usize {
        self.dictionary.size()
    }

    /// Dictionary position stored for row `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index >= self.len()`.
    pub fn index_at(&self, index: usize) -> u64 {
        self.indices[index]
    }

    /// Number of rows.
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// `true` when the column has no rows.
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }

    /// Appends one row, reusing an existing dictionary entry when one has the
    /// same `compute_hash_key`. Otherwise the value is appended to the
    /// dictionary first. (Despite the name, no `unsafe` code is involved.)
    ///
    /// # Errors
    ///
    /// Returns `Error::Protocol` if a new dictionary entry is needed while the
    /// dictionary is shared. Propagates errors from `append_column_item`.
    pub fn append_unsafe(&mut self, value: &ColumnValue) -> Result<()> {
        let key = compute_hash_key(value);
        let known = self.unique_map.get(&key).copied();
        let position = match known {
            Some(position) => position,
            None => {
                let next = self.dictionary.size() as u64;
                let dictionary =
                    Arc::get_mut(&mut self.dictionary).ok_or_else(|| {
                        Error::Protocol(
                            "Cannot append to shared dictionary - column has multiple references"
                                .to_string(),
                        )
                    })?;
                append_column_item(dictionary, value)?;
                self.unique_map.insert(key, next);
                next
            }
        };
        self.indices.push(position);
        Ok(())
    }

    /// Appends every value in order, stopping at the first error.
    ///
    /// # Errors
    ///
    /// Same as [`Self::append_unsafe`]. Rows appended before the failure are
    /// kept.
    pub fn append_values<I>(&mut self, values: I) -> Result<()>
    where
        I: IntoIterator<Item = ColumnValue>,
    {
        values
            .into_iter()
            .try_for_each(|value| self.append_unsafe(&value))
    }
}
impl Column for ColumnLowCardinality {
fn column_type(&self) -> &Type {
&self.type_
}
fn size(&self) -> usize {
self.indices.len()
}
fn clear(&mut self) {
self.indices.clear();
self.unique_map.clear();
}
fn reserve(&mut self, new_cap: usize) {
let estimated_dict_size = (new_cap as f64).sqrt().ceil() as usize;
if let Some(dict_mut) = Arc::get_mut(&mut self.dictionary) {
dict_mut.reserve(estimated_dict_size);
}
self.indices.reserve(new_cap + 2);
}
fn append_column(&mut self, other: ColumnRef) -> Result<()> {
let other = other
.as_any()
.downcast_ref::<ColumnLowCardinality>()
.ok_or_else(|| Error::TypeMismatch {
expected: self.type_.name(),
actual: other.column_type().name(),
})?;
if self.dictionary.column_type().name()
!= other.dictionary.column_type().name()
{
return Err(Error::TypeMismatch {
expected: self.dictionary.column_type().name(),
actual: other.dictionary.column_type().name(),
});
}
for &other_index in &other.indices {
let value = get_column_item(
other.dictionary.as_ref(),
other_index as usize,
)?;
self.append_unsafe(&value)?;
}
Ok(())
}
fn load_prefix(&mut self, buffer: &mut &[u8], _rows: usize) -> Result<()> {
if buffer.len() < 8 {
return Err(Error::Protocol(
"Not enough data for LowCardinality key version".to_string(),
));
}
let key_version = buffer.get_u64_le();
const SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS: u64 = 1;
if key_version != SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS {
return Err(Error::Protocol(format!(
"Invalid LowCardinality key version: expected {}, got {}",
SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS, key_version
)));
}
Ok(())
}
fn load_from_buffer(
&mut self,
buffer: &mut &[u8],
rows: usize,
) -> Result<()> {
if buffer.len() < 8 {
return Err(Error::Protocol(
"Not enough data for LowCardinality index serialization type"
.to_string(),
));
}
let index_serialization_type = buffer.get_u64_le();
const INDEX_TYPE_MASK: u64 = 0xFF;
const NEED_GLOBAL_DICTIONARY_BIT: u64 = 1 << 8;
const HAS_ADDITIONAL_KEYS_BIT: u64 = 1 << 9;
let index_type = index_serialization_type & INDEX_TYPE_MASK;
if (index_serialization_type & NEED_GLOBAL_DICTIONARY_BIT) != 0 {
return Err(Error::Protocol(
"Global dictionary is not supported".to_string(),
));
}
if (index_serialization_type & HAS_ADDITIONAL_KEYS_BIT) == 0 {
}
if buffer.len() < 8 {
return Err(Error::Protocol(
"Not enough data for dictionary size".to_string(),
));
}
let number_of_keys = buffer.get_u64_le() as usize;
if number_of_keys > 0 {
let dict_mut = Arc::get_mut(&mut self.dictionary).ok_or_else(|| {
Error::Protocol(
"Cannot load into shared dictionary - column has multiple references"
.to_string(),
)
})?;
use super::nullable::ColumnNullable;
if let Some(nullable_col) =
dict_mut.as_any_mut().downcast_mut::<ColumnNullable>()
{
let nested_ref = nullable_col.nested_ref_mut();
let nested_mut = Arc::get_mut(nested_ref)
.ok_or_else(|| {
Error::Protocol(
"Cannot load into shared nested column - column has multiple references"
.to_string(),
)
})?;
nested_mut.load_from_buffer(buffer, number_of_keys)?;
for _ in 0..number_of_keys {
nullable_col.append_non_null();
}
} else {
dict_mut.load_from_buffer(buffer, number_of_keys)?;
}
}
let _number_of_rows = if buffer.len() >= 8 {
let val = buffer.get_u64_le() as usize;
if val != rows {
return Err(Error::Protocol(format!(
"LowCardinality row count mismatch: expected {}, got {}",
rows, val
)));
}
val
} else {
rows
};
self.indices.reserve(rows);
match index_type {
0 => {
for _ in 0..rows {
if buffer.is_empty() {
return Err(Error::Protocol(
"Not enough data for LowCardinality index"
.to_string(),
));
}
let index = buffer.get_u8() as u64;
self.indices.push(index);
}
}
1 => {
for _ in 0..rows {
if buffer.len() < 2 {
return Err(Error::Protocol(
"Not enough data for LowCardinality index"
.to_string(),
));
}
let index = buffer.get_u16_le() as u64;
self.indices.push(index);
}
}
2 => {
for _ in 0..rows {
if buffer.len() < 4 {
return Err(Error::Protocol(
"Not enough data for LowCardinality index"
.to_string(),
));
}
let index = buffer.get_u32_le() as u64;
self.indices.push(index);
}
}
3 => {
for _ in 0..rows {
if buffer.len() < 8 {
return Err(Error::Protocol(
"Not enough data for LowCardinality index"
.to_string(),
));
}
let index = buffer.get_u64_le();
self.indices.push(index);
}
}
_ => {
return Err(Error::Protocol(format!(
"Unknown LowCardinality index type: {}",
index_type
)));
}
}
self.unique_map.clear();
for i in 0..self.dictionary.size() {
let value = get_column_item(self.dictionary.as_ref(), i)?;
let hash_key = compute_hash_key(&value);
self.unique_map.insert(hash_key, i as u64);
}
Ok(())
}
fn save_prefix(&self, buffer: &mut BytesMut) -> Result<()> {
const SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS: u64 = 1;
buffer.put_u64_le(SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS);
Ok(())
}
fn save_to_buffer(&self, buffer: &mut BytesMut) -> Result<()> {
const HAS_ADDITIONAL_KEYS_BIT: u64 = 1 << 9;
const INDEX_TYPE_UINT64: u64 = 3;
let index_serialization_type =
INDEX_TYPE_UINT64 | HAS_ADDITIONAL_KEYS_BIT;
buffer.put_u64_le(index_serialization_type);
buffer.put_u64_le(self.dictionary.size() as u64);
use super::nullable::ColumnNullable;
if let Some(nullable_col) =
self.dictionary.as_any().downcast_ref::<ColumnNullable>()
{
nullable_col.nested_ref().save_to_buffer(buffer)?;
} else {
self.dictionary.save_to_buffer(buffer)?;
}
buffer.put_u64_le(self.indices.len() as u64);
for &index in &self.indices {
buffer.put_u64_le(index);
}
Ok(())
}
fn clone_empty(&self) -> ColumnRef {
Arc::new(ColumnLowCardinality::new(self.type_.clone()))
}
fn slice(&self, begin: usize, len: usize) -> Result<ColumnRef> {
if begin + len > self.indices.len() {
return Err(Error::InvalidArgument(format!(
"Slice out of bounds: begin={}, len={}, size={}",
begin,
len,
self.indices.len()
)));
}
let mut sliced = ColumnLowCardinality::new(self.type_.clone());
for i in begin..begin + len {
let dict_index = self.indices[i] as usize;
let value = get_column_item(self.dictionary.as_ref(), dict_index)?;
sliced.append_unsafe(&value)?;
}
Ok(Arc::new(sliced))
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::column::column_value::ColumnValue;
    use crate::types::TypeCode;
    use bytes::{Buf, BytesMut};

    /// `LowCardinality(<code>)` over a simple nested type.
    fn lc_of(code: TypeCode) -> Type {
        Type::LowCardinality {
            nested_type: Box::new(Type::Simple(code)),
        }
    }

    /// Appends each string as one row, panicking on failure.
    fn push_strs(col: &mut ColumnLowCardinality, values: &[&str]) {
        for &value in values {
            col.append_unsafe(&ColumnValue::from_string(value)).unwrap();
        }
    }

    /// Downcasts a sliced column back to `ColumnLowCardinality`.
    fn as_lc(column: &ColumnRef) -> &ColumnLowCardinality {
        column
            .as_any()
            .downcast_ref::<ColumnLowCardinality>()
            .unwrap()
    }

    #[test]
    fn test_lowcardinality_creation() {
        let col = ColumnLowCardinality::new(lc_of(TypeCode::String));
        assert_eq!(col.len(), 0);
        assert!(col.is_empty());
        assert_eq!(col.dictionary_size(), 0);
    }

    #[test]
    fn test_lowcardinality_empty() {
        let col = ColumnLowCardinality::new(lc_of(TypeCode::UInt32));
        assert_eq!(col.dictionary_size(), 0);
        assert_eq!(col.size(), 0);
    }

    #[test]
    fn test_lowcardinality_slice() {
        let mut col = ColumnLowCardinality::new(lc_of(TypeCode::String));
        push_strs(&mut col, &["a", "b", "c", "b", "a"]);
        assert_eq!(col.len(), 5);
        assert_eq!(col.dictionary_size(), 3);

        let sliced = col.slice(1, 2).unwrap();
        let sliced_col = as_lc(&sliced);
        assert_eq!(sliced_col.len(), 2);
        assert_eq!(
            sliced_col.dictionary_size(),
            2,
            "Dictionary should be compacted"
        );
        for (row, expected) in ["b", "c"].iter().enumerate() {
            let value = get_column_item(
                sliced_col.dictionary.as_ref(),
                sliced_col.index_at(row) as usize,
            )
            .unwrap();
            assert_eq!(value.as_string().unwrap(), *expected);
        }
    }

    #[test]
    fn test_lowcardinality_slice_memory_efficiency() {
        let mut col = ColumnLowCardinality::new(lc_of(TypeCode::String));
        for i in 0..1000 {
            col.append_unsafe(&ColumnValue::from_string(&format!(
                "value_{}",
                i
            )))
            .unwrap();
        }
        assert_eq!(col.dictionary_size(), 1000);

        let sliced = col.slice(0, 10).unwrap();
        let sliced_col = as_lc(&sliced);
        assert_eq!(sliced_col.len(), 10);
        assert_eq!(
            sliced_col.dictionary_size(),
            10,
            "Dictionary should be compacted to only referenced items"
        );
    }

    #[test]
    fn test_lowcardinality_slice_with_duplicates() {
        let mut col = ColumnLowCardinality::new(lc_of(TypeCode::String));
        push_strs(&mut col, &["x", "y", "z", "x", "x", "z"]);
        assert_eq!(col.dictionary_size(), 3);

        let sliced = col.slice(3, 3).unwrap();
        let sliced_col = as_lc(&sliced);
        assert_eq!(sliced_col.len(), 3);
        assert_eq!(
            sliced_col.dictionary_size(),
            2,
            "Only 'x' and 'z' should be in dictionary"
        );
        assert_eq!(
            sliced_col.index_at(0),
            sliced_col.index_at(1),
            "Duplicate 'x' should use same index"
        );
    }

    #[test]
    fn test_lowcardinality_clear() {
        let mut col = ColumnLowCardinality::new(lc_of(TypeCode::String));
        col.indices = vec![0, 1, 2];
        col.clear();
        assert_eq!(col.len(), 0);
        assert!(col.is_empty());
    }

    #[test]
    fn test_lowcardinality_reserve() {
        let mut col = ColumnLowCardinality::new(lc_of(TypeCode::String));
        col.reserve(10_000);
        assert!(col.indices.capacity() >= 10_000);

        push_strs(&mut col, &["test"]);
        assert_eq!(col.len(), 1);
        assert_eq!(col.dictionary_size(), 1);
    }

    #[test]
    fn test_lowcardinality_reserve_performance() {
        let lc_type = lc_of(TypeCode::String);
        let mut col_with_reserve = ColumnLowCardinality::new(lc_type.clone());
        col_with_reserve.reserve(1000);
        let mut col_without_reserve = ColumnLowCardinality::new(lc_type);

        for i in 0..100 {
            let value = format!("value_{}", i % 10);
            col_with_reserve
                .append_unsafe(&ColumnValue::from_string(&value))
                .unwrap();
            col_without_reserve
                .append_unsafe(&ColumnValue::from_string(&value))
                .unwrap();
        }

        assert_eq!(col_with_reserve.len(), 100);
        assert_eq!(col_without_reserve.len(), 100);
        assert_eq!(col_with_reserve.dictionary_size(), 10);
        assert_eq!(col_without_reserve.dictionary_size(), 10);
        assert!(col_with_reserve.indices.capacity() >= 1000);
    }

    #[test]
    fn test_lowcardinality_save_load_roundtrip() {
        let lc_type = lc_of(TypeCode::String);
        let mut col = ColumnLowCardinality::new(lc_type.clone());
        push_strs(&mut col, &["hello", "world", "hello", "test", "world"]);
        assert_eq!(col.len(), 5);
        assert_eq!(col.dictionary_size(), 3);

        let mut buffer = BytesMut::new();
        col.save_prefix(&mut buffer).unwrap();
        col.save_to_buffer(&mut buffer).unwrap();

        // Inspect the header fields directly.
        let mut header = &buffer[..];
        assert_eq!(header.get_u64_le(), 1, "key_version should be 1");
        let serialization_type = header.get_u64_le();
        assert_eq!(
            serialization_type & 0xFF,
            3,
            "index_type should be 3 (UInt64)"
        );
        assert!(
            (serialization_type & (1 << 9)) != 0,
            "HasAdditionalKeysBit should be set"
        );
        assert_eq!(
            header.get_u64_le(),
            3,
            "dictionary should have 3 unique values"
        );

        // Load the whole payload into a fresh column.
        let mut loaded = ColumnLowCardinality::new(lc_type);
        let mut input = &buffer[..];
        loaded.load_prefix(&mut input, 5).unwrap();
        loaded.load_from_buffer(&mut input, 5).unwrap();
        assert_eq!(loaded.len(), 5);
        assert_eq!(loaded.dictionary_size(), 3);
        for row in 0..5 {
            assert_eq!(loaded.index_at(row), col.index_at(row));
        }
        assert_eq!(loaded.index_at(0), loaded.index_at(2));
        assert_eq!(loaded.index_at(1), loaded.index_at(4));
    }

    #[test]
    fn test_lowcardinality_nullable_save_format() {
        let lc_type = Type::LowCardinality {
            nested_type: Box::new(Type::Nullable {
                nested_type: Box::new(Type::Simple(TypeCode::String)),
            }),
        };
        let mut col = ColumnLowCardinality::new(lc_type);
        col.append_unsafe(&ColumnValue::from_string("hello")).unwrap();
        col.append_unsafe(&ColumnValue::void()).unwrap();
        col.append_unsafe(&ColumnValue::from_string("world")).unwrap();
        assert_eq!(col.len(), 3);

        let mut buffer = BytesMut::new();
        col.save_prefix(&mut buffer).unwrap();
        col.save_to_buffer(&mut buffer).unwrap();
        assert!(!buffer.is_empty(), "Buffer should contain data");

        let mut read_buf = &buffer[..];
        assert_eq!(read_buf.get_u64_le(), 1, "key_version should be 1");
    }
}