clickhouse_native_client/column/
numeric.rs1use super::{
39 Column,
40 ColumnRef,
41 ColumnTyped,
42};
43use crate::{
44 types::{
45 ToType,
46 Type,
47 },
48 Error,
49 Result,
50};
51use bytes::{
52 Buf,
53 BufMut,
54 BytesMut,
55};
56use std::sync::Arc;
57
58pub trait FixedSize: Sized + Clone + Send + Sync + 'static {
64 fn read_from(buffer: &mut &[u8]) -> Result<Self>;
67 fn write_to(&self, buffer: &mut BytesMut);
70}
71
/// Generates a [`FixedSize`] impl for a primitive numeric type.
///
/// Arguments: the type, the `bytes::Buf` getter that decodes it, and the
/// `bytes::BufMut` putter that encodes it, e.g.
/// `impl_fixed_size!(u32, get_u32_le, put_u32_le);`.
macro_rules! impl_fixed_size {
    ($t:ty, $getter:ident, $putter:ident) => {
        impl FixedSize for $t {
            fn read_from(buffer: &mut &[u8]) -> Result<Self> {
                // The `Buf` getters panic on a short slice, so the length is
                // validated here and reported as a protocol error instead.
                let width = std::mem::size_of::<$t>();
                if buffer.len() >= width {
                    Ok(buffer.$getter())
                } else {
                    Err(Error::Protocol("Buffer underflow".to_string()))
                }
            }

            fn write_to(&self, buffer: &mut BytesMut) {
                let value: $t = *self;
                buffer.$putter(value);
            }
        }
    };
}
91
92impl_fixed_size!(u8, get_u8, put_u8);
93impl_fixed_size!(u16, get_u16_le, put_u16_le);
94impl_fixed_size!(u32, get_u32_le, put_u32_le);
95impl_fixed_size!(u64, get_u64_le, put_u64_le);
96impl_fixed_size!(i8, get_i8, put_i8);
97impl_fixed_size!(i16, get_i16_le, put_i16_le);
98impl_fixed_size!(i32, get_i32_le, put_i32_le);
99impl_fixed_size!(i64, get_i64_le, put_i64_le);
100impl_fixed_size!(f32, get_f32_le, put_f32_le);
101impl_fixed_size!(f64, get_f64_le, put_f64_le);
102
103impl FixedSize for i128 {
104 fn read_from(buffer: &mut &[u8]) -> Result<Self> {
105 if buffer.len() < 16 {
106 return Err(Error::Protocol("Buffer underflow".to_string()));
107 }
108 Ok(buffer.get_i128_le())
109 }
110
111 fn write_to(&self, buffer: &mut BytesMut) {
112 buffer.put_i128_le(*self);
113 }
114}
115
116impl FixedSize for u128 {
117 fn read_from(buffer: &mut &[u8]) -> Result<Self> {
118 if buffer.len() < 16 {
119 return Err(Error::Protocol("Buffer underflow".to_string()));
120 }
121 Ok(buffer.get_u128_le())
122 }
123
124 fn write_to(&self, buffer: &mut BytesMut) {
125 buffer.put_u128_le(*self);
126 }
127}
128
129pub struct ColumnVector<T: FixedSize> {
131 type_: Type,
132 data: Vec<T>,
133}
134
135impl<T: FixedSize + Clone + Send + Sync + 'static> ColumnVector<T> {
136 pub fn from_vec(type_: Type, data: Vec<T>) -> Self {
138 Self { type_, data }
139 }
140
141 pub fn with_data(mut self, data: Vec<T>) -> Self {
143 self.data = data;
144 self
145 }
146
147 pub fn reserve(&mut self, additional: usize) {
150 self.data.reserve(additional);
151 }
152
153 pub fn clear(&mut self) {
156 self.data.clear();
157 }
158
159 pub fn get(&self, index: usize) -> Option<&T> {
162 self.data.get(index)
163 }
164
165 pub fn at(&self, index: usize) -> T {
167 self.data[index].clone()
168 }
169
170 pub fn len(&self) -> usize {
172 self.data.len()
173 }
174
175 pub fn is_empty(&self) -> bool {
177 self.data.is_empty()
178 }
179
180 pub fn append(&mut self, value: T) {
182 self.data.push(value);
183 }
184
185 pub fn iter(&self) -> impl Iterator<Item = &T> {
187 self.data.iter()
188 }
189
190 pub fn data(&self) -> &[T] {
192 &self.data
193 }
194
195 pub fn data_mut(&mut self) -> &mut Vec<T> {
197 &mut self.data
198 }
199}
200
201impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> ColumnVector<T> {
204 pub fn new() -> Self {
218 Self { type_: T::to_type(), data: Vec::new() }
219 }
220
221 pub fn with_capacity(capacity: usize) -> Self {
232 Self { type_: T::to_type(), data: Vec::with_capacity(capacity) }
233 }
234}
235
236impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> Default
237 for ColumnVector<T>
238{
239 fn default() -> Self {
240 Self::new()
241 }
242}
243
244impl<T: FixedSize + ToType> Column for ColumnVector<T> {
245 fn column_type(&self) -> &Type {
246 &self.type_
247 }
248
249 fn size(&self) -> usize {
250 self.data.len()
251 }
252
253 fn clear(&mut self) {
254 self.data.clear()
255 }
256
257 fn reserve(&mut self, new_cap: usize) {
258 self.data.reserve(new_cap);
259 }
260
261 fn append_column(&mut self, other: ColumnRef) -> Result<()> {
262 let other = other
263 .as_any()
264 .downcast_ref::<ColumnVector<T>>()
265 .ok_or_else(|| Error::TypeMismatch {
266 expected: self.type_.name(),
267 actual: other.column_type().name(),
268 })?;
269
270 self.data.extend_from_slice(&other.data);
271 Ok(())
272 }
273
274 fn load_from_buffer(
275 &mut self,
276 buffer: &mut &[u8],
277 rows: usize,
278 ) -> Result<()> {
279 let bytes_needed = rows * std::mem::size_of::<T>();
283
284 if buffer.len() < bytes_needed {
285 return Err(Error::Protocol(format!(
286 "Buffer underflow: need {} bytes, have {}",
287 bytes_needed,
288 buffer.len()
289 )));
290 }
291
292 self.data.clear();
294 self.data.reserve(rows);
295
296 unsafe {
297 let dest_ptr = self.data.as_mut_ptr() as *mut u8;
299 std::ptr::copy_nonoverlapping(
300 buffer.as_ptr(),
301 dest_ptr,
302 bytes_needed,
303 );
304 self.data.set_len(rows);
305 }
306
307 *buffer = &buffer[bytes_needed..];
309 Ok(())
310 }
311
312 fn save_to_buffer(&self, buffer: &mut BytesMut) -> Result<()> {
313 if !self.data.is_empty() {
318 let byte_slice = unsafe {
319 std::slice::from_raw_parts(
320 self.data.as_ptr() as *const u8,
321 self.data.len() * std::mem::size_of::<T>(),
322 )
323 };
324 buffer.extend_from_slice(byte_slice);
325 }
326 Ok(())
327 }
328
329 fn clone_empty(&self) -> ColumnRef {
330 Arc::new(ColumnVector::<T>::new())
331 }
332
333 fn slice(&self, begin: usize, len: usize) -> Result<ColumnRef> {
334 if begin + len > self.data.len() {
335 return Err(Error::InvalidArgument(format!(
336 "Slice range out of bounds: begin={}, len={}, size={}",
337 begin,
338 len,
339 self.data.len()
340 )));
341 }
342
343 let sliced_data = self.data[begin..begin + len].to_vec();
344 Ok(Arc::new(ColumnVector::<T>::from_vec(
345 self.type_.clone(),
346 sliced_data,
347 )))
348 }
349
350 fn as_any(&self) -> &dyn std::any::Any {
351 self
352 }
353
354 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
355 self
356 }
357}
358
359impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> ColumnTyped<T>
360 for ColumnVector<T>
361{
362 fn get(&self, index: usize) -> Option<T> {
363 self.data.get(index).cloned()
364 }
365
366 fn append(&mut self, value: T) {
367 self.data.push(value);
368 }
369}
370
371pub type ColumnUInt8 = ColumnVector<u8>;
373pub type ColumnUInt16 = ColumnVector<u16>;
375pub type ColumnUInt32 = ColumnVector<u32>;
377pub type ColumnUInt64 = ColumnVector<u64>;
379pub type ColumnUInt128 = ColumnVector<u128>;
381
382pub type ColumnInt8 = ColumnVector<i8>;
384pub type ColumnInt16 = ColumnVector<i16>;
386pub type ColumnInt32 = ColumnVector<i32>;
388pub type ColumnInt64 = ColumnVector<i64>;
390pub type ColumnInt128 = ColumnVector<i128>;
392
393pub type ColumnFloat32 = ColumnVector<f32>;
395pub type ColumnFloat64 = ColumnVector<f64>;
397
398pub type ColumnDate = ColumnVector<u16>;
400
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Serialises `values` back-to-back into a fresh buffer, one `put` call
    /// per value.
    fn encode<V>(
        values: impl IntoIterator<Item = V>,
        put: impl Fn(&mut BytesMut, V),
    ) -> BytesMut {
        let mut out = BytesMut::new();
        for value in values {
            put(&mut out, value);
        }
        out
    }

    #[test]
    fn test_column_creation() {
        let empty = ColumnUInt32::new();
        let preallocated = ColumnUInt32::with_capacity(100);

        for col in [&empty, &preallocated] {
            assert_eq!(col.size(), 0);
            assert_eq!(col.column_type().name(), "UInt32");
        }
    }

    #[test]
    fn test_column_append() {
        let mut col = ColumnUInt32::new();
        for value in [42, 100] {
            col.append(value);
        }

        assert_eq!(col.size(), 2);
        assert_eq!(col.get(0), Some(&42));
        assert_eq!(col.get(1), Some(&100));
    }

    #[test]
    fn test_column_clear() {
        let mut col = ColumnInt64::new();
        for value in [-123, 456] {
            col.append(value);
        }
        assert_eq!(col.size(), 2);

        col.clear();
        assert_eq!(col.size(), 0);
    }

    #[test]
    fn test_column_slice() {
        let mut col = ColumnUInt64::new();
        col.data_mut().extend(0..10);

        let sliced = col.slice(2, 5).unwrap();
        assert_eq!(sliced.size(), 5);

        let window = sliced
            .as_any()
            .downcast_ref::<ColumnUInt64>()
            .expect("slice should keep the concrete column type");
        assert_eq!(window.get(0), Some(&2));
        assert_eq!(window.get(4), Some(&6));
    }

    #[test]
    fn test_column_save_load() {
        let values = [1, -2, 3];
        let source = ColumnInt32::new().with_data(values.to_vec());

        let mut wire = BytesMut::new();
        source.save_to_buffer(&mut wire).unwrap();

        let mut restored = ColumnInt32::new();
        let mut reader = &wire[..];
        restored.load_from_buffer(&mut reader, values.len()).unwrap();

        assert_eq!(restored.size(), values.len());
        for (idx, expected) in values.iter().enumerate() {
            assert_eq!(restored.get(idx), Some(expected));
        }
    }

    #[test]
    fn test_column_append_column() {
        let mut head = ColumnFloat64::new().with_data(vec![1.5, 2.5]);
        let tail = ColumnFloat64::new().with_data(vec![3.5, 4.5]);

        head.append_column(Arc::new(tail)).unwrap();

        assert_eq!(head.size(), 4);
        assert_eq!(head.get(0), Some(&1.5));
        assert_eq!(head.get(3), Some(&4.5));
    }

    #[test]
    fn test_bulk_load_large_dataset() {
        const ROWS: usize = 10_000;
        let wire = encode(0..ROWS as u64, |buf, v| buf.put_u64_le(v));

        let mut col = ColumnUInt64::new();
        let mut reader = &wire[..];
        col.load_from_buffer(&mut reader, ROWS).unwrap();

        assert_eq!(col.size(), ROWS);
        for idx in [0, 5_000, 9_999] {
            assert_eq!(col.get(idx), Some(&(idx as u64)));
        }
    }

    #[test]
    fn test_bulk_load_multiple_sequential() {
        let mut col = ColumnInt32::new();

        for (start, end) in [(0i32, 1_000i32), (1_000, 2_000)] {
            let wire = encode(start..end, |buf, v| buf.put_i32_le(v));
            let mut reader = &wire[..];
            col.load_from_buffer(&mut reader, 1_000).unwrap();

            // Each load replaces the previous rows instead of appending.
            assert_eq!(col.size(), 1_000);
            assert_eq!(col.get(0), Some(&start));
            assert_eq!(col.get(999), Some(&(end - 1)));
        }
    }

    #[test]
    fn test_bulk_load_empty() {
        let mut col = ColumnUInt32::new();
        let mut reader: &[u8] = &[];
        col.load_from_buffer(&mut reader, 0).unwrap();

        assert_eq!(col.size(), 0);
    }

    #[test]
    fn test_bulk_load_single_element() {
        let wire = encode(std::iter::once(42i64), |buf, v| buf.put_i64_le(v));

        let mut col = ColumnInt64::new();
        let mut reader = &wire[..];
        col.load_from_buffer(&mut reader, 1).unwrap();

        assert_eq!(col.size(), 1);
        assert_eq!(col.get(0), Some(&42));
    }

    #[test]
    fn test_bulk_load_roundtrip_large() {
        let expected: Vec<f32> = (0..5_000).map(|i| i as f32 * 1.5).collect();
        let source = ColumnFloat32::new().with_data(expected.clone());

        let mut wire = BytesMut::new();
        source.save_to_buffer(&mut wire).unwrap();

        let mut restored = ColumnFloat32::new();
        let mut reader = &wire[..];
        restored.load_from_buffer(&mut reader, expected.len()).unwrap();

        assert_eq!(restored.size(), expected.len());
        for (idx, value) in expected.iter().enumerate() {
            assert_eq!(restored.get(idx), Some(value));
        }
    }

    #[test]
    fn test_bulk_load_all_numeric_types() {
        // Encodes `$values` with `$put`, bulk-loads `$rows` of them into a
        // fresh `$column`, and checks the resulting row count.
        macro_rules! assert_bulk_load {
            ($column:ty, $values:expr, $put:ident, $rows:expr) => {{
                let wire = encode($values, |buf, v| buf.$put(v));
                let mut col = <$column>::new();
                let mut reader = &wire[..];
                col.load_from_buffer(&mut reader, $rows).unwrap();
                assert_eq!(col.size(), $rows);
            }};
        }

        assert_bulk_load!(ColumnUInt8, 0..255u8, put_u8, 255);
        assert_bulk_load!(ColumnUInt16, 0..1000u16, put_u16_le, 1000);
        assert_bulk_load!(ColumnInt8, -127..127i8, put_i8, 254);
        assert_bulk_load!(ColumnInt16, 0..1000i16, put_i16_le, 1000);
        assert_bulk_load!(ColumnInt128, 0..100i128, put_i128_le, 100);
        assert_bulk_load!(ColumnUInt128, 0..100u128, put_u128_le, 100);
    }

    #[test]
    fn test_bulk_load_memory_safety() {
        let test_values =
            [i32::MIN, -1_000_000, -1, 0, 1, 1_000_000, i32::MAX];
        let wire = encode(test_values, |buf, v| buf.put_i32_le(v));

        let mut col = ColumnInt32::new();
        let mut reader = &wire[..];
        col.load_from_buffer(&mut reader, test_values.len()).unwrap();

        assert_eq!(col.size(), test_values.len());
        test_values.iter().enumerate().for_each(|(i, expected)| {
            assert_eq!(
                col.get(i),
                Some(expected),
                "Value mismatch at index {}",
                i
            );
        });
    }
}