Skip to main content

clickhouse_native_client/io/
buffer_utils.rs

1//! Buffer utilities for synchronous varint and string encoding/decoding
2//!
3//! These utilities work on in-memory byte slices and are used for parsing
4//! compressed block data and constructing query packets.
5
6use crate::{
7    Error,
8    Result,
9};
10use bytes::{
11    Buf,
12    BufMut,
13    BytesMut,
14};
15
16/// Read a varint-encoded u64 from a byte slice
17///
18/// This is the synchronous version used for parsing in-memory buffers.
19/// For async I/O, use `WireFormat::read_varint64` instead.
20pub fn read_varint(buffer: &mut &[u8]) -> Result<u64> {
21    let mut result: u64 = 0;
22    let mut shift = 0;
23
24    loop {
25        if buffer.is_empty() {
26            return Err(Error::Protocol(
27                "Unexpected end of buffer reading varint".to_string(),
28            ));
29        }
30
31        let byte = buffer[0];
32        buffer.advance(1);
33
34        result |= ((byte & 0x7F) as u64) << shift;
35
36        if byte & 0x80 == 0 {
37            break;
38        }
39
40        shift += 7;
41        if shift >= 64 {
42            return Err(Error::Protocol("Varint overflow".to_string()));
43        }
44    }
45
46    Ok(result)
47}
48
49/// Write a varint-encoded u64 to a byte buffer
50///
51/// This is the synchronous version used for constructing in-memory buffers.
52/// For async I/O, use `WireFormat::write_varint64` instead.
53pub fn write_varint(buffer: &mut BytesMut, mut value: u64) {
54    loop {
55        let mut byte = (value & 0x7F) as u8;
56        value >>= 7;
57
58        if value != 0 {
59            byte |= 0x80;
60        }
61
62        buffer.put_u8(byte);
63
64        if value == 0 {
65            break;
66        }
67    }
68}
69
70/// Read a length-prefixed string from a byte slice
71///
72/// This is the synchronous version used for parsing in-memory buffers.
73/// For async I/O, use `WireFormat::read_string` instead.
74pub fn read_string(buffer: &mut &[u8]) -> Result<String> {
75    let len = read_varint(buffer)? as usize;
76
77    if buffer.len() < len {
78        return Err(Error::Protocol(format!(
79            "Not enough data for string: need {}, have {}",
80            len,
81            buffer.len()
82        )));
83    }
84
85    let string_data = &buffer[..len];
86    let s = String::from_utf8(string_data.to_vec()).map_err(|e| {
87        Error::Protocol(format!("Invalid UTF-8 in string: {}", e))
88    })?;
89
90    buffer.advance(len);
91    Ok(s)
92}
93
94/// Write a length-prefixed string to a byte buffer
95///
96/// This is the synchronous version used for constructing in-memory buffers.
97/// For async I/O, use `WireFormat::write_string` instead.
98pub fn write_string(buffer: &mut BytesMut, s: &str) {
99    write_varint(buffer, s.len() as u64);
100    buffer.put_slice(s.as_bytes());
101}
102
/// Append `value` as a varint to a raw `Vec<u8>` (convenience for tests)
///
/// The value is pushed in 7-bit groups, least-significant group first.
/// Every byte except the last has its high bit (`0x80`) set. Existing
/// contents of `buf` are left untouched.
pub fn write_varint_to_vec(buf: &mut Vec<u8>, mut value: u64) {
    // While more than 7 bits remain, push a group with the continuation flag.
    while value > 0x7F {
        buf.push((value & 0x7F) as u8 | 0x80);
        value >>= 7;
    }

    // The remaining bits fit in one byte with the high bit clear.
    buf.push(value as u8);
}
117
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Every sample must decode back to itself and leave no trailing bytes.
    #[test]
    fn test_varint_roundtrip() {
        // Zero, values around the 7/8/16-bit boundaries, and the maximum.
        let samples =
            [0u64, 1, 127, 128, 255, 256, 65535, 65536, u64::MAX];

        for &value in &samples {
            let mut encoded = BytesMut::new();
            write_varint(&mut encoded, value);

            let mut remaining: &[u8] = &encoded;
            let decoded = read_varint(&mut remaining).unwrap();

            assert_eq!(
                value, decoded,
                "Varint roundtrip failed for {}",
                value
            );
            assert!(remaining.is_empty(), "Buffer should be fully consumed");
        }
    }

    /// Empty, ASCII, multi-byte UTF-8 and control characters all round-trip.
    #[test]
    fn test_string_roundtrip() {
        let samples = ["", "hello", "мир", "🦀", "test\nwith\nnewlines"];

        for &s in &samples {
            let mut encoded = BytesMut::new();
            write_string(&mut encoded, s);

            let mut remaining: &[u8] = &encoded;
            let decoded = read_string(&mut remaining).unwrap();

            assert_eq!(s, decoded, "String roundtrip failed for '{}'", s);
            assert!(remaining.is_empty(), "Buffer should be fully consumed");
        }
    }

    /// Ten bytes that all carry the continuation flag cannot be a u64.
    #[test]
    fn test_varint_overflow() {
        let mut encoded = BytesMut::new();
        encoded.put_slice(&[0xFF; 10]);

        let mut remaining: &[u8] = &encoded;
        assert!(read_varint(&mut remaining).is_err());
    }

    /// A length prefix larger than the remaining payload is rejected.
    #[test]
    fn test_string_truncated() {
        let mut encoded = BytesMut::new();
        write_varint(&mut encoded, 100); // Prefix announces 100 bytes
        encoded.put_slice(b"only10"); // Payload is only 6 bytes long

        let mut remaining: &[u8] = &encoded;
        assert!(read_string(&mut remaining).is_err());
    }

    /// Output of the `Vec<u8>` writer is readable by `read_varint`.
    #[test]
    fn test_varint_to_vec() {
        let mut encoded: Vec<u8> = Vec::new();
        write_varint_to_vec(&mut encoded, 300);

        let mut remaining: &[u8] = &encoded;
        assert_eq!(read_varint(&mut remaining).unwrap(), 300);
    }
}