Skip to main content

clickhouse_native_client/column/
numeric.rs

1//! Numeric column implementations
2//!
3//! **ClickHouse Documentation:**
4//! - [Integer Types](https://clickhouse.com/docs/en/sql-reference/data-types/int-uint)
5//!   - Int8/16/32/64/128, UInt8/16/32/64/128
6//! - [Floating-Point Types](https://clickhouse.com/docs/en/sql-reference/data-types/float)
7//!   - Float32, Float64
8//! - [Decimal Types](https://clickhouse.com/docs/en/sql-reference/data-types/decimal)
9//!   - Fixed-point numbers
10//!
11//! ## Integer Types
12//!
13//! All integer types are stored in **little-endian** format:
14//!
15//! | Type | Rust Type | Storage | Min | Max |
16//! |------|-----------|---------|-----|-----|
17//! | `Int8` | `i8` | 1 byte | -128 | 127 |
18//! | `Int16` | `i16` | 2 bytes | -32,768 | 32,767 |
19//! | `Int32` | `i32` | 4 bytes | -2³¹ | 2³¹-1 |
20//! | `Int64` | `i64` | 8 bytes | -2⁶³ | 2⁶³-1 |
21//! | `Int128` | `i128` | 16 bytes | -2¹²⁷ | 2¹²⁷-1 |
22//! | `UInt8` | `u8` | 1 byte | 0 | 255 |
23//! | `UInt16` | `u16` | 2 bytes | 0 | 65,535 |
24//! | `UInt32` | `u32` | 4 bytes | 0 | 2³²-1 |
25//! | `UInt64` | `u64` | 8 bytes | 0 | 2⁶⁴-1 |
26//! | `UInt128` | `u128` | 16 bytes | 0 | 2¹²⁸-1 |
27//!
28//! ## Floating-Point Types
29//!
30//! IEEE 754 floating-point numbers, stored in little-endian:
31//! - `Float32` - Single precision (32-bit)
32//! - `Float64` - Double precision (64-bit)
33//!
34//! ## Bool Type
35//!
36//! `Bool` is an alias for `UInt8` where 0 = false, 1 = true.
37
38use super::{
39    Column,
40    ColumnRef,
41    ColumnTyped,
42};
43use crate::{
44    types::{
45        ToType,
46        Type,
47    },
48    Error,
49    Result,
50};
51use bytes::{
52    Buf,
53    BufMut,
54    BytesMut,
55};
56use std::sync::Arc;
57
58/// Trait for types that can be read/written as fixed-size values (synchronous
59/// version for columns)
60///
61/// All numeric types are stored in **little-endian** format in ClickHouse wire
62/// protocol.
63pub trait FixedSize: Sized + Clone + Send + Sync + 'static {
64    /// Read a single fixed-size value from the buffer in little-endian byte
65    /// order.
66    fn read_from(buffer: &mut &[u8]) -> Result<Self>;
67    /// Write a single fixed-size value to the buffer in little-endian byte
68    /// order.
69    fn write_to(&self, buffer: &mut BytesMut);
70}
71
72// Implement FixedSize for primitive types
73macro_rules! impl_fixed_size {
74    ($type:ty, $get:ident, $put:ident) => {
75        impl FixedSize for $type {
76            fn read_from(buffer: &mut &[u8]) -> Result<Self> {
77                if buffer.len() < std::mem::size_of::<$type>() {
78                    return Err(Error::Protocol(
79                        "Buffer underflow".to_string(),
80                    ));
81                }
82                Ok(buffer.$get())
83            }
84
85            fn write_to(&self, buffer: &mut BytesMut) {
86                buffer.$put(*self);
87            }
88        }
89    };
90}
91
92impl_fixed_size!(u8, get_u8, put_u8);
93impl_fixed_size!(u16, get_u16_le, put_u16_le);
94impl_fixed_size!(u32, get_u32_le, put_u32_le);
95impl_fixed_size!(u64, get_u64_le, put_u64_le);
96impl_fixed_size!(i8, get_i8, put_i8);
97impl_fixed_size!(i16, get_i16_le, put_i16_le);
98impl_fixed_size!(i32, get_i32_le, put_i32_le);
99impl_fixed_size!(i64, get_i64_le, put_i64_le);
100impl_fixed_size!(f32, get_f32_le, put_f32_le);
101impl_fixed_size!(f64, get_f64_le, put_f64_le);
102
103impl FixedSize for i128 {
104    fn read_from(buffer: &mut &[u8]) -> Result<Self> {
105        if buffer.len() < 16 {
106            return Err(Error::Protocol("Buffer underflow".to_string()));
107        }
108        Ok(buffer.get_i128_le())
109    }
110
111    fn write_to(&self, buffer: &mut BytesMut) {
112        buffer.put_i128_le(*self);
113    }
114}
115
116impl FixedSize for u128 {
117    fn read_from(buffer: &mut &[u8]) -> Result<Self> {
118        if buffer.len() < 16 {
119            return Err(Error::Protocol("Buffer underflow".to_string()));
120        }
121        Ok(buffer.get_u128_le())
122    }
123
124    fn write_to(&self, buffer: &mut BytesMut) {
125        buffer.put_u128_le(*self);
126    }
127}
128
129/// Generic column for numeric types
130pub struct ColumnVector<T: FixedSize> {
131    type_: Type,
132    data: Vec<T>,
133}
134
135impl<T: FixedSize + Clone + Send + Sync + 'static> ColumnVector<T> {
136    /// Create a column from an explicit type and a pre-built vector of values.
137    pub fn from_vec(type_: Type, data: Vec<T>) -> Self {
138        Self { type_, data }
139    }
140
141    /// Create a column with initial data (builder pattern)
142    pub fn with_data(mut self, data: Vec<T>) -> Self {
143        self.data = data;
144        self
145    }
146
147    /// Reserve capacity for additional elements (for
148    /// benchmarking/optimization)
149    pub fn reserve(&mut self, additional: usize) {
150        self.data.reserve(additional);
151    }
152
153    /// Clear the column while preserving capacity (for
154    /// benchmarking/optimization)
155    pub fn clear(&mut self) {
156        self.data.clear();
157    }
158
159    /// Return a reference to the value at the given index, or `None` if out of
160    /// bounds.
161    pub fn get(&self, index: usize) -> Option<&T> {
162        self.data.get(index)
163    }
164
165    /// Get value at index (panics if out of bounds - for tests)
166    pub fn at(&self, index: usize) -> T {
167        self.data[index].clone()
168    }
169
170    /// Get the number of elements (alias for size())
171    pub fn len(&self) -> usize {
172        self.data.len()
173    }
174
175    /// Check if the column is empty
176    pub fn is_empty(&self) -> bool {
177        self.data.is_empty()
178    }
179
180    /// Append a single value to the end of the column.
181    pub fn append(&mut self, value: T) {
182        self.data.push(value);
183    }
184
185    /// Return an iterator over references to the column values.
186    pub fn iter(&self) -> impl Iterator<Item = &T> {
187        self.data.iter()
188    }
189
190    /// Return a slice reference to the underlying data.
191    pub fn data(&self) -> &[T] {
192        &self.data
193    }
194
195    /// Return a mutable reference to the underlying data vector.
196    pub fn data_mut(&mut self) -> &mut Vec<T> {
197        &mut self.data
198    }
199}
200
201/// Type-inferred constructors for ColumnVector
202/// Implements the type map pattern from C++ `Type::CreateSimple<T>()`
203impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> ColumnVector<T> {
204    /// Create a new column with type inferred from T
205    /// Equivalent to C++ pattern where type is determined from template
206    /// parameter
207    ///
208    /// # Examples
209    ///
210    /// ```
211    /// use clickhouse_native_client::column::{Column, ColumnVector};
212    /// use clickhouse_native_client::types::Type;
213    ///
214    /// let col = ColumnVector::<i32>::new();
215    /// assert_eq!(col.column_type(), &Type::int32());
216    /// ```
217    pub fn new() -> Self {
218        Self { type_: T::to_type(), data: Vec::new() }
219    }
220
221    /// Create a new column with type inferred from T and specified capacity
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// use clickhouse_native_client::column::ColumnVector;
227    ///
228    /// let col = ColumnVector::<u64>::with_capacity(100);
229    /// assert_eq!(col.len(), 0);
230    /// ```
231    pub fn with_capacity(capacity: usize) -> Self {
232        Self { type_: T::to_type(), data: Vec::with_capacity(capacity) }
233    }
234}
235
236impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> Default
237    for ColumnVector<T>
238{
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
244impl<T: FixedSize + ToType> Column for ColumnVector<T> {
245    fn column_type(&self) -> &Type {
246        &self.type_
247    }
248
249    fn size(&self) -> usize {
250        self.data.len()
251    }
252
253    fn clear(&mut self) {
254        self.data.clear()
255    }
256
257    fn reserve(&mut self, new_cap: usize) {
258        self.data.reserve(new_cap);
259    }
260
261    fn append_column(&mut self, other: ColumnRef) -> Result<()> {
262        let other = other
263            .as_any()
264            .downcast_ref::<ColumnVector<T>>()
265            .ok_or_else(|| Error::TypeMismatch {
266                expected: self.type_.name(),
267                actual: other.column_type().name(),
268            })?;
269
270        self.data.extend_from_slice(&other.data);
271        Ok(())
272    }
273
274    fn load_from_buffer(
275        &mut self,
276        buffer: &mut &[u8],
277        rows: usize,
278    ) -> Result<()> {
279        // Optimize: Use bulk read instead of loop for massive performance gain
280        // C++ does: WireFormat::ReadBytes(*input, data_.data(), rows *
281        // sizeof(T))
282        let bytes_needed = rows * std::mem::size_of::<T>();
283
284        if buffer.len() < bytes_needed {
285            return Err(Error::Protocol(format!(
286                "Buffer underflow: need {} bytes, have {}",
287                bytes_needed,
288                buffer.len()
289            )));
290        }
291
292        // Pre-allocate and read directly into Vec's memory
293        self.data.clear();
294        self.data.reserve(rows);
295
296        unsafe {
297            // Read bytes directly into Vec's uninitialized memory
298            let dest_ptr = self.data.as_mut_ptr() as *mut u8;
299            std::ptr::copy_nonoverlapping(
300                buffer.as_ptr(),
301                dest_ptr,
302                bytes_needed,
303            );
304            self.data.set_len(rows);
305        }
306
307        // Advance buffer
308        *buffer = &buffer[bytes_needed..];
309        Ok(())
310    }
311
312    fn save_to_buffer(&self, buffer: &mut BytesMut) -> Result<()> {
313        // Optimize: Use bulk write instead of loop for massive performance
314        // gain C++ does: WireFormat::WriteBytes(*output, data_.data(),
315        // data_.size() * sizeof(T)) This achieves the same with
316        // extend_from_slice on the raw bytes
317        if !self.data.is_empty() {
318            let byte_slice = unsafe {
319                std::slice::from_raw_parts(
320                    self.data.as_ptr() as *const u8,
321                    self.data.len() * std::mem::size_of::<T>(),
322                )
323            };
324            buffer.extend_from_slice(byte_slice);
325        }
326        Ok(())
327    }
328
329    fn clone_empty(&self) -> ColumnRef {
330        Arc::new(ColumnVector::<T>::new())
331    }
332
333    fn slice(&self, begin: usize, len: usize) -> Result<ColumnRef> {
334        if begin + len > self.data.len() {
335            return Err(Error::InvalidArgument(format!(
336                "Slice range out of bounds: begin={}, len={}, size={}",
337                begin,
338                len,
339                self.data.len()
340            )));
341        }
342
343        let sliced_data = self.data[begin..begin + len].to_vec();
344        Ok(Arc::new(ColumnVector::<T>::from_vec(
345            self.type_.clone(),
346            sliced_data,
347        )))
348    }
349
350    fn as_any(&self) -> &dyn std::any::Any {
351        self
352    }
353
354    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
355        self
356    }
357}
358
359impl<T: FixedSize + ToType + Clone + Send + Sync + 'static> ColumnTyped<T>
360    for ColumnVector<T>
361{
362    fn get(&self, index: usize) -> Option<T> {
363        self.data.get(index).cloned()
364    }
365
366    fn append(&mut self, value: T) {
367        self.data.push(value);
368    }
369}
370
371/// Column of `UInt8` values (1-byte unsigned integers).
372pub type ColumnUInt8 = ColumnVector<u8>;
373/// Column of `UInt16` values (2-byte unsigned integers, little-endian).
374pub type ColumnUInt16 = ColumnVector<u16>;
375/// Column of `UInt32` values (4-byte unsigned integers, little-endian).
376pub type ColumnUInt32 = ColumnVector<u32>;
377/// Column of `UInt64` values (8-byte unsigned integers, little-endian).
378pub type ColumnUInt64 = ColumnVector<u64>;
379/// Column of `UInt128` values (16-byte unsigned integers, little-endian).
380pub type ColumnUInt128 = ColumnVector<u128>;
381
382/// Column of `Int8` values (1-byte signed integers).
383pub type ColumnInt8 = ColumnVector<i8>;
384/// Column of `Int16` values (2-byte signed integers, little-endian).
385pub type ColumnInt16 = ColumnVector<i16>;
386/// Column of `Int32` values (4-byte signed integers, little-endian).
387pub type ColumnInt32 = ColumnVector<i32>;
388/// Column of `Int64` values (8-byte signed integers, little-endian).
389pub type ColumnInt64 = ColumnVector<i64>;
390/// Column of `Int128` values (16-byte signed integers, little-endian).
391pub type ColumnInt128 = ColumnVector<i128>;
392
393/// Column of `Float32` values (IEEE 754 single-precision, little-endian).
394pub type ColumnFloat32 = ColumnVector<f32>;
395/// Column of `Float64` values (IEEE 754 double-precision, little-endian).
396pub type ColumnFloat64 = ColumnVector<f64>;
397
398/// Column of `Date` values stored as `u16` (days since 1970-01-01).
399pub type ColumnDate = ColumnVector<u16>;
400
401#[cfg(test)]
402#[cfg_attr(coverage_nightly, coverage(off))]
403mod tests {
404    use super::*;
405
406    #[test]
407    fn test_column_creation() {
408        // Test type-inferred constructor
409        let col = ColumnUInt32::new();
410        assert_eq!(col.size(), 0);
411        assert_eq!(col.column_type().name(), "UInt32");
412
413        // Test with_capacity constructor
414        let col2 = ColumnUInt32::with_capacity(100);
415        assert_eq!(col2.size(), 0);
416        assert_eq!(col2.column_type().name(), "UInt32");
417    }
418
419    #[test]
420    fn test_column_append() {
421        let mut col = ColumnUInt32::new();
422        col.append(42);
423        col.append(100);
424
425        assert_eq!(col.size(), 2);
426        assert_eq!(col.get(0), Some(&42));
427        assert_eq!(col.get(1), Some(&100));
428    }
429
430    #[test]
431    fn test_column_clear() {
432        let mut col = ColumnInt64::new();
433        col.append(-123);
434        col.append(456);
435        assert_eq!(col.size(), 2);
436
437        col.clear();
438        assert_eq!(col.size(), 0);
439    }
440
441    #[test]
442    fn test_column_slice() {
443        let mut col = ColumnUInt64::new();
444        for i in 0..10 {
445            col.append(i);
446        }
447
448        let sliced = col.slice(2, 5).unwrap();
449        assert_eq!(sliced.size(), 5);
450
451        let sliced_concrete =
452            sliced.as_any().downcast_ref::<ColumnUInt64>().unwrap();
453        assert_eq!(sliced_concrete.get(0), Some(&2));
454        assert_eq!(sliced_concrete.get(4), Some(&6));
455    }
456
457    #[test]
458    fn test_column_save_load() {
459        let mut col = ColumnInt32::new();
460        col.append(1);
461        col.append(-2);
462        col.append(3);
463
464        // Save to buffer
465        let mut buf = BytesMut::new();
466        col.save_to_buffer(&mut buf).unwrap();
467
468        // Load from buffer
469        let mut col2 = ColumnInt32::new();
470        let mut reader = &buf[..];
471        col2.load_from_buffer(&mut reader, 3).unwrap();
472
473        assert_eq!(col2.size(), 3);
474        assert_eq!(col2.get(0), Some(&1));
475        assert_eq!(col2.get(1), Some(&-2));
476        assert_eq!(col2.get(2), Some(&3));
477    }
478
479    #[test]
480    fn test_column_append_column() {
481        let mut col1 = ColumnFloat64::new();
482        col1.append(1.5);
483        col1.append(2.5);
484
485        let mut col2 = ColumnFloat64::new();
486        col2.append(3.5);
487        col2.append(4.5);
488
489        col1.append_column(Arc::new(col2)).unwrap();
490
491        assert_eq!(col1.size(), 4);
492        assert_eq!(col1.get(0), Some(&1.5));
493        assert_eq!(col1.get(3), Some(&4.5));
494    }
495
496    // Bulk copy tests - verify set_len safety
497    #[test]
498    fn test_bulk_load_large_dataset() {
499        // Test with 10,000 elements to ensure bulk copy works correctly
500        let mut col = ColumnUInt64::new();
501        let data: Vec<u64> = (0..10_000).collect();
502
503        // Save to buffer
504        let mut buf = BytesMut::new();
505        for &val in &data {
506            buf.put_u64_le(val);
507        }
508
509        // Load from buffer using bulk copy (internally uses set_len)
510        let mut reader = &buf[..];
511        col.load_from_buffer(&mut reader, 10_000).unwrap();
512
513        assert_eq!(col.size(), 10_000);
514        assert_eq!(col.get(0), Some(&0));
515        assert_eq!(col.get(5_000), Some(&5_000));
516        assert_eq!(col.get(9_999), Some(&9_999));
517    }
518
519    #[test]
520    fn test_bulk_load_multiple_sequential() {
521        // Test multiple sequential bulk loads
522        let mut col = ColumnInt32::new();
523
524        // First bulk load
525        let mut buf1 = BytesMut::new();
526        for i in 0..1_000 {
527            buf1.put_i32_le(i);
528        }
529        let mut reader1 = &buf1[..];
530        col.load_from_buffer(&mut reader1, 1_000).unwrap();
531
532        assert_eq!(col.size(), 1_000);
533        assert_eq!(col.get(0), Some(&0));
534        assert_eq!(col.get(999), Some(&999));
535
536        // Second bulk load (should append)
537        let mut buf2 = BytesMut::new();
538        for i in 1_000..2_000 {
539            buf2.put_i32_le(i);
540        }
541        let mut reader2 = &buf2[..];
542        col.load_from_buffer(&mut reader2, 1_000).unwrap();
543
544        assert_eq!(col.size(), 1_000); // Note: load_from_buffer clears first
545        assert_eq!(col.get(0), Some(&1_000));
546        assert_eq!(col.get(999), Some(&1_999));
547    }
548
549    #[test]
550    fn test_bulk_load_empty() {
551        // Test edge case: empty load
552        let mut col = ColumnUInt32::new();
553        let buf = BytesMut::new();
554        let mut reader = &buf[..];
555        col.load_from_buffer(&mut reader, 0).unwrap();
556
557        assert_eq!(col.size(), 0);
558    }
559
560    #[test]
561    fn test_bulk_load_single_element() {
562        // Test edge case: single element
563        let mut col = ColumnInt64::new();
564        let mut buf = BytesMut::new();
565        buf.put_i64_le(42);
566
567        let mut reader = &buf[..];
568        col.load_from_buffer(&mut reader, 1).unwrap();
569
570        assert_eq!(col.size(), 1);
571        assert_eq!(col.get(0), Some(&42));
572    }
573
574    #[test]
575    fn test_bulk_load_roundtrip_large() {
576        // Test save/load round-trip with large data
577        let mut col1 = ColumnFloat32::new();
578        for i in 0..5_000 {
579            col1.append(i as f32 * 1.5);
580        }
581
582        // Save to buffer
583        let mut buf = BytesMut::new();
584        col1.save_to_buffer(&mut buf).unwrap();
585
586        // Load from buffer
587        let mut col2 = ColumnFloat32::new();
588        let mut reader = &buf[..];
589        col2.load_from_buffer(&mut reader, 5_000).unwrap();
590
591        assert_eq!(col2.size(), 5_000);
592        for i in 0..5_000 {
593            assert_eq!(col2.get(i), Some(&(i as f32 * 1.5)));
594        }
595    }
596
597    #[test]
598    fn test_bulk_load_all_numeric_types() {
599        // Test bulk load for all numeric types to ensure set_len works
600        // correctly
601
602        // UInt8
603        let mut col_u8 = ColumnUInt8::new();
604        let mut buf = BytesMut::new();
605        for i in 0..255u8 {
606            buf.put_u8(i);
607        }
608        let mut reader = &buf[..];
609        col_u8.load_from_buffer(&mut reader, 255).unwrap();
610        assert_eq!(col_u8.size(), 255);
611
612        // UInt16
613        let mut col_u16 = ColumnUInt16::new();
614        let mut buf = BytesMut::new();
615        for i in 0..1000u16 {
616            buf.put_u16_le(i);
617        }
618        let mut reader = &buf[..];
619        col_u16.load_from_buffer(&mut reader, 1000).unwrap();
620        assert_eq!(col_u16.size(), 1000);
621
622        // Int8
623        let mut col_i8 = ColumnInt8::new();
624        let mut buf = BytesMut::new();
625        for i in -127..127i8 {
626            buf.put_i8(i);
627        }
628        let mut reader = &buf[..];
629        col_i8.load_from_buffer(&mut reader, 254).unwrap();
630        assert_eq!(col_i8.size(), 254);
631
632        // Int16
633        let mut col_i16 = ColumnInt16::new();
634        let mut buf = BytesMut::new();
635        for i in 0..1000i16 {
636            buf.put_i16_le(i);
637        }
638        let mut reader = &buf[..];
639        col_i16.load_from_buffer(&mut reader, 1000).unwrap();
640        assert_eq!(col_i16.size(), 1000);
641
642        // i128 and u128
643        let mut col_i128 = ColumnInt128::new();
644        let mut buf = BytesMut::new();
645        for i in 0..100i128 {
646            buf.put_i128_le(i);
647        }
648        let mut reader = &buf[..];
649        col_i128.load_from_buffer(&mut reader, 100).unwrap();
650        assert_eq!(col_i128.size(), 100);
651
652        let mut col_u128 = ColumnUInt128::new();
653        let mut buf = BytesMut::new();
654        for i in 0..100u128 {
655            buf.put_u128_le(i);
656        }
657        let mut reader = &buf[..];
658        col_u128.load_from_buffer(&mut reader, 100).unwrap();
659        assert_eq!(col_u128.size(), 100);
660    }
661
662    #[test]
663    fn test_bulk_load_memory_safety() {
664        // This test specifically validates that set_len is called AFTER memory
665        // initialization If set_len was called before
666        // copy_nonoverlapping, this would fail or cause UB
667        let mut col = ColumnInt32::new();
668
669        // Create test data with specific pattern
670        let mut buf = BytesMut::new();
671        let test_values: Vec<i32> =
672            vec![i32::MIN, -1_000_000, -1, 0, 1, 1_000_000, i32::MAX];
673        for &val in &test_values {
674            buf.put_i32_le(val);
675        }
676
677        // Load using bulk copy
678        let mut reader = &buf[..];
679        col.load_from_buffer(&mut reader, test_values.len()).unwrap();
680
681        // Verify all values are correctly initialized
682        assert_eq!(col.size(), test_values.len());
683        for (i, &expected) in test_values.iter().enumerate() {
684            assert_eq!(
685                col.get(i),
686                Some(&expected),
687                "Value mismatch at index {}",
688                i
689            );
690        }
691    }
692}