Skip to main content

cqlite_core/parser/
vint.rs

1//! Variable-length integer encoding/decoding for Cassandra SSTable format
2//!
3//! Cassandra uses a variable-length integer encoding scheme to save space.
4//! This module implements VInt encoding compatible with Cassandra 5+ format.
5//!
6//! VInt Encoding Specification (from Cassandra/ScyllaDB):
7//! - MSB-first encoding with consecutive 1-bits indicating extra bytes
8//! - First byte pattern: [number of extra bytes as 1-bits][0][value bits]
9//! - Example: 110xxxxx indicates 2 extra bytes follow
10//! - Uses ZigZag encoding for signed integers to efficiently encode small negative values
11//! - Maximum 9 bytes total length
12
13use nom::{bytes::complete::take, IResult};
14
// Type aliases for complex types to reduce complexity warnings
//
// Result shape shared by the format-specific helper parsers in this module:
// - `Ok(Some((bytes_consumed, value)))` when a value was decoded
// - `Ok(None)` when the input does not match the format
// - `Err(..)` carrying a nom error over the input slice
type VintParseResult<'a> = Result<Option<(usize, i64)>, nom::Err<nom::error::Error<&'a [u8]>>>;
17
/// Heuristically decide whether `input` starts with ASCII text rather than a VInt.
///
/// Only the first four bytes are inspected; shorter inputs are never flagged.
/// The input is reported as corrupted when any of the following holds:
/// - it starts with one of a fixed list of directory/file-like words
/// - at least three of the four bytes are printable ASCII (0x20-0x7E)
/// - the four bytes, read as a big-endian `u32`, equal a previously observed
///   corrupted value
#[allow(dead_code)]
fn detect_ascii_corruption(input: &[u8]) -> bool {
    // Words that have shown up where a VInt was expected.
    const KNOWN_PREFIXES: [&[u8]; 10] = [
        b"data", b"bin", b"node", b"base", b"temp", b"logs", b"meta", b"main", b"root", b"home",
    ];
    // Big-endian words seen in corrupted data (0xB0 "bin" and "data").
    const KNOWN_WORDS: [u32; 2] = [2959239534, 1684108385];

    let Some(head) = input.get(..4) else {
        return false;
    };

    let has_known_prefix = KNOWN_PREFIXES.iter().any(|word| head.starts_with(word));
    let printable = head
        .iter()
        .filter(|byte| matches!(**byte, 0x20..=0x7E))
        .count();
    let as_word = u32::from_be_bytes([head[0], head[1], head[2], head[3]]);

    has_known_prefix || printable >= 3 || KNOWN_WORDS.contains(&as_word)
}
60
/// Maximum bytes a VInt can occupy (Cassandra supports up to 9 bytes total):
/// one leading byte plus up to eight extra bytes.
pub const MAX_VINT_SIZE: usize = 9;
63
/// Maximum length value accepted by parse_vint_length to prevent overflow attacks.
/// Set to 1GB as a generous limit that won't cause allocation issues on any platform.
/// This prevents memory exhaustion attacks via malicious input claiming huge lengths.
///
/// Values strictly greater than this constant are rejected; the constant itself is accepted.
pub const MAX_VINT_LENGTH: i64 = 1024 * 1024 * 1024; // 1GB safety limit
68
69/// Decode a variable-length signed integer from bytes with backward compatibility
70///
71/// This function supports both:
72/// 1. **ZigZag encoding** (legacy/test compatibility)
73/// 2. **BTI format** (Issue #36 compatibility)
74///
75/// # Arguments
76///
77/// * `input` - Input byte slice
78///
79/// # Returns
80///
81/// Tuple of (remaining_bytes, decoded_value)
82pub fn parse_vint(input: &[u8]) -> IResult<&[u8], i64> {
83    if input.is_empty() {
84        return Err(nom::Err::Error(nom::error::Error::new(
85            input,
86            nom::error::ErrorKind::Eof,
87        )));
88    }
89
90    let _first_byte = input[0];
91
92    // Corruption detection: temporarily disabled to avoid false positives in collection data
93    // TODO: Make corruption detection more sophisticated to distinguish between
94    // legitimate string content in collections vs actual VInt corruption
95    // if input.len() >= 8 && detect_ascii_corruption(input) {
96    //     return Err(nom::Err::Error(nom::error::Error::new(
97    //         input,
98    //         nom::error::ErrorKind::Verify,
99    //     )));
100    // }
101
102    // Try the fixed Cassandra-compatible VInt parsing first
103    match crate::parser::vint_fixed::parse_vint_fixed(input) {
104        Ok(result) => Ok(result),
105        Err(_) => {
106            // Fall back to ZigZag encoding for backward compatibility
107            // This handles edge cases and legacy formats
108            parse_zigzag_vint(input)
109        }
110    }
111}
112
113/// Parse VInt using ZigZag encoding (backward compatibility)
114fn parse_zigzag_vint(input: &[u8]) -> IResult<&[u8], i64> {
115    if input.is_empty() {
116        return Err(nom::Err::Error(nom::error::Error::new(
117            input,
118            nom::error::ErrorKind::Eof,
119        )));
120    }
121
122    let first_byte = input[0];
123    let (bytes_used, unsigned_value) = if first_byte < 0x80 {
124        // Single byte: 0xxxxxxx (7 data bits)
125        (1, first_byte as u64)
126    } else if first_byte < 0xC0 {
127        // Two bytes: 10xxxxxx xxxxxxxx
128        if input.len() < 2 {
129            return Err(nom::Err::Error(nom::error::Error::new(
130                input,
131                nom::error::ErrorKind::Eof,
132            )));
133        }
134        let value = ((first_byte & 0x3F) as u64) << 8 | input[1] as u64;
135        (2, value)
136    } else if first_byte < 0xE0 {
137        // Three bytes: 110xxxxx xxxxxxxx xxxxxxxx
138        if input.len() < 3 {
139            return Err(nom::Err::Error(nom::error::Error::new(
140                input,
141                nom::error::ErrorKind::Eof,
142            )));
143        }
144        let value = ((first_byte & 0x1F) as u64) << 16 | (input[1] as u64) << 8 | input[2] as u64;
145        (3, value)
146    } else if first_byte == 0xF0 {
147        // Extended format: 0xF0 followed by variable length bytes
148        if input.len() < 2 {
149            return Err(nom::Err::Error(nom::error::Error::new(
150                input,
151                nom::error::ErrorKind::Eof,
152            )));
153        }
154        // Read the remaining bytes as a big-endian integer
155        let mut value = 0u64;
156        let bytes_to_read = input.len() - 1; // Skip the 0xF0 marker
157        #[allow(clippy::needless_range_loop)]
158        for i in 1..=bytes_to_read.min(8) {
159            // Max 8 bytes for u64
160            value = (value << 8) | (input[i] as u64);
161        }
162        (bytes_to_read + 1, value)
163    } else if first_byte == 0xFF {
164        // Extended format: 0xFF followed by variable length bytes (similar to 0xF0)
165        if input.len() < 2 {
166            return Err(nom::Err::Error(nom::error::Error::new(
167                input,
168                nom::error::ErrorKind::Eof,
169            )));
170        }
171        // Read the remaining bytes as a big-endian integer
172        let mut value = 0u64;
173        let bytes_to_read = input.len() - 1; // Skip the 0xFF marker
174        #[allow(clippy::needless_range_loop)]
175        for i in 1..=bytes_to_read.min(8) {
176            // Max 8 bytes for u64
177            value = (value << 8) | (input[i] as u64);
178        }
179        (bytes_to_read + 1, value)
180    } else {
181        // Not a valid ZigZag VInt, let caller try other formats
182        return Err(nom::Err::Error(nom::error::Error::new(
183            input,
184            nom::error::ErrorKind::Verify,
185        )));
186    };
187
188    let signed_value = zigzag_decode(unsigned_value);
189    let (remaining_input, _) = take(bytes_used)(input)?;
190    Ok((remaining_input, signed_value))
191}
192
193/// Parse VInt using Cassandra-compatible format
194#[allow(dead_code)]
195fn parse_cassandra_vint_format(input: &[u8]) -> VintParseResult<'_> {
196    if input.is_empty() {
197        return Ok(None);
198    }
199
200    let first_byte = input[0];
201
202    // Count leading ones to determine the byte length
203    let leading_ones = first_byte.leading_ones() as usize;
204    let total_length = leading_ones + 1;
205
206    if total_length > 9 || input.len() < total_length {
207        return Ok(None); // Invalid or incomplete
208    }
209
210    // Extract the value based on the format
211    let value = if total_length == 1 {
212        // Single byte format
213        if first_byte & 0x80 == 0x80 {
214            // 1xxxxxxx format: values 0-127
215            (first_byte & 0x7F) as i64
216        } else if first_byte == 0xFF {
217            -1
218        } else if first_byte & 0xC0 == 0xC0 {
219            // 11xxxxxx format: negative values -1 to -63
220            -((first_byte & 0x3F) as i64)
221        } else {
222            // 0xxxxxxx format should not appear in Cassandra VInt
223            return Ok(None);
224        }
225    } else {
226        // Multi-byte format: extract data bits after leading pattern
227        let data_bits = (total_length * 8) - leading_ones - 1;
228        // Extract data from first byte (after leading pattern)
229        let first_data_bits = 8 - leading_ones - 1;
230        let first_data_mask = (1u8 << first_data_bits) - 1;
231        let mut value = (first_byte & first_data_mask) as i64;
232
233        // Add remaining bytes
234        #[allow(clippy::needless_range_loop)]
235        for i in 1..total_length {
236            value = (value << 8) | (input[i] as i64);
237        }
238
239        // Check if this should be interpreted as negative (two's complement)
240        // For Cassandra VInt, we need to handle signed values properly
241        let max_positive = (1i64 << (data_bits - 1)) - 1;
242        if value > max_positive {
243            // Convert from unsigned to signed (two's complement)
244            value -= 1i64 << data_bits;
245        }
246
247        value
248    };
249
250    Ok(Some((total_length, value)))
251}
252
253/// Parse VInt using custom BTI format (Issue #36)
254#[allow(dead_code)]
255fn parse_custom_vint_format(input: &[u8]) -> VintParseResult<'_> {
256    if input.is_empty() {
257        return Ok(None);
258    }
259
260    let first_byte = input[0];
261
262    let (total_length, value) = if first_byte < 0x80 {
263        // Single byte: 0xxxxxxx (7 data bits)
264        let unsigned_value = first_byte & 0x7F;
265        let value = if unsigned_value < 64 {
266            unsigned_value as i64
267        } else {
268            (unsigned_value as i64) - 128
269        };
270        (1, value)
271    } else if first_byte < 0xC0 {
272        // Single byte: 10xxxxxx (0x80-0xBF) -> values 0-63
273        let value = (first_byte & 0x3F) as i64;
274        (1, value)
275    } else if first_byte == 0xFF {
276        // Special case: 0xFF represents -1
277        (1, -1)
278    } else if first_byte >= 0xC0 {
279        if input.len() == 1 {
280            // Single byte negative: 0xC0-0xFE maps to -64 to -2
281            let value = -64 + (first_byte - 0xC0) as i64;
282            (1, value)
283        } else if first_byte == 0xC0 && input.len() >= 2 {
284            // Two-byte format: 0xC0 + value byte
285            let second_byte = input[1];
286            let value = if second_byte <= 0x7F {
287                second_byte as i64
288            } else if second_byte == 0x80 {
289                -128
290            } else {
291                second_byte as i64
292            };
293            (2, value)
294        } else {
295            return Ok(None); // Not supported in this format
296        }
297    } else {
298        return Ok(None);
299    };
300
301    if input.len() < total_length {
302        return Err(nom::Err::Error(nom::error::Error::new(
303            input,
304            nom::error::ErrorKind::Eof,
305        )));
306    }
307
308    Ok(Some((total_length, value)))
309}
310
/// Encode a signed integer using the experimental Cassandra-style layout
/// (currently unused).
///
/// Layout produced:
/// - `0..=63`   -> one byte `0x80 | value`
/// - `-1`       -> one byte `0xFF`
/// - `-63..=-2` -> one byte `0xC0 | -value`
/// - otherwise  -> the shortest multi-byte form (2 to 9 bytes) built by
///   `encode_cassandra_vint_multi_byte` whose payload can hold `value` in
///   two's complement
///
/// An `n`-byte form with `n < 9` carries `7 * n` payload bits; the 9-byte form
/// carries all 64 bits, so every `i64` is representable.
#[allow(dead_code)]
fn encode_cassandra_vint(value: i64) -> Vec<u8> {
    match value {
        0..=63 => vec![0x80 | value as u8],
        -1 => vec![0xFF],
        -63..=-2 => vec![0xC0 | (-value) as u8],
        _ => {
            // Smallest length whose signed payload range contains `value`.
            let num_bytes = (2..9usize)
                .find(|&len| {
                    let limit = 1i64 << (7 * len - 1);
                    (-limit..limit).contains(&value)
                })
                .unwrap_or(9);
            encode_cassandra_vint_multi_byte(value, num_bytes)
        }
    }
}

/// Encode `value` as exactly `num_bytes` bytes with the leading-ones length prefix.
///
/// The first byte starts with `num_bytes - 1` one-bits followed by a zero bit
/// (the zero is absent in the 9-byte form, whose first byte is `0xFF`). The
/// remaining bits of the first byte and all following bytes hold the low-order
/// bits of `value` in big-endian two's complement. Higher-order bits that do
/// not fit are dropped, so the caller must choose a sufficient `num_bytes`.
///
/// # Panics
///
/// Panics if `num_bytes` is not in `1..=9`.
#[allow(dead_code)]
fn encode_cassandra_vint_multi_byte(value: i64, num_bytes: usize) -> Vec<u8> {
    assert!(
        (1..=9).contains(&num_bytes),
        "VInt length must be between 1 and 9 bytes, got {}",
        num_bytes
    );

    let extra_bytes = num_bytes - 1;
    // Two's complement representation, most significant byte first.
    let be = (value as u64).to_be_bytes();

    // `extra_bytes` leading ones; the truncating cast keeps the low byte of
    // 0xFF00 >> n, e.g. 0x80, 0xC0, ..., 0xFE, 0xFF.
    let marker = (0xFF00u16 >> extra_bytes) as u8;
    // The first byte has 7 - extra_bytes payload bits; the 8- and 9-byte forms
    // have none, so their payload starts in the second byte.
    let head_payload = if extra_bytes < 7 {
        be[7 - extra_bytes] & (0x7F >> extra_bytes)
    } else {
        0
    };

    let mut result = Vec::with_capacity(num_bytes);
    result.push(marker | head_payload);
    result.extend_from_slice(&be[8 - extra_bytes..]);
    result
}
404
405/// Encode a signed integer using ZigZag encoding (backward compatibility)
406#[allow(dead_code)]
407fn encode_zigzag_vint(value: i64) -> Vec<u8> {
408    let unsigned_value = zigzag_encode(value);
409
410    if unsigned_value <= 0x7F {
411        // Single byte: 0xxxxxxx
412        vec![unsigned_value as u8]
413    } else if unsigned_value <= 0x3FFF {
414        // Two bytes: 10xxxxxx xxxxxxxx
415        let high = ((unsigned_value >> 8) & 0x3F) | 0x80;
416        let low = unsigned_value & 0xFF;
417        vec![high as u8, low as u8]
418    } else if unsigned_value <= 0x1FFFFF {
419        // Three bytes: 110xxxxx xxxxxxxx xxxxxxxx
420        let high = ((unsigned_value >> 16) & 0x1F) | 0xC0;
421        let mid = (unsigned_value >> 8) & 0xFF;
422        let low = unsigned_value & 0xFF;
423        vec![high as u8, mid as u8, low as u8]
424    } else {
425        // For larger values, use a simplified multi-byte format
426        let bytes = unsigned_value.to_be_bytes();
427        let mut result = vec![0xF0]; // Marker for extended format
428
429        // Find the first non-zero byte and include remaining bytes
430        let start = bytes.iter().position(|&b| b != 0).unwrap_or(7);
431        result.extend_from_slice(&bytes[start..]);
432        result
433    }
434}
435
/// Map a signed integer onto an unsigned one with ZigZag encoding.
///
/// Values with a small absolute value get small codes, which keeps small
/// negative numbers short once they are variable-length encoded:
/// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, -3 -> 5, ...
#[allow(dead_code)]
pub fn zigzag_encode(value: i64) -> u64 {
    // Magnitude bits moved up by one to make room for the sign in bit 0.
    let doubled = (value as u64) << 1;
    // All ones for negative input, all zeros otherwise.
    let sign_fill = (value >> 63) as u64;
    doubled ^ sign_fill
}
445
/// Reverse ZigZag encoding, turning an unsigned code back into the signed value.
///
/// Even codes decode to non-negative values and odd codes to negative ones:
/// 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, 4 -> 2, ...
#[allow(dead_code)]
pub fn zigzag_decode(value: u64) -> i64 {
    let magnitude = (value >> 1) as i64;
    // 0 for even codes, -1 (all ones) for odd codes.
    let sign_fill = -((value & 1) as i64);
    magnitude ^ sign_fill
}
451
/// Number of bytes the length-prefixed VInt layout needs for `value`.
///
/// Each encoded byte contributes seven payload bits, so the size is the bit
/// length of `value` divided by seven, rounded up:
/// - 1 byte: up to 127 (7 bits)
/// - 2 bytes: up to 16383 (14 bits)
/// - 3 bytes: up to 2097151 (21 bits)
/// - ...
/// - 8 bytes: up to 2^56 - 1 (56 bits)
/// - 9 bytes: everything larger
///
/// Zero still occupies one byte.
#[allow(dead_code)]
fn vint_size(value: u64) -> usize {
    let significant_bits = (u64::BITS - value.leading_zeros()) as usize;
    // At least one byte (for zero) and never more than nine.
    significant_bits.div_ceil(7).clamp(1, 9)
}
494
/// Encode a signed integer as a variable-length integer.
///
/// This is a thin wrapper that delegates to `encode_vint_zigzag`, so the
/// output is the ZigZag-mapped value in the length-prefixed VInt layout.
/// Use `encode_vint_cassandra` for the `vint_fixed` encoder instead.
///
/// # Arguments
///
/// * `value` - The integer value to encode
///
/// # Returns
///
/// Vector of bytes representing the VInt-encoded value
pub fn encode_vint(value: i64) -> Vec<u8> {
    encode_vint_zigzag(value)
}
510
511/// Encode VInt using original ZigZag format for backward compatibility
512pub fn encode_vint_zigzag(value: i64) -> Vec<u8> {
513    let unsigned_value = zigzag_encode(value);
514
515    if unsigned_value <= 0x7F {
516        // Single byte: 0xxxxxxx
517        vec![unsigned_value as u8]
518    } else if unsigned_value <= 0x3FFF {
519        // Two bytes: 10xxxxxx xxxxxxxx
520        let byte0 = 0x80 | ((unsigned_value >> 8) & 0x3F) as u8;
521        let byte1 = (unsigned_value & 0xFF) as u8;
522        vec![byte0, byte1]
523    } else if unsigned_value <= 0x1FFFFF {
524        // Three bytes: 110xxxxx xxxxxxxx xxxxxxxx
525        let byte0 = 0xC0 | ((unsigned_value >> 16) & 0x1F) as u8;
526        let byte1 = ((unsigned_value >> 8) & 0xFF) as u8;
527        let byte2 = (unsigned_value & 0xFF) as u8;
528        vec![byte0, byte1, byte2]
529    } else if unsigned_value <= 0xFFFFFFF {
530        // Four bytes: 1110xxxx xxxxxxxx xxxxxxxx xxxxxxxx
531        let byte0 = 0xE0 | ((unsigned_value >> 24) & 0x0F) as u8;
532        let byte1 = ((unsigned_value >> 16) & 0xFF) as u8;
533        let byte2 = ((unsigned_value >> 8) & 0xFF) as u8;
534        let byte3 = (unsigned_value & 0xFF) as u8;
535        vec![byte0, byte1, byte2, byte3]
536    } else if unsigned_value <= 0x7FFFFFFFF {
537        // Five bytes: 11110xxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx
538        let byte0 = 0xF0 | ((unsigned_value >> 32) & 0x07) as u8;
539        let byte1 = ((unsigned_value >> 24) & 0xFF) as u8;
540        let byte2 = ((unsigned_value >> 16) & 0xFF) as u8;
541        let byte3 = ((unsigned_value >> 8) & 0xFF) as u8;
542        let byte4 = (unsigned_value & 0xFF) as u8;
543        vec![byte0, byte1, byte2, byte3, byte4]
544    } else if unsigned_value <= 0x3FFFFFFFFFF {
545        // Six bytes: 111110xx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx
546        let byte0 = 0xF8 | ((unsigned_value >> 40) & 0x03) as u8;
547        let byte1 = ((unsigned_value >> 32) & 0xFF) as u8;
548        let byte2 = ((unsigned_value >> 24) & 0xFF) as u8;
549        let byte3 = ((unsigned_value >> 16) & 0xFF) as u8;
550        let byte4 = ((unsigned_value >> 8) & 0xFF) as u8;
551        let byte5 = (unsigned_value & 0xFF) as u8;
552        vec![byte0, byte1, byte2, byte3, byte4, byte5]
553    } else if unsigned_value <= 0x1FFFFFFFFFFFF {
554        // Seven bytes: 1111110x xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx
555        let byte0 = 0xFC | ((unsigned_value >> 48) & 0x01) as u8;
556        let byte1 = ((unsigned_value >> 40) & 0xFF) as u8;
557        let byte2 = ((unsigned_value >> 32) & 0xFF) as u8;
558        let byte3 = ((unsigned_value >> 24) & 0xFF) as u8;
559        let byte4 = ((unsigned_value >> 16) & 0xFF) as u8;
560        let byte5 = ((unsigned_value >> 8) & 0xFF) as u8;
561        let byte6 = (unsigned_value & 0xFF) as u8;
562        vec![byte0, byte1, byte2, byte3, byte4, byte5, byte6]
563    } else if unsigned_value <= 0xFFFFFFFFFFFFFF {
564        // Eight bytes: 11111110 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx
565        let byte0 = 0xFE;
566        let byte1 = ((unsigned_value >> 48) & 0xFF) as u8;
567        let byte2 = ((unsigned_value >> 40) & 0xFF) as u8;
568        let byte3 = ((unsigned_value >> 32) & 0xFF) as u8;
569        let byte4 = ((unsigned_value >> 24) & 0xFF) as u8;
570        let byte5 = ((unsigned_value >> 16) & 0xFF) as u8;
571        let byte6 = ((unsigned_value >> 8) & 0xFF) as u8;
572        let byte7 = (unsigned_value & 0xFF) as u8;
573        vec![byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7]
574    } else {
575        // Nine bytes: 11111111 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx
576        let byte0 = 0xFF;
577        let byte1 = ((unsigned_value >> 56) & 0xFF) as u8;
578        let byte2 = ((unsigned_value >> 48) & 0xFF) as u8;
579        let byte3 = ((unsigned_value >> 40) & 0xFF) as u8;
580        let byte4 = ((unsigned_value >> 32) & 0xFF) as u8;
581        let byte5 = ((unsigned_value >> 24) & 0xFF) as u8;
582        let byte6 = ((unsigned_value >> 16) & 0xFF) as u8;
583        let byte7 = ((unsigned_value >> 8) & 0xFF) as u8;
584        let byte8 = (unsigned_value & 0xFF) as u8;
585        vec![
586            byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7, byte8,
587        ]
588    }
589}
590
/// Encode VInt using Cassandra-compatible format for Issue #17 requirements
///
/// Thin wrapper: the encoding itself is performed by
/// `crate::parser::vint_fixed::encode_vint_fixed`.
pub fn encode_vint_cassandra(value: i64) -> Vec<u8> {
    crate::parser::vint_fixed::encode_vint_fixed(value)
}
595
/// Parse VInt using Cassandra-compatible format for Issue #17 requirements
///
/// Thin wrapper around `crate::parser::vint_fixed::parse_vint_fixed`; unlike
/// `parse_vint`, there is no fallback to the ZigZag parser when it fails.
///
/// Returns a tuple of (remaining_bytes, decoded_value).
pub fn parse_vint_cassandra(input: &[u8]) -> IResult<&[u8], i64> {
    crate::parser::vint_fixed::parse_vint_fixed(input)
}
600
601/// Parse unsigned VInt32 for Cassandra value lengths
602///
603/// Matches org/apache/cassandra/io/util/DataInputPlus.readUnsignedVInt32()
604/// Used for variable-width type value lengths (text, blob, decimal, etc.)
605///
606/// Encoding format:
607/// - Leading 1-bits indicate number of extra bytes
608/// - Pattern: [n ones][0][data bits]
609/// - Example: 0xxxxxxx = 1 byte, 10xxxxxx xxxxxxxx = 2 bytes
610///
611/// # Arguments
612///
613/// * `input` - Input byte slice
614///
615/// # Returns
616///
617/// Tuple of (remaining_bytes, decoded_u32_value)
618pub fn parse_unsigned_vint32(input: &[u8]) -> IResult<&[u8], u32> {
619    if input.is_empty() {
620        return Err(nom::Err::Error(nom::error::Error::new(
621            input,
622            nom::error::ErrorKind::Eof,
623        )));
624    }
625
626    let first_byte = input[0];
627    let num_extra_bytes = first_byte.leading_ones() as usize;
628
629    if num_extra_bytes > 4 || num_extra_bytes + 1 > input.len() {
630        return Err(nom::Err::Error(nom::error::Error::new(
631            input,
632            nom::error::ErrorKind::Eof,
633        )));
634    }
635
636    let value = if num_extra_bytes == 0 {
637        // Single byte: 0xxxxxxx
638        (first_byte & 0x7F) as u32
639    } else {
640        // Multi-byte: extract data bits after leading ones pattern
641        let data_bits_first = 8 - num_extra_bytes - 1;
642        let mask = (1u8 << data_bits_first) - 1;
643        let mut value = (first_byte & mask) as u32;
644
645        for &byte in input.iter().skip(1).take(num_extra_bytes) {
646            value = (value << 8) | (byte as u32);
647        }
648        value
649    };
650
651    let bytes_consumed = num_extra_bytes + 1;
652    let (remaining, _) = take(bytes_consumed)(input)?;
653    Ok((remaining, value))
654}
655
656/// Parse unsigned VInt64 for Cassandra timestamps
657///
658/// Matches org/apache/cassandra/io/util/DataInputPlus.readUnsignedVInt()
659/// Used for timestamp deltas and other 64-bit unsigned values.
660///
661/// Encoding format:
662/// - Leading 1-bits indicate number of extra bytes
663/// - Pattern: [n ones][0][data bits]
664/// - Example: 0xxxxxxx = 1 byte, 10xxxxxx xxxxxxxx = 2 bytes
665/// - Maximum 8 extra bytes (9 bytes total) for full 64-bit range
666///
667/// # Arguments
668///
669/// * `input` - Input byte slice
670///
671/// # Returns
672///
673/// Tuple of (remaining_bytes, decoded_u64_value)
674pub fn parse_vuint(input: &[u8]) -> IResult<&[u8], u64> {
675    if input.is_empty() {
676        return Err(nom::Err::Error(nom::error::Error::new(
677            input,
678            nom::error::ErrorKind::Eof,
679        )));
680    }
681
682    let first_byte = input[0];
683    let num_extra_bytes = first_byte.leading_ones() as usize;
684
685    if num_extra_bytes > 8 || num_extra_bytes + 1 > input.len() {
686        return Err(nom::Err::Error(nom::error::Error::new(
687            input,
688            nom::error::ErrorKind::Eof,
689        )));
690    }
691
692    let value = if num_extra_bytes == 0 {
693        // Single byte: 0xxxxxxx
694        (first_byte & 0x7F) as u64
695    } else if num_extra_bytes == 8 {
696        // Special case: 8 leading ones (0xFF) means 8 extra bytes follow, no data bits in first byte
697        let mut value = 0u64;
698        for &byte in input.iter().skip(1).take(num_extra_bytes) {
699            value = (value << 8) | (byte as u64);
700        }
701        value
702    } else {
703        // Multi-byte: extract data bits after leading ones pattern
704        let data_bits_first = 8 - num_extra_bytes - 1;
705        let mask = (1u8 << data_bits_first) - 1;
706        let mut value = (first_byte & mask) as u64;
707
708        for &byte in input.iter().skip(1).take(num_extra_bytes) {
709            value = (value << 8) | (byte as u64);
710        }
711        value
712    };
713
714    let bytes_consumed = num_extra_bytes + 1;
715    let (remaining, _) = take(bytes_consumed)(input)?;
716    Ok((remaining, value))
717}
718
/// Encode an unsigned integer as a length-prefixed variable-length integer.
///
/// The result is one to nine bytes long. The first byte starts with as many
/// 1-bits as there are extra bytes, then a 0-bit (absent in the nine-byte
/// form), then the most significant payload bits; the extra bytes hold the
/// remaining payload in big-endian order.
///
/// - 1 byte: `0xxxxxxx` (values up to `0x7F`)
/// - 2 bytes: `10xxxxxx xxxxxxxx` (values up to `0x3FFF`)
/// - ...
/// - 8 bytes: `11111110` + 7 payload bytes (values up to 2^56 - 1)
/// - 9 bytes: `11111111` + 8 payload bytes
pub fn encode_vuint(value: u64) -> Vec<u8> {
    // Seven payload bits per encoded byte; zero still takes one byte and
    // anything above 56 bits uses the nine-byte form.
    let significant_bits = (u64::BITS - value.leading_zeros()) as usize;
    let total_len = significant_bits.div_ceil(7).clamp(1, 9);
    let extra_bytes = total_len - 1;

    let be = value.to_be_bytes();

    // `extra_bytes` leading ones: 0x00, 0x80, 0xC0, ..., 0xFE, 0xFF.
    let marker = (0xFF00u16 >> extra_bytes) as u8;
    // The byte just above the extra bytes shares the first output byte with
    // the marker; the length choice guarantees it fits below the marker bits.
    // The 8- and 9-byte forms have no payload in the first byte.
    let head_payload = if extra_bytes < 7 { be[7 - extra_bytes] } else { 0 };

    let mut encoded = Vec::with_capacity(total_len);
    encoded.push(marker | head_payload);
    encoded.extend_from_slice(&be[8 - extra_bytes..]);
    encoded
}
796
797/// Parse a VInt and convert to usize for length fields
798///
799/// # Safety
800/// This function enforces a maximum length of 1GB (MAX_VINT_LENGTH) to prevent:
801/// - Overflow on 32-bit platforms where usize is 4 bytes
802/// - Memory exhaustion attacks via malicious input claiming huge lengths
803pub fn parse_vint_length(input: &[u8]) -> IResult<&[u8], usize> {
804    let (remaining, value) = parse_vint(input)?;
805    if value < 0 {
806        return Err(nom::Err::Error(nom::error::Error::new(
807            input,
808            nom::error::ErrorKind::Verify,
809        )));
810    }
811    // Safety: Prevent overflow on usize conversion and allocation attacks
812    if value > MAX_VINT_LENGTH {
813        return Err(nom::Err::Error(nom::error::Error::new(
814            input,
815            nom::error::ErrorKind::TooLarge,
816        )));
817    }
818    Ok((remaining, value as usize))
819}
820
#[cfg(test)]
mod tests {
    use super::*;

    /// ZigZag interleaves negative and non-negative values:
    /// 0, -1, 1, -2, 2, ... map to 0, 1, 2, 3, 4, ...
    #[test]
    fn test_zigzag_encoding() {
        assert_eq!(zigzag_encode(0), 0);
        assert_eq!(zigzag_encode(-1), 1);
        assert_eq!(zigzag_encode(1), 2);
        assert_eq!(zigzag_encode(-2), 3);
        assert_eq!(zigzag_encode(2), 4);
        assert_eq!(zigzag_encode(-3), 5);
        // The extremes occupy the two largest unsigned values.
        assert_eq!(zigzag_encode(i64::MAX), u64::MAX - 1);
        assert_eq!(zigzag_encode(i64::MIN), u64::MAX);
    }

    /// `zigzag_decode` must invert `zigzag_encode`, including at the i64 extremes.
    #[test]
    fn test_zigzag_roundtrip() {
        for value in [0, 1, -1, 127, -128, 32767, -32768, i64::MAX, i64::MIN] {
            let decoded = zigzag_decode(zigzag_encode(value));
            assert_eq!(decoded, value, "ZigZag roundtrip failed for {}", value);
        }
    }

    /// Encoded size steps up at the 7-bit and 14-bit payload boundaries.
    #[test]
    fn test_vint_size_calculation() {
        assert_eq!(vint_size(0), 1);
        assert_eq!(vint_size(0x7F), 1); // Max single byte value
        assert_eq!(vint_size(0x80), 2); // Min two byte value
        assert_eq!(vint_size(0x3FFF), 2); // Max two byte value
        assert_eq!(vint_size(0x4000), 3); // Min three byte value
    }

    /// Values in -63..=63 must encode to exactly one byte and roundtrip.
    #[test]
    fn test_vint_single_byte_encoding() {
        // Small non-negative values: one byte whose top bit is clear.
        for i in 0..=63 {
            let encoded = encode_vint(i);
            assert_eq!(encoded.len(), 1, "Value {} should encode to 1 byte", i);
            assert_eq!(encoded[0] & 0x80, 0, "Single byte should have leading 0");

            let (_, decoded) = parse_vint(&encoded).unwrap();
            assert_eq!(decoded, i, "Roundtrip failed for {}", i);
        }

        // Small negative values (ZigZag keeps them short as well).
        for i in -63..=0 {
            let encoded = encode_vint(i);
            assert_eq!(encoded.len(), 1, "Value {} should encode to 1 byte", i);

            let (_, decoded) = parse_vint(&encoded).unwrap();
            assert_eq!(decoded, i, "Roundtrip failed for {}", i);
        }
    }

    /// Two- and three-byte encodings carry the expected length prefix in the
    /// first byte and roundtrip through the parser.
    #[test]
    fn test_vint_multi_byte_encoding() {
        // Two-byte encoding: first byte is 10xxxxxx.
        let value = 128;
        let encoded = encode_vint(value);
        assert_eq!(encoded.len(), 2, "Value {} should encode to 2 bytes", value);
        assert_eq!(
            encoded[0] & 0xC0,
            0x80,
            "Two-byte encoding should start with 10"
        );

        let (_, decoded) = parse_vint(&encoded).unwrap();
        assert_eq!(decoded, value);

        // Three-byte encoding: first byte is 110xxxxx.
        let value = 16384; // 2^14
        let encoded = encode_vint(value);
        assert_eq!(encoded.len(), 3, "Value {} should encode to 3 bytes", value);
        assert_eq!(
            encoded[0] & 0xE0,
            0xC0,
            "Three-byte encoding should start with 110"
        );

        let (_, decoded) = parse_vint(&encoded).unwrap();
        assert_eq!(decoded, value);
    }

    /// Encode/parse roundtrip over a spread of magnitudes and signs; every
    /// encoding must fit in `MAX_VINT_SIZE` bytes and be consumed completely.
    #[test]
    fn test_vint_comprehensive_roundtrip() {
        let test_values = [
            // Edge cases around single/multi-byte boundaries
            0,
            1,
            -1,
            63,
            -63,
            64,
            -64,
            // Values at and just below powers of two, with their negatives
            127,
            -127,
            128,
            -128,
            255,
            -255,
            256,
            -256,
            1023,
            -1023,
            1024,
            -1024,
            2047,
            -2047,
            2048,
            -2048,
            // Large values
            32767,
            -32768,
            65535,
            -65535,
            1000000,
            -1000000,
            // 32-bit extremes
            i32::MAX as i64,
            i32::MIN as i64,
            // Very large values (but not max to avoid encoding issues)
            i64::MAX / 2,
            i64::MIN / 2,
        ];

        for value in test_values {
            let encoded = encode_vint(value);
            assert!(
                encoded.len() <= MAX_VINT_SIZE,
                "Encoded length {} exceeds maximum {} for value {}",
                encoded.len(),
                MAX_VINT_SIZE,
                value
            );

            let (remaining, decoded) = parse_vint(&encoded).unwrap();
            assert!(remaining.is_empty(), "Parsing should consume all bytes");
            assert_eq!(decoded, value, "Roundtrip failed for value {}", value);
        }
    }

    /// Pins exact byte patterns so a format regression is caught immediately.
    #[test]
    fn test_vint_format_compliance() {
        // Single byte: 0xxxxxxx
        let encoded = encode_vint(0);
        assert_eq!(encoded, vec![0x00]);

        let encoded = encode_vint(1);
        assert_eq!(encoded, vec![0x02]); // ZigZag: 1 -> 2

        let encoded = encode_vint(-1);
        assert_eq!(encoded, vec![0x01]); // ZigZag: -1 -> 1

        // Two bytes: 10xxxxxx xxxxxxxx
        let encoded = encode_vint(64);
        assert_eq!(encoded.len(), 2);
        assert_eq!(encoded[0] & 0xC0, 0x80); // Should start with 10

        // Verify we can parse back
        let (_, decoded) = parse_vint(&encoded).unwrap();
        assert_eq!(decoded, 64);
    }

    /// Unsigned encode/parse roundtrip for a multi-byte value.
    #[test]
    fn test_vuint_positive() {
        let value = 1000u64;
        let encoded = encode_vuint(value);
        let (_, decoded) = parse_vuint(&encoded).unwrap();
        assert_eq!(decoded, value);
    }

    /// A small non-negative VInt is returned unchanged as a length.
    #[test]
    fn test_vint_length() {
        let bytes = encode_vint(42);
        let (_, length) = parse_vint_length(&bytes).unwrap();
        assert_eq!(length, 42);
    }

    /// Diagnostic test kept from a collection-decoding investigation.
    ///
    /// Only the roundtrip of 4 is asserted; everything else is printed for
    /// inspection (run with `cargo test -- --nocapture` to see the output).
    #[test]
    fn test_collection_vint_debug() {
        let encoded_4 = encode_vint(4);
        println!("encode_vint(4) = {:?}", encoded_4);
        let (_, decoded_4) = parse_vint(&encoded_4).unwrap();
        println!("parse_vint({:?}) = {}", encoded_4, decoded_4);

        // Check what [10] decodes to
        let test_10 = [10u8];
        let (_, decoded_10) = parse_vint(&test_10).unwrap();
        println!("parse_vint([10]) = {}", decoded_10);

        // Check what encodes to [10]
        for i in 0..20 {
            if encode_vint(i) == [10] {
                println!("Value {} encodes to [10]", i);
            }
        }

        assert_eq!(decoded_4, 4, "Roundtrip test for 4");

        // Length prefix of the string used by the collection test
        let long_string = "this is a longer string";
        let encoded_23 = encode_vint(long_string.len() as i64);
        println!("encode_vint(23) = {:?}", encoded_23);
        println!(
            "String length: {}, bytes: {:?}",
            long_string.len(),
            long_string.as_bytes()
        );

        // Check if the encoded length triggers ASCII corruption detection
        match parse_vint(&encoded_23) {
            Ok((_, decoded)) => println!("parse_vint({:?}) = {}", encoded_23, decoded),
            Err(e) => println!("parse_vint({:?}) failed: {:?}", encoded_23, e),
        }

        // Dump the encodings of values that were failing during the investigation
        println!("\nDebug failing VInt cases:");
        for value in [256, 1048576, 64] {
            let encoded = encode_vint(value);
            println!(
                "Value {}: encoded={:?}, hex={:02X?}, len={}",
                value,
                encoded,
                encoded,
                encoded.len()
            );
            if let Some(&first_byte) = encoded.first() {
                println!(
                    "  First byte: 0x{:02X} ({:08b}), leading_ones: {}",
                    first_byte,
                    first_byte,
                    first_byte.leading_ones()
                );
                if encoded.len() > 1 {
                    // The format uses one leading 1-bit per extra byte.
                    println!(
                        "  Expected leading ones: {}, got: {}",
                        encoded.len() - 1,
                        first_byte.leading_ones()
                    );
                }
            }
        }

        // Hand-written bytes from the investigation. NOTE(review): 0xE0 has
        // three leading 1-bits, which per the module's format description
        // announces three extra bytes while only two follow, so this input may
        // not parse at all. The outcome is printed, not asserted.
        println!("\nTesting Cassandra format:");
        let cassandra_bytes = vec![0xE0, 0x01, 0x00];
        match parse_vint(&cassandra_bytes) {
            Ok((_, decoded)) => println!("Cassandra bytes {:?} -> {}", cassandra_bytes, decoded),
            Err(e) => println!(
                "Failed to parse Cassandra bytes {:?}: {:?}",
                cassandra_bytes, e
            ),
        }
    }

    /// Error paths of the parser, plus inputs that the lenient
    /// (backward-compatible) parser is expected to accept.
    #[test]
    fn test_vint_errors() {
        // Empty input cannot contain a VInt.
        assert!(parse_vint(&[]).is_err());

        // A negative value is never a valid length.
        let negative_bytes = encode_vint(-10);
        assert!(parse_vint_length(&negative_bytes).is_err());

        // Maximum-length encoding: 0xFF announces 8 extra bytes (9 bytes total).
        assert!(parse_vint(&[0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]).is_ok());

        // Extended formats accepted for backward compatibility.
        // 0xF0 prefix followed by seven bytes.
        assert!(parse_vint(&[0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]).is_ok());
        // 0xFF with only seven bytes following (one fewer than it announces);
        // expected to succeed through the parser's ZigZag fallback.
        assert!(parse_vint(&[0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]).is_ok());

        // Complete two- and three-byte encodings parse successfully.
        assert!(parse_vint(&[0x80, 0x00]).is_ok()); // Two-byte format with data
        assert!(parse_vint(&[0xC0, 0x00, 0x00]).is_ok()); // Three-byte format with data

        // ASCII-looking input such as b"data" is deliberately not asserted
        // here: the parser is permissive for backward compatibility and may
        // accept it, so neither outcome is treated as a failure.
    }

    /// Pins the one-byte/two-byte boundary at 63/64 for signed values.
    #[test]
    fn test_vint_edge_case_patterns() {
        // Maximum single-byte value
        let max_single = 63;
        let encoded = encode_vint(max_single);
        assert_eq!(encoded.len(), 1);
        assert_eq!(encoded[0] & 0x80, 0); // Original format has leading 0

        // Minimum two-byte value
        let min_double = 64;
        let encoded = encode_vint(min_double);
        assert_eq!(encoded.len(), 2);
        assert_eq!(encoded[0] & 0xC0, 0x80); // Two-byte format starts with 10
    }

    /// Known ASCII prefixes are flagged as corruption; binary-looking data is not.
    #[test]
    fn test_detect_ascii_corruption_patterns() {
        assert!(detect_ascii_corruption(b"data_payload"));
        assert!(detect_ascii_corruption(b"node_meta"));
        assert!(!detect_ascii_corruption(&[0x00, 0x80, 0xFF, 0x10]));
    }

    /// Extended-format prefixes must parse without error.
    #[test]
    fn test_parse_vint_extended_formats() {
        // 0xF0 prefix should fall back to ZigZag parsing and succeed
        let bytes = [0xF0, 0x00, 0x00, 0x00, 0x10];
        let _ = parse_vint(&bytes).expect("extended format parses");

        // 0xFF extended format should also succeed
        let bytes = [0xFF, 0x00, 0x00, 0x00, 0x05];
        let _ = parse_vint(&bytes).expect("fallback parse");
    }

    /// Issue #264: VInt overflow protection for length fields.
    ///
    /// `parse_vint_length` rejects negative values and values strictly greater
    /// than `MAX_VINT_LENGTH`; the limit itself is accepted.
    #[test]
    fn test_vint_overflow_protection() {
        // Test 1: Value exceeding the limit should fail
        let large_value = encode_vint(2_000_000_000i64); // ~2GB - exceeds MAX_VINT_LENGTH
        let result = parse_vint_length(&large_value);
        assert!(
            result.is_err(),
            "Should reject values > 1GB for length fields"
        );

        // Test 2: Value just under limit should succeed
        let safe_value = encode_vint(MAX_VINT_LENGTH - 1);
        let result = parse_vint_length(&safe_value);
        assert!(result.is_ok(), "Should accept values < 1GB");
        let (_, length) = result.unwrap();
        assert_eq!(length, (MAX_VINT_LENGTH - 1) as usize);

        // Test 3a: The limit is inclusive (the check is `>`, not `>=`), so the
        // exact limit must be accepted.
        let exact_limit = encode_vint(MAX_VINT_LENGTH);
        let result = parse_vint_length(&exact_limit);
        assert!(result.is_ok(), "Should accept exactly MAX_VINT_LENGTH");
        let (_, length) = result.unwrap();
        assert_eq!(length, MAX_VINT_LENGTH as usize);

        // Test 3b: One past the limit should fail
        let over_limit = encode_vint(MAX_VINT_LENGTH + 1);
        let result = parse_vint_length(&over_limit);
        assert!(result.is_err(), "Should reject values > MAX_VINT_LENGTH");

        // Test 4: Negative values should still be rejected
        let negative_value = encode_vint(-1i64);
        let result = parse_vint_length(&negative_value);
        assert!(result.is_err(), "Should reject negative values");

        // Test 5: Zero should be valid
        let zero_value = encode_vint(0i64);
        let result = parse_vint_length(&zero_value);
        assert!(result.is_ok(), "Should accept zero");
        let (_, length) = result.unwrap();
        assert_eq!(length, 0);

        // Test 6: Reasonable values (16MB - existing limit in block_entries)
        let sixteen_mb = encode_vint(16 * 1024 * 1024i64);
        let result = parse_vint_length(&sixteen_mb);
        assert!(result.is_ok(), "Should accept 16MB (common limit)");
    }
}