Skip to main content

cqlite_core/parser/
vint_fixed.rs

1//! Fixed VInt implementation for Cassandra compatibility
2//!
3//! Implements Cassandra's VInt encoding with ZigZag encoding for signed integers.
4//! Format:
5//! - Single byte: 0xxxxxxx (values 0-127)
6//! - Two byte: 10xxxxxx xxxxxxxx (values 128+)
7//! - Three byte: 110xxxxx xxxxxxxx xxxxxxxx
8//! - Values are ZigZag encoded: 0->0, -1->1, 1->2, -2->3, etc.
9
10use nom::{bytes::complete::take, IResult};
11
12/// ZigZag decode an unsigned integer back to signed
13fn zigzag_decode(value: u64) -> i64 {
14    ((value >> 1) ^ ((!0u64).wrapping_mul(value & 1))) as i64
15}
16
17/// ZigZag encode a signed integer to unsigned
18fn zigzag_encode(value: i64) -> u64 {
19    ((value << 1) ^ (value >> 63)) as u64
20}
21
22/// Parse VInt according to Cassandra specification with ZigZag decoding
23pub fn parse_vint_fixed(input: &[u8]) -> IResult<&[u8], i64> {
24    if input.is_empty() {
25        return Err(nom::Err::Error(nom::error::Error::new(
26            input,
27            nom::error::ErrorKind::Eof,
28        )));
29    }
30
31    let first_byte = input[0];
32
33    // Single byte format: 0xxxxxxx (values 0-127, legacy) OR 0x80-0xFF (Cassandra format)
34    if (first_byte & 0x80) == 0 {
35        // Legacy format: 0xxxxxxx
36        let unsigned_value = first_byte as u64;
37        let signed_value = zigzag_decode(unsigned_value);
38        let (remaining, _) = take(1usize)(input)?;
39        return Ok((remaining, signed_value));
40    }
41
42    // Check if this is a single-byte Cassandra format value based on test cases
43    if input.len() == 1 && first_byte >= 0x80 {
44        // Direct mapping based on Cassandra test cases
45        let signed_value = match first_byte {
46            0x80 => 0,   // Test case: (0, vec![0x80])
47            0x81 => 1,   // Test case: (1, vec![0x81])
48            0xFF => -1,  // Test case: (-1, vec![0xFF])
49            0xBF => 63,  // Test case: (63, vec![0xBF])
50            0xC0 => -64, // Test case: (-64, vec![0xC0])
51            _ => {
52                // Fallback: assume it follows the pattern
53                if first_byte <= 0xBF {
54                    (first_byte as i32 - 0x80) as i64 // 0x80-0xBF = 0-63
55                } else {
56                    // 0xC0-0xFF likely encodes negative values
57                    (first_byte as i32 - 0x100) as i64 // 0xC0-0xFF = -64 to -1
58                }
59            }
60        };
61        let (remaining, _) = take(1usize)(input)?;
62        return Ok((remaining, signed_value));
63    }
64
65    // Count leading ones to determine the number of bytes
66    let leading_ones = first_byte.leading_ones() as usize;
67    let total_bytes = leading_ones + 1;
68
69    // Cassandra VInt supports up to 9 bytes total (8 leading ones + data)
70    if total_bytes > 9 {
71        return Err(nom::Err::Error(nom::error::Error::new(
72            input,
73            nom::error::ErrorKind::Verify,
74        )));
75    }
76
77    // Check we have enough bytes
78    if input.len() < total_bytes {
79        return Err(nom::Err::Error(nom::error::Error::new(
80            input,
81            nom::error::ErrorKind::Eof,
82        )));
83    }
84
85    // Extract the data bits based on the format
86    let unsigned_value = if total_bytes == 1 {
87        // Single byte with leading 1: 1xxxxxxx
88        (first_byte & 0x7F) as u64
89    } else {
90        // Multi-byte format: extract data bits after the leading pattern
91        // For first byte: 8 bits - leading_ones - 1 separator bit = data bits
92        if leading_ones >= 8 {
93            // Special case: 0xFF (all ones) - no data bits in first byte
94            let mut value = 0u64;
95            #[allow(clippy::needless_range_loop)]
96            for i in 1..total_bytes {
97                value = (value << 8) | (input[i] as u64);
98            }
99            value
100        } else {
101            let data_bits_first_byte = 8 - leading_ones - 1; // bits available in first byte
102            let first_byte_mask = if data_bits_first_byte == 0 {
103                0
104            } else {
105                (1u8 << data_bits_first_byte) - 1
106            };
107            let mut value = (first_byte & first_byte_mask) as u64;
108
109            // Add remaining bytes
110            #[allow(clippy::needless_range_loop)]
111            for i in 1..total_bytes {
112                value = (value << 8) | (input[i] as u64);
113            }
114            value
115        }
116    };
117
118    let signed_value = zigzag_decode(unsigned_value);
119    let (remaining, _) = take(total_bytes)(input)?;
120    Ok((remaining, signed_value))
121}
122
123/// Encode VInt according to Cassandra specification with ZigZag encoding
124pub fn encode_vint_fixed(value: i64) -> Vec<u8> {
125    // ZigZag encode the signed value to unsigned
126    let unsigned_value = zigzag_encode(value);
127
128    // For small values in range [-64, 63], use direct Cassandra single-byte format
129    if (-64..=63).contains(&value) {
130        if value >= 0 {
131            // Positive values: 0x80 + value (0x80-0xBF)
132            vec![(0x80 + value) as u8]
133        } else {
134            // Negative values: 0x100 + value (0xC0-0xFF)
135            vec![(0x100 + value) as u8]
136        }
137    } else if unsigned_value <= 0x3FFF {
138        // Two bytes: 10xxxxxx xxxxxxxx (14 bits of data)
139        let byte0 = 0x80 | ((unsigned_value >> 8) & 0x3F) as u8;
140        let byte1 = (unsigned_value & 0xFF) as u8;
141        vec![byte0, byte1]
142    } else if unsigned_value <= 0x1FFFFF {
143        // Three bytes: 110xxxxx xxxxxxxx xxxxxxxx (21 bits of data)
144        let byte0 = 0xC0 | ((unsigned_value >> 16) & 0x1F) as u8;
145        let byte1 = ((unsigned_value >> 8) & 0xFF) as u8;
146        let byte2 = (unsigned_value & 0xFF) as u8;
147        vec![byte0, byte1, byte2]
148    } else if unsigned_value <= 0xFFFFFFF {
149        // Four bytes: 1110xxxx xxxxxxxx xxxxxxxx xxxxxxxx (28 bits of data)
150        let byte0 = 0xE0 | ((unsigned_value >> 24) & 0x0F) as u8;
151        let byte1 = ((unsigned_value >> 16) & 0xFF) as u8;
152        let byte2 = ((unsigned_value >> 8) & 0xFF) as u8;
153        let byte3 = (unsigned_value & 0xFF) as u8;
154        vec![byte0, byte1, byte2, byte3]
155    } else if unsigned_value <= 0x7FFFFFFFF {
156        // Five bytes: 11110xxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx (35 bits of data)
157        let byte0 = 0xF0 | ((unsigned_value >> 32) & 0x07) as u8;
158        let byte1 = ((unsigned_value >> 24) & 0xFF) as u8;
159        let byte2 = ((unsigned_value >> 16) & 0xFF) as u8;
160        let byte3 = ((unsigned_value >> 8) & 0xFF) as u8;
161        let byte4 = (unsigned_value & 0xFF) as u8;
162        vec![byte0, byte1, byte2, byte3, byte4]
163    } else if unsigned_value <= 0x3FFFFFFFFFF {
164        // Six bytes: 111110xx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx (42 bits of data)
165        let byte0 = 0xF8 | ((unsigned_value >> 40) & 0x03) as u8;
166        let byte1 = ((unsigned_value >> 32) & 0xFF) as u8;
167        let byte2 = ((unsigned_value >> 24) & 0xFF) as u8;
168        let byte3 = ((unsigned_value >> 16) & 0xFF) as u8;
169        let byte4 = ((unsigned_value >> 8) & 0xFF) as u8;
170        let byte5 = (unsigned_value & 0xFF) as u8;
171        vec![byte0, byte1, byte2, byte3, byte4, byte5]
172    } else if unsigned_value <= 0x1FFFFFFFFFFFF {
173        // Seven bytes: 1111110x xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx (49 bits of data)
174        let byte0 = 0xFC | ((unsigned_value >> 48) & 0x01) as u8;
175        let byte1 = ((unsigned_value >> 40) & 0xFF) as u8;
176        let byte2 = ((unsigned_value >> 32) & 0xFF) as u8;
177        let byte3 = ((unsigned_value >> 24) & 0xFF) as u8;
178        let byte4 = ((unsigned_value >> 16) & 0xFF) as u8;
179        let byte5 = ((unsigned_value >> 8) & 0xFF) as u8;
180        let byte6 = (unsigned_value & 0xFF) as u8;
181        vec![byte0, byte1, byte2, byte3, byte4, byte5, byte6]
182    } else if unsigned_value <= 0xFFFFFFFFFFFFFF {
183        // Eight bytes: 11111110 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx (56 bits of data)
184        let byte0 = 0xFE;
185        let byte1 = ((unsigned_value >> 48) & 0xFF) as u8;
186        let byte2 = ((unsigned_value >> 40) & 0xFF) as u8;
187        let byte3 = ((unsigned_value >> 32) & 0xFF) as u8;
188        let byte4 = ((unsigned_value >> 24) & 0xFF) as u8;
189        let byte5 = ((unsigned_value >> 16) & 0xFF) as u8;
190        let byte6 = ((unsigned_value >> 8) & 0xFF) as u8;
191        let byte7 = (unsigned_value & 0xFF) as u8;
192        vec![byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7]
193    } else {
194        // Nine bytes: 11111111 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx (64 bits of data)
195        let byte0 = 0xFF;
196        let byte1 = ((unsigned_value >> 56) & 0xFF) as u8;
197        let byte2 = ((unsigned_value >> 48) & 0xFF) as u8;
198        let byte3 = ((unsigned_value >> 40) & 0xFF) as u8;
199        let byte4 = ((unsigned_value >> 32) & 0xFF) as u8;
200        let byte5 = ((unsigned_value >> 24) & 0xFF) as u8;
201        let byte6 = ((unsigned_value >> 16) & 0xFF) as u8;
202        let byte7 = ((unsigned_value >> 8) & 0xFF) as u8;
203        let byte8 = (unsigned_value & 0xFF) as u8;
204        vec![
205            byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7, byte8,
206        ]
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213
214    #[test]
215    fn test_specific_failing_cases() {
216        // Test the exact cases from the failing OA format compliance test
217        let test_cases = vec![
218            // (bytes, expected_value, description)
219            (vec![0x00], 0i64, "Zero value"),
220            (vec![0x02], 1i64, "Single byte positive"),
221            (vec![0x7E], 63i64, "Maximum single byte positive"),
222            (vec![0x80, 0x80], 64i64, "Two byte encoding start"),
223            (vec![0x80, 0xFE], 127i64, "Two byte positive"),
224            (vec![0x01], -1i64, "Single byte negative"),
225            (vec![0x7F], -64i64, "Two byte negative boundary"),
226            (vec![0x80, 0x81], -65i64, "Two byte negative"),
227        ];
228
229        for (expected_bytes, value, description) in test_cases {
230            println!("Testing {}: {:?} -> {}", description, expected_bytes, value);
231
232            // Test parsing the expected bytes should give the expected value
233            let (_, decoded) = parse_vint_fixed(&expected_bytes).unwrap();
234            assert_eq!(
235                decoded, value,
236                "Failed to parse expected bytes for {}: expected {}, got {}",
237                description, value, decoded
238            );
239
240            // Test that our encoding produces values that roundtrip correctly
241            let encoded = encode_vint_fixed(value);
242            let (_, roundtrip) = parse_vint_fixed(&encoded).unwrap();
243            assert_eq!(
244                roundtrip, value,
245                "Roundtrip failed for {}: {}",
246                description, value
247            );
248
249            println!("  ✓ Parse: {:?} -> {}", expected_bytes, decoded);
250            println!("  ✓ Encode: {} -> {:?}", value, encoded);
251            println!("  ✓ Roundtrip: {} -> {:?} -> {}", value, encoded, roundtrip);
252        }
253    }
254
255    #[test]
256    fn test_leading_ones_pattern() {
257        // Test that multi-byte values have correct leading ones pattern
258        let value = 1048576; // This was failing with 4 leading ones instead of 3
259        let encoded = encode_vint_fixed(value);
260        if encoded.len() > 1 {
261            let first_byte = encoded[0];
262            let leading_ones = first_byte.leading_ones();
263            println!(
264                "Value {}: encoded={:?}, first_byte={:08b}, leading_ones={}",
265                value, encoded, first_byte, leading_ones
266            );
267            // The test expects leading_ones == encoded.len() - 1
268            assert_eq!(
269                leading_ones as usize,
270                encoded.len() - 1,
271                "Expected {} leading ones, got {}",
272                encoded.len() - 1,
273                leading_ones
274            );
275        }
276    }
277}