clickhouse_native_client/column/
lowcardinality.rs1use super::{
43 Column,
44 ColumnRef,
45};
46use crate::{
47 types::Type,
48 Error,
49 Result,
50};
51use bytes::{
52 Buf,
53 BufMut,
54 BytesMut,
55};
56use std::{
57 collections::HashMap,
58 sync::Arc,
59};
60
61use super::column_value::{
62 append_column_item,
63 compute_hash_key,
64 get_column_item,
65 ColumnValue,
66};
67
68pub struct ColumnLowCardinality {
76 type_: Type,
77 dictionary: ColumnRef, indices: Vec<u64>, unique_map: HashMap<(u64, u64), u64>, }
82
83impl ColumnLowCardinality {
84 pub fn new(type_: Type) -> Self {
86 let dictionary_type = match &type_ {
88 Type::LowCardinality { nested_type } => {
89 nested_type.as_ref().clone()
90 }
91 _ => panic!("ColumnLowCardinality requires LowCardinality type"),
92 };
93
94 let dictionary =
96 crate::io::block_stream::create_column(&dictionary_type)
97 .expect("Failed to create dictionary column");
98
99 Self {
100 type_,
101 dictionary,
102 indices: Vec::new(),
103 unique_map: HashMap::new(),
104 }
105 }
106
107 pub fn dictionary<T: Column + 'static>(&self) -> &T {
115 self.dictionary
116 .as_any()
117 .downcast_ref::<T>()
118 .expect("Failed to downcast dictionary column to requested type")
119 }
120
121 pub fn dictionary_mut<T: Column + 'static>(&mut self) -> &mut T {
129 Arc::get_mut(&mut self.dictionary)
130 .expect("Cannot get mutable reference to shared dictionary column")
131 .as_any_mut()
132 .downcast_mut::<T>()
133 .expect("Failed to downcast dictionary column to requested type")
134 }
135
136 pub fn dictionary_ref(&self) -> ColumnRef {
138 self.dictionary.clone()
139 }
140
141 pub fn dictionary_size(&self) -> usize {
143 self.dictionary.size()
144 }
145
146 pub fn index_at(&self, index: usize) -> u64 {
148 self.indices[index]
149 }
150
151 pub fn len(&self) -> usize {
153 self.indices.len()
154 }
155
156 pub fn is_empty(&self) -> bool {
158 self.indices.is_empty()
159 }
160
161 pub fn append_unsafe(&mut self, value: &ColumnValue) -> Result<()> {
164 let hash_key = compute_hash_key(value);
165 let current_dict_size = self.dictionary.size() as u64;
166
167 let index = if let Some(&existing_idx) = self.unique_map.get(&hash_key)
169 {
170 existing_idx
172 } else {
173 let dict_mut = Arc::get_mut(&mut self.dictionary).ok_or_else(|| {
175 Error::Protocol(
176 "Cannot append to shared dictionary - column has multiple references"
177 .to_string(),
178 )
179 })?;
180
181 append_column_item(dict_mut, value)?;
183
184 self.unique_map.insert(hash_key, current_dict_size);
186
187 current_dict_size
188 };
189
190 self.indices.push(index);
192
193 Ok(())
194 }
195
196 pub fn append_values<I>(&mut self, values: I) -> Result<()>
198 where
199 I: IntoIterator<Item = ColumnValue>,
200 {
201 for value in values {
202 self.append_unsafe(&value)?;
203 }
204 Ok(())
205 }
206}
207
208impl Column for ColumnLowCardinality {
209 fn column_type(&self) -> &Type {
210 &self.type_
211 }
212
213 fn size(&self) -> usize {
214 self.indices.len()
215 }
216
217 fn clear(&mut self) {
218 self.indices.clear();
219 self.unique_map.clear();
220 }
223
224 fn reserve(&mut self, new_cap: usize) {
225 let estimated_dict_size = (new_cap as f64).sqrt().ceil() as usize;
229
230 if let Some(dict_mut) = Arc::get_mut(&mut self.dictionary) {
232 dict_mut.reserve(estimated_dict_size);
233 }
234
235 self.indices.reserve(new_cap + 2);
237 }
238
239 fn append_column(&mut self, other: ColumnRef) -> Result<()> {
240 let other = other
241 .as_any()
242 .downcast_ref::<ColumnLowCardinality>()
243 .ok_or_else(|| Error::TypeMismatch {
244 expected: self.type_.name(),
245 actual: other.column_type().name(),
246 })?;
247
248 if self.dictionary.column_type().name()
250 != other.dictionary.column_type().name()
251 {
252 return Err(Error::TypeMismatch {
253 expected: self.dictionary.column_type().name(),
254 actual: other.dictionary.column_type().name(),
255 });
256 }
257
258 for &other_index in &other.indices {
271 let value = get_column_item(
273 other.dictionary.as_ref(),
274 other_index as usize,
275 )?;
276
277 self.append_unsafe(&value)?;
279 }
280
281 Ok(())
282 }
283
284 fn load_prefix(&mut self, buffer: &mut &[u8], _rows: usize) -> Result<()> {
285 if buffer.len() < 8 {
288 return Err(Error::Protocol(
289 "Not enough data for LowCardinality key version".to_string(),
290 ));
291 }
292
293 let key_version = buffer.get_u64_le();
294 const SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS: u64 = 1;
295
296 if key_version != SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS {
297 return Err(Error::Protocol(format!(
298 "Invalid LowCardinality key version: expected {}, got {}",
299 SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS, key_version
300 )));
301 }
302
303 Ok(())
304 }
305
306 fn load_from_buffer(
307 &mut self,
308 buffer: &mut &[u8],
309 rows: usize,
310 ) -> Result<()> {
311 if buffer.len() < 8 {
325 return Err(Error::Protocol(
326 "Not enough data for LowCardinality index serialization type"
327 .to_string(),
328 ));
329 }
330
331 let index_serialization_type = buffer.get_u64_le();
332
333 const INDEX_TYPE_MASK: u64 = 0xFF;
334 const NEED_GLOBAL_DICTIONARY_BIT: u64 = 1 << 8;
335 const HAS_ADDITIONAL_KEYS_BIT: u64 = 1 << 9;
336
337 let index_type = index_serialization_type & INDEX_TYPE_MASK;
338
339 if (index_serialization_type & NEED_GLOBAL_DICTIONARY_BIT) != 0 {
341 return Err(Error::Protocol(
342 "Global dictionary is not supported".to_string(),
343 ));
344 }
345
346 if (index_serialization_type & HAS_ADDITIONAL_KEYS_BIT) == 0 {
347 }
349
350 if buffer.len() < 8 {
352 return Err(Error::Protocol(
353 "Not enough data for dictionary size".to_string(),
354 ));
355 }
356 let number_of_keys = buffer.get_u64_le() as usize;
357
358 if number_of_keys > 0 {
364 let dict_mut = Arc::get_mut(&mut self.dictionary).ok_or_else(|| {
365 Error::Protocol(
366 "Cannot load into shared dictionary - column has multiple references"
367 .to_string(),
368 )
369 })?;
370
371 use super::nullable::ColumnNullable;
373 if let Some(nullable_col) =
374 dict_mut.as_any_mut().downcast_mut::<ColumnNullable>()
375 {
376 let nested_ref = nullable_col.nested_ref_mut();
379 let nested_mut = Arc::get_mut(nested_ref)
380 .ok_or_else(|| {
381 Error::Protocol(
382 "Cannot load into shared nested column - column has multiple references"
383 .to_string(),
384 )
385 })?;
386 nested_mut.load_from_buffer(buffer, number_of_keys)?;
387
388 for _ in 0..number_of_keys {
391 nullable_col.append_non_null();
392 }
393 } else {
394 dict_mut.load_from_buffer(buffer, number_of_keys)?;
396 }
397 }
398
399 let _number_of_rows = if buffer.len() >= 8 {
402 let val = buffer.get_u64_le() as usize;
403
404 if val != rows {
405 return Err(Error::Protocol(format!(
406 "LowCardinality row count mismatch: expected {}, got {}",
407 rows, val
408 )));
409 }
410 val
411 } else {
412 rows
415 };
416
417 self.indices.reserve(rows);
419 match index_type {
420 0 => {
421 for _ in 0..rows {
423 if buffer.is_empty() {
424 return Err(Error::Protocol(
425 "Not enough data for LowCardinality index"
426 .to_string(),
427 ));
428 }
429 let index = buffer.get_u8() as u64;
430 self.indices.push(index);
431 }
432 }
433 1 => {
434 for _ in 0..rows {
436 if buffer.len() < 2 {
437 return Err(Error::Protocol(
438 "Not enough data for LowCardinality index"
439 .to_string(),
440 ));
441 }
442 let index = buffer.get_u16_le() as u64;
443 self.indices.push(index);
444 }
445 }
446 2 => {
447 for _ in 0..rows {
449 if buffer.len() < 4 {
450 return Err(Error::Protocol(
451 "Not enough data for LowCardinality index"
452 .to_string(),
453 ));
454 }
455 let index = buffer.get_u32_le() as u64;
456 self.indices.push(index);
457 }
458 }
459 3 => {
460 for _ in 0..rows {
462 if buffer.len() < 8 {
463 return Err(Error::Protocol(
464 "Not enough data for LowCardinality index"
465 .to_string(),
466 ));
467 }
468 let index = buffer.get_u64_le();
469 self.indices.push(index);
470 }
471 }
472 _ => {
473 return Err(Error::Protocol(format!(
474 "Unknown LowCardinality index type: {}",
475 index_type
476 )));
477 }
478 }
479
480 self.unique_map.clear();
482 for i in 0..self.dictionary.size() {
483 let value = get_column_item(self.dictionary.as_ref(), i)?;
484 let hash_key = compute_hash_key(&value);
485 self.unique_map.insert(hash_key, i as u64);
486 }
487
488 Ok(())
489 }
490
491 fn save_prefix(&self, buffer: &mut BytesMut) -> Result<()> {
492 const SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS: u64 = 1;
495 buffer.put_u64_le(SHARED_DICTIONARIES_WITH_ADDITIONAL_KEYS);
496 Ok(())
497 }
498
499 fn save_to_buffer(&self, buffer: &mut BytesMut) -> Result<()> {
500 const HAS_ADDITIONAL_KEYS_BIT: u64 = 1 << 9;
509
510 const INDEX_TYPE_UINT64: u64 = 3;
514
515 let index_serialization_type =
517 INDEX_TYPE_UINT64 | HAS_ADDITIONAL_KEYS_BIT;
518 buffer.put_u64_le(index_serialization_type);
519
520 buffer.put_u64_le(self.dictionary.size() as u64);
522
523 use super::nullable::ColumnNullable;
528 if let Some(nullable_col) =
529 self.dictionary.as_any().downcast_ref::<ColumnNullable>()
530 {
531 nullable_col.nested_ref().save_to_buffer(buffer)?;
533 } else {
534 self.dictionary.save_to_buffer(buffer)?;
536 }
537
538 buffer.put_u64_le(self.indices.len() as u64);
540
541 for &index in &self.indices {
543 buffer.put_u64_le(index);
544 }
545
546 Ok(())
547 }
548
549 fn clone_empty(&self) -> ColumnRef {
550 Arc::new(ColumnLowCardinality::new(self.type_.clone()))
551 }
552
553 fn slice(&self, begin: usize, len: usize) -> Result<ColumnRef> {
554 if begin + len > self.indices.len() {
555 return Err(Error::InvalidArgument(format!(
556 "Slice out of bounds: begin={}, len={}, size={}",
557 begin,
558 len,
559 self.indices.len()
560 )));
561 }
562
563 let mut sliced = ColumnLowCardinality::new(self.type_.clone());
566
567 for i in begin..begin + len {
571 let dict_index = self.indices[i] as usize;
572 let value = get_column_item(self.dictionary.as_ref(), dict_index)?;
573 sliced.append_unsafe(&value)?;
574 }
575
576 Ok(Arc::new(sliced))
577 }
578
579 fn as_any(&self) -> &dyn std::any::Any {
580 self
581 }
582
583 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
584 self
585 }
586}
587
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::column::column_value::ColumnValue;
    use crate::types::TypeCode;
    use bytes::{
        Buf,
        BytesMut,
    };

    /// Wraps `nested` into a `LowCardinality(...)` type.
    fn low_cardinality_of(nested: Type) -> Type {
        Type::LowCardinality { nested_type: Box::new(nested) }
    }

    /// `LowCardinality(String)`, the type most tests work with.
    fn string_type() -> Type {
        low_cardinality_of(Type::Simple(TypeCode::String))
    }

    /// Builds a `LowCardinality(String)` column holding `values` in order.
    fn string_column(values: &[&str]) -> ColumnLowCardinality {
        let mut column = ColumnLowCardinality::new(string_type());
        for text in values {
            column.append_unsafe(&ColumnValue::from_string(text)).unwrap();
        }
        column
    }

    /// Downcasts a generic column handle to `ColumnLowCardinality`.
    fn as_low_cardinality(column: &ColumnRef) -> &ColumnLowCardinality {
        column.as_any().downcast_ref::<ColumnLowCardinality>().unwrap()
    }

    /// Resolves the value stored at `row` through the column's dictionary.
    fn value_at(column: &ColumnLowCardinality, row: usize) -> ColumnValue {
        get_column_item(
            column.dictionary.as_ref(),
            column.index_at(row) as usize,
        )
        .unwrap()
    }

    #[test]
    fn test_lowcardinality_creation() {
        let col = ColumnLowCardinality::new(string_type());

        assert_eq!(col.len(), 0);
        assert!(col.is_empty());
        assert_eq!(col.dictionary_size(), 0);
    }

    #[test]
    fn test_lowcardinality_empty() {
        let col = ColumnLowCardinality::new(low_cardinality_of(
            Type::Simple(TypeCode::UInt32),
        ));

        assert_eq!(col.dictionary_size(), 0);
        assert_eq!(col.size(), 0);
    }

    #[test]
    fn test_lowcardinality_slice() {
        let col = string_column(&["a", "b", "c", "b", "a"]);
        assert_eq!(col.len(), 5);
        assert_eq!(col.dictionary_size(), 3);

        let sliced = col.slice(1, 2).unwrap();
        let sliced_col = as_low_cardinality(&sliced);

        assert_eq!(sliced_col.len(), 2);
        assert_eq!(
            sliced_col.dictionary_size(),
            2,
            "Dictionary should be compacted"
        );

        let val0 = value_at(sliced_col, 0);
        let val1 = value_at(sliced_col, 1);
        assert_eq!(val0.as_string().unwrap(), "b");
        assert_eq!(val1.as_string().unwrap(), "c");
    }

    #[test]
    fn test_lowcardinality_slice_memory_efficiency() {
        let mut col = ColumnLowCardinality::new(string_type());
        for i in 0..1000 {
            let text = format!("value_{}", i);
            col.append_unsafe(&ColumnValue::from_string(&text)).unwrap();
        }
        assert_eq!(col.dictionary_size(), 1000);

        let sliced = col.slice(0, 10).unwrap();
        let sliced_col = as_low_cardinality(&sliced);

        assert_eq!(sliced_col.len(), 10);
        assert_eq!(
            sliced_col.dictionary_size(),
            10,
            "Dictionary should be compacted to only referenced items"
        );
    }

    #[test]
    fn test_lowcardinality_slice_with_duplicates() {
        let col = string_column(&["x", "y", "z", "x", "x", "z"]);
        assert_eq!(col.dictionary_size(), 3);

        // Rows 3..6 are "x", "x", "z".
        let sliced = col.slice(3, 3).unwrap();
        let sliced_col = as_low_cardinality(&sliced);

        assert_eq!(sliced_col.len(), 3);
        assert_eq!(
            sliced_col.dictionary_size(),
            2,
            "Only 'x' and 'z' should be in dictionary"
        );
        assert_eq!(
            sliced_col.index_at(0),
            sliced_col.index_at(1),
            "Duplicate 'x' should use same index"
        );
    }

    #[test]
    fn test_lowcardinality_clear() {
        let mut col = ColumnLowCardinality::new(string_type());
        col.indices = vec![0, 1, 2];

        col.clear();

        assert_eq!(col.len(), 0);
        assert!(col.is_empty());
    }

    #[test]
    fn test_lowcardinality_reserve() {
        let mut col = ColumnLowCardinality::new(string_type());

        col.reserve(10_000);
        assert!(col.indices.capacity() >= 10_000);

        // The column must remain usable after reserving.
        col.append_unsafe(&ColumnValue::from_string("test")).unwrap();
        assert_eq!(col.len(), 1);
        assert_eq!(col.dictionary_size(), 1);
    }

    #[test]
    fn test_lowcardinality_reserve_performance() {
        let lc_type = string_type();

        let mut col_with_reserve = ColumnLowCardinality::new(lc_type.clone());
        col_with_reserve.reserve(1000);
        let mut col_without_reserve = ColumnLowCardinality::new(lc_type);

        // 100 rows cycling through 10 distinct values.
        for i in 0..100 {
            let text = format!("value_{}", i % 10);
            for col in [&mut col_with_reserve, &mut col_without_reserve] {
                col.append_unsafe(&ColumnValue::from_string(&text)).unwrap();
            }
        }

        assert_eq!(col_with_reserve.len(), 100);
        assert_eq!(col_without_reserve.len(), 100);
        assert_eq!(col_with_reserve.dictionary_size(), 10);
        assert_eq!(col_without_reserve.dictionary_size(), 10);

        assert!(col_with_reserve.indices.capacity() >= 1000);
    }

    #[test]
    fn test_lowcardinality_save_load_roundtrip() {
        let col = string_column(&["hello", "world", "hello", "test", "world"]);
        assert_eq!(col.len(), 5);
        assert_eq!(col.dictionary_size(), 3);

        let mut buffer = BytesMut::new();
        col.save_prefix(&mut buffer).unwrap();
        col.save_to_buffer(&mut buffer).unwrap();

        // Inspect the header fields directly.
        let mut read_buf = &buffer[..];

        let key_version = read_buf.get_u64_le();
        assert_eq!(key_version, 1, "key_version should be 1");

        let index_serialization_type = read_buf.get_u64_le();
        let index_type = index_serialization_type & 0xFF;
        let has_additional_keys = (index_serialization_type & (1 << 9)) != 0;
        assert_eq!(index_type, 3, "index_type should be 3 (UInt64)");
        assert!(has_additional_keys, "HasAdditionalKeysBit should be set");

        let number_of_keys = read_buf.get_u64_le();
        assert_eq!(
            number_of_keys, 3,
            "dictionary should have 3 unique values"
        );

        // Load the same bytes into a fresh column.
        let mut loaded_col = ColumnLowCardinality::new(string_type());
        let mut load_buf = &buffer[..];
        loaded_col.load_prefix(&mut load_buf, 5).unwrap();
        loaded_col.load_from_buffer(&mut load_buf, 5).unwrap();

        assert_eq!(loaded_col.len(), 5);
        assert_eq!(loaded_col.dictionary_size(), 3);

        for row in 0..5 {
            assert_eq!(loaded_col.index_at(row), col.index_at(row));
        }
        assert_eq!(loaded_col.index_at(0), loaded_col.index_at(2));
        assert_eq!(loaded_col.index_at(1), loaded_col.index_at(4));
    }

    #[test]
    fn test_lowcardinality_nullable_save_format() {
        let lc_type = low_cardinality_of(Type::Nullable {
            nested_type: Box::new(Type::Simple(TypeCode::String)),
        });

        let mut col = ColumnLowCardinality::new(lc_type.clone());
        col.append_unsafe(&ColumnValue::from_string("hello")).unwrap();
        col.append_unsafe(&ColumnValue::void()).unwrap();
        col.append_unsafe(&ColumnValue::from_string("world")).unwrap();
        assert_eq!(col.len(), 3);

        let mut buffer = BytesMut::new();
        col.save_prefix(&mut buffer).unwrap();
        col.save_to_buffer(&mut buffer).unwrap();
        assert!(!buffer.is_empty(), "Buffer should contain data");

        let mut read_buf = &buffer[..];
        let key_version = read_buf.get_u64_le();
        assert_eq!(key_version, 1, "key_version should be 1");
    }
}