Skip to main content

clickhouse_native_client/column/
ipv6.rs

1use super::{
2    Column,
3    ColumnRef,
4};
5use crate::{
6    types::Type,
7    Error,
8    Result,
9};
10use bytes::BytesMut;
11use std::sync::Arc;
12
13/// Column for IPv6 addresses (stored as FixedString(16) - 16 bytes)
14///
15/// **Implementation Note:**
16/// Unlike C++, this does NOT delegate to `ColumnFixedString` because IPv6 data
17/// is pure binary (not UTF-8 text). Rust's `ColumnFixedString` uses `String`
18/// which requires valid UTF-8 and trims null bytes, corrupting binary IPv6
19/// data. Direct `Vec<[u8; 16]>` storage is more appropriate and preserves bulk
20/// copy performance.
21pub struct ColumnIpv6 {
22    type_: Type,
23    data: Vec<[u8; 16]>, // IPv6 addresses stored as 16 bytes
24}
25
26impl ColumnIpv6 {
27    /// Create a new empty IPv6 column.
28    pub fn new(type_: Type) -> Self {
29        Self { type_, data: Vec::new() }
30    }
31
32    /// Set the column data from a vector of 16-byte arrays.
33    pub fn with_data(mut self, data: Vec<[u8; 16]>) -> Self {
34        self.data = data;
35        self
36    }
37
38    /// Append an IPv6 address parsed from a string.
39    ///
40    /// Supports both full and compressed formats (e.g., `"::1"`, `"fe80::1"`).
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if the string is not a valid IPv6 address.
45    pub fn append_from_string(&mut self, s: &str) -> Result<()> {
46        let bytes = parse_ipv6(s)?;
47        self.data.push(bytes);
48        Ok(())
49    }
50
51    /// Append IPv6 from 16-byte array
52    pub fn append(&mut self, bytes: [u8; 16]) {
53        self.data.push(bytes);
54    }
55
56    /// Get IPv6 at index as 16-byte array
57    pub fn at(&self, index: usize) -> [u8; 16] {
58        self.data[index]
59    }
60
61    /// Format IPv6 at index as string
62    pub fn as_string(&self, index: usize) -> String {
63        format_ipv6(&self.data[index])
64    }
65
66    /// Returns the number of values in this column.
67    pub fn len(&self) -> usize {
68        self.data.len()
69    }
70
71    /// Returns `true` if the column contains no values.
72    pub fn is_empty(&self) -> bool {
73        self.data.is_empty()
74    }
75}
76
77impl Column for ColumnIpv6 {
78    fn column_type(&self) -> &Type {
79        &self.type_
80    }
81
82    fn size(&self) -> usize {
83        self.data.len()
84    }
85
86    fn clear(&mut self) {
87        self.data.clear();
88    }
89
90    fn reserve(&mut self, new_cap: usize) {
91        self.data.reserve(new_cap);
92    }
93
94    fn append_column(&mut self, other: ColumnRef) -> Result<()> {
95        let other =
96            other.as_any().downcast_ref::<ColumnIpv6>().ok_or_else(|| {
97                Error::TypeMismatch {
98                    expected: self.type_.name(),
99                    actual: other.column_type().name(),
100                }
101            })?;
102
103        self.data.extend_from_slice(&other.data);
104        Ok(())
105    }
106
107    fn load_from_buffer(
108        &mut self,
109        buffer: &mut &[u8],
110        rows: usize,
111    ) -> Result<()> {
112        let bytes_needed = rows * 16;
113        if buffer.len() < bytes_needed {
114            return Err(Error::Protocol(format!(
115                "Buffer underflow: need {} bytes for IPv6, have {}",
116                bytes_needed,
117                buffer.len()
118            )));
119        }
120
121        // Use bulk copy for performance
122        self.data.reserve(rows);
123        let current_len = self.data.len();
124        unsafe {
125            // Set length first to claim ownership of the memory
126            self.data.set_len(current_len + rows);
127            let dest_ptr =
128                (self.data.as_mut_ptr() as *mut u8).add(current_len * 16);
129            std::ptr::copy_nonoverlapping(
130                buffer.as_ptr(),
131                dest_ptr,
132                bytes_needed,
133            );
134        }
135
136        use bytes::Buf;
137        buffer.advance(bytes_needed);
138        Ok(())
139    }
140
141    fn save_to_buffer(&self, buffer: &mut BytesMut) -> Result<()> {
142        if !self.data.is_empty() {
143            let byte_slice = unsafe {
144                std::slice::from_raw_parts(
145                    self.data.as_ptr() as *const u8,
146                    self.data.len() * 16,
147                )
148            };
149            buffer.extend_from_slice(byte_slice);
150        }
151        Ok(())
152    }
153
154    fn clone_empty(&self) -> ColumnRef {
155        Arc::new(ColumnIpv6::new(self.type_.clone()))
156    }
157
158    fn slice(&self, begin: usize, len: usize) -> Result<ColumnRef> {
159        if begin + len > self.data.len() {
160            return Err(Error::InvalidArgument(format!(
161                "Slice out of bounds: begin={}, len={}, size={}",
162                begin,
163                len,
164                self.data.len()
165            )));
166        }
167
168        let sliced_data = self.data[begin..begin + len].to_vec();
169        Ok(Arc::new(
170            ColumnIpv6::new(self.type_.clone()).with_data(sliced_data),
171        ))
172    }
173
174    fn as_any(&self) -> &dyn std::any::Any {
175        self
176    }
177
178    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
179        self
180    }
181}
182
183/// Parse IPv6 string to 16-byte array
184fn parse_ipv6(s: &str) -> Result<[u8; 16]> {
185    let parts: Vec<&str> = s.split("::").collect();
186
187    if parts.len() > 2 {
188        return Err(Error::Protocol(format!(
189            "Invalid IPv6 format (multiple ::): {}",
190            s
191        )));
192    }
193
194    let mut result = [0u8; 16];
195
196    if parts.len() == 2 {
197        // Compressed format with ::
198        let left_parts: Vec<&str> = if parts[0].is_empty() {
199            vec![]
200        } else {
201            parts[0].split(':').collect()
202        };
203        let right_parts: Vec<&str> = if parts[1].is_empty() {
204            vec![]
205        } else {
206            parts[1].split(':').collect()
207        };
208
209        // Parse left side
210        for (i, part) in left_parts.iter().enumerate() {
211            let value = u16::from_str_radix(part, 16).map_err(|e| {
212                Error::Protocol(format!("Invalid IPv6 hex: {}", e))
213            })?;
214            result[i * 2] = (value >> 8) as u8;
215            result[i * 2 + 1] = (value & 0xFF) as u8;
216        }
217
218        // Parse right side
219        let right_start = 16 - right_parts.len() * 2;
220        for (i, part) in right_parts.iter().enumerate() {
221            let value = u16::from_str_radix(part, 16).map_err(|e| {
222                Error::Protocol(format!("Invalid IPv6 hex: {}", e))
223            })?;
224            result[right_start + i * 2] = (value >> 8) as u8;
225            result[right_start + i * 2 + 1] = (value & 0xFF) as u8;
226        }
227    } else {
228        // Full format
229        let parts: Vec<&str> = s.split(':').collect();
230        if parts.len() != 8 {
231            return Err(Error::Protocol(format!(
232                "Invalid IPv6 format (expected 8 parts): {}",
233                s
234            )));
235        }
236
237        for (i, part) in parts.iter().enumerate() {
238            let value = u16::from_str_radix(part, 16).map_err(|e| {
239                Error::Protocol(format!("Invalid IPv6 hex: {}", e))
240            })?;
241            result[i * 2] = (value >> 8) as u8;
242            result[i * 2 + 1] = (value & 0xFF) as u8;
243        }
244    }
245
246    Ok(result)
247}
248
249/// Format 16-byte array as IPv6 string (compressed format)
250fn format_ipv6(bytes: &[u8; 16]) -> String {
251    // Convert bytes to u16 groups
252    let mut groups = [0u16; 8];
253    for i in 0..8 {
254        groups[i] = ((bytes[i * 2] as u16) << 8) | (bytes[i * 2 + 1] as u16);
255    }
256
257    // Find longest run of zeros for compression
258    let mut max_zero_start = None;
259    let mut max_zero_len = 0;
260    let mut current_zero_start = None;
261    let mut current_zero_len = 0;
262
263    for (i, &group) in groups.iter().enumerate() {
264        if group == 0 {
265            if current_zero_start.is_none() {
266                current_zero_start = Some(i);
267                current_zero_len = 1;
268            } else {
269                current_zero_len += 1;
270            }
271        } else {
272            if current_zero_len > max_zero_len {
273                max_zero_start = current_zero_start;
274                max_zero_len = current_zero_len;
275            }
276            current_zero_start = None;
277            current_zero_len = 0;
278        }
279    }
280
281    // Check final run
282    if current_zero_len > max_zero_len {
283        max_zero_start = current_zero_start;
284        max_zero_len = current_zero_len;
285    }
286
287    // Format with compression if we have a run of 2+ zeros
288    if max_zero_len >= 2 {
289        let start = max_zero_start.unwrap();
290        let end = start + max_zero_len;
291
292        let mut result = String::new();
293
294        // Add groups before compression
295        for (i, &group) in groups.iter().enumerate().take(start) {
296            if i > 0 {
297                result.push(':');
298            }
299            result.push_str(&format!("{:x}", group));
300        }
301
302        // Add compression marker
303        result.push_str("::");
304
305        // Add groups after compression
306        for (i, &group) in groups.iter().enumerate().skip(end) {
307            if i > end {
308                result.push(':');
309            }
310            result.push_str(&format!("{:x}", group));
311        }
312
313        result
314    } else {
315        // No compression
316        groups.iter().map(|g| format!("{:x}", g)).collect::<Vec<_>>().join(":")
317    }
318}
319
320#[cfg(test)]
321#[cfg_attr(coverage_nightly, coverage(off))]
322mod tests {
323    use super::*;
324
325    #[test]
326    fn test_ipv6_full_format() {
327        let mut col = ColumnIpv6::new(Type::ipv6());
328        col.append_from_string("2001:0db8:85a3:0000:0000:8a2e:0370:7334")
329            .unwrap();
330
331        assert_eq!(col.len(), 1);
332        // Should be compressed when formatted
333        let formatted = col.as_string(0);
334        assert!(formatted.contains("2001") && formatted.contains("7334"));
335    }
336
337    #[test]
338    fn test_ipv6_compressed() {
339        let mut col = ColumnIpv6::new(Type::ipv6());
340        col.append_from_string("::1").unwrap();
341        col.append_from_string("fe80::1").unwrap();
342
343        assert_eq!(col.len(), 2);
344    }
345
346    #[test]
347    fn test_ipv6_zeros() {
348        let mut col = ColumnIpv6::new(Type::ipv6());
349        col.append([0u8; 16]);
350
351        assert_eq!(col.len(), 1);
352        let formatted = col.as_string(0);
353        assert_eq!(formatted, "::");
354    }
355
356    #[test]
357    fn test_ipv6_from_bytes() {
358        let mut col = ColumnIpv6::new(Type::ipv6());
359        let bytes = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
360        col.append(bytes);
361
362        assert_eq!(col.len(), 1);
363        assert_eq!(col.at(0), bytes);
364    }
365}