Skip to main content

cqlite_core/parser/
enhanced_statistics_parser.rs

1//! Enhanced Statistics.db parser for Cassandra 5.0 'nb' format
2//!
3//! # Implementation Status (Issue #162)
4//!
5//! This module provides **MINIMAL PARSING** of nb-format Statistics.db files to support
6//! delta-coded timestamp decoding in V5CompressedLegacy parser.
7//!
8//! ## Current Implementation
9//!
10//! Parses ONLY the EncodingStats fields required for delta decoding:
11//! - Header (32 bytes): version, data_length, checksum, metadata
12//! - EncodingStats section: partitioner, minTimestamp, minLocalDeletionTime, minTTL
13//!
14//! All other statistics (row counts, histograms, column stats, etc.) are populated with
15//! placeholder values. This is sufficient for V5CompressedLegacy parser baseline values.
16//!
17//! ## Previous Implementation (REMOVED)
18//!
19//! The previous implementation violated the no-heuristics mandate (Issue #28) by fabricating
20//! statistics from header metadata. It was removed and replaced with this minimal real-data
21//! parser that extracts only what's needed from the actual binary format.
22//!
23//! ## Deferred to Future Milestones
24//!
25//! Complete Statistics.db parsing including:
26//! - Row count statistics and distribution histograms
27//! - Column-level statistics and cardinality estimates
28//! - Partition size histograms and percentiles
29//! - Compression ratio and performance metrics
30//! - Checksum validation (header.checksum field not yet validated)
31//!
32//! ## References
33//!
34//! - Issue #162: Fix Statistics reader for Cassandra 5 nb format
35//! - Issue #28: No-heuristics mandate for modern Cassandra 5.0 paths
36//! - Issue #105: Remove heuristic estimation from enhanced_statistics_parser.rs
37//! - `docs/development/rust_developer_guide.md`: Architecture decisions
38
39use super::statistics::*;
40use super::vint::parse_vuint;
41use crate::error::{Error, Result};
42use crate::storage::sstable::version_gate::VersionGates;
43use nom::{bytes::complete::take, number::complete::be_u32, IResult};
44
45/// Cassandra MetadataType enum ordinals (from MetadataType.java)
46/// Used to identify component types in Statistics.db TOC
47#[allow(dead_code)]
48const METADATA_TYPE_VALIDATION: u32 = 0;
49#[allow(dead_code)]
50const METADATA_TYPE_COMPACTION: u32 = 1;
51#[allow(dead_code)]
52const METADATA_TYPE_STATS: u32 = 2;
53const METADATA_TYPE_HEADER: u32 = 3; // SerializationHeader
54
55/// Epoch constants matching Cassandra's EncodingStats.java (EncodingStats.Serializer)
56/// Used for delta-encoding/decoding EncodingStats fields in Statistics.db SERIALIZATION_HEADER.
57/// Cassandra serializes: writeUnsignedVInt(value - EPOCH)
58/// Cassandra deserializes: readUnsignedVInt() + EPOCH
59const TIMESTAMP_EPOCH: i64 = 1_442_880_000_000_000; // Sept 22, 2015 00:00:00 UTC in microseconds
60const DELETION_TIME_EPOCH: i64 = 1_442_880_000; // Sept 22, 2015 00:00:00 UTC in seconds
61                                                // TTL epoch is 0 in Cassandra, but kept for consistency with the delta-encoding pattern
62const TTL_EPOCH: i64 = 0;
63
64/// Type alias for EncodingStats parse result to reduce complexity
65type EncodingStatsResult = (
66    i64,
67    i64,
68    Option<i64>,
69    Vec<super::header::ColumnInfo>,
70    Vec<super::header::ColumnInfo>,
71    Vec<super::header::ColumnInfo>,
72);
73
74/// Type alias for SerializationHeader parse result to reduce complexity
75type SerializationHeaderResult = (Vec<String>, Vec<String>, Vec<super::header::ColumnInfo>);
76
77/// Enhanced Statistics.db header parser for real 'nb' format
78///
79/// This function parses the actual 32-byte binary header structure from
80/// Cassandra 5.0 Statistics.db files. Based on hex analysis of real files:
81///
82/// ```text
83/// 00000000  00 00 00 04 26 29 1b 05  00 00 00 00 00 00 00 2c
84/// 00000010  00 00 00 01 00 00 00 65  00 00 00 02 00 00 14 d4
85/// ```
86///
87/// # Binary Format (32 bytes)
88///
89/// - Bytes 0-3:   `version_type` (u32 BE) - Format version identifier (e.g., 0x00000004)
90/// - Bytes 4-7:   `statistics_kind` (u32 BE) - Statistics type marker (e.g., 0x26291b05)
91/// - Bytes 8-11:  `reserved1` (u32 BE) - Reserved field (typically 0x00000000)
92/// - Bytes 12-15: `data_length` (u32 BE) - Length of variable-length data section
93/// - Bytes 16-19: `metadata1` (u32 BE) - Metadata field (purpose TBD in M2)
94/// - Bytes 20-23: `metadata2` (u32 BE) - Metadata field (purpose TBD in M2)
95/// - Bytes 24-27: `metadata3` (u32 BE) - Metadata field (purpose TBD in M2)
96/// - Bytes 28-31: `checksum_or_more` (u32 BE) - Checksum or additional metadata
97///
98/// # Returns
99///
100/// `Ok((remaining_input, StatisticsHeader))` on successful parse of 32-byte header.
101///
102/// # Note
103///
104/// This is the ONLY function in this module that reads actual binary data.
105/// All other parsing functions have been removed per Issue #28 mandate.
106pub fn parse_nb_format_header(input: &[u8]) -> IResult<&[u8], StatisticsHeader> {
107    let (input, version_type) = be_u32(input)?;
108    let (input, statistics_kind) = be_u32(input)?;
109    let (input, _reserved1) = be_u32(input)?;
110    let (input, data_length) = be_u32(input)?;
111    let (input, metadata1) = be_u32(input)?;
112    let (input, metadata2) = be_u32(input)?;
113    let (input, metadata3) = be_u32(input)?;
114    let (input, checksum_or_more) = be_u32(input)?;
115
116    Ok((
117        input,
118        StatisticsHeader {
119            version: version_type,
120            statistics_kind,
121            data_length,
122            metadata1,
123            metadata2,
124            metadata3,
125            checksum: checksum_or_more,
126            table_id: None,
127        },
128    ))
129}
130
131/// Parse Statistics.db Table of Contents to get component offsets (Issue #216)
132///
133/// Statistics.db format (from Cassandra MetadataSerializer.java):
134/// - [4 bytes] number_of_components (u32 BE)
135/// - [4 bytes] checksum (u32 BE)
136/// - [TOC] component_type (u32) | offset (u32) for each component
137/// - [Component data...]
138///
139/// MetadataType enum ordinals:
140/// - 0 = VALIDATION
141/// - 1 = COMPACTION
142/// - 2 = STATS
143/// - 3 = HEADER (SerializationHeader)
144///
145/// Returns the offset to the HEADER component (SerializationHeader), or None if not found.
146fn parse_statistics_toc_for_header_offset(input: &[u8]) -> Option<usize> {
147    if input.len() < 8 {
148        log::debug!("Statistics.db too small for TOC: {} bytes", input.len());
149        return None;
150    }
151
152    // Parse number of components
153    let num_components = u32::from_be_bytes([input[0], input[1], input[2], input[3]]);
154    log::debug!("Statistics.db TOC: {} components", num_components);
155
156    // Sanity check: Cassandra has exactly 4 MetadataType enum values
157    // (VALIDATION=0, COMPACTION=1, STATS=2, HEADER=3)
158    // A value > 100 indicates corrupted or malicious data
159    if num_components > 100 {
160        log::warn!(
161            "Suspicious num_components={} in Statistics.db TOC (expected <=4)",
162            num_components
163        );
164        return None;
165    }
166
167    // Skip checksum (bytes 4-7)
168    // TOC starts at byte 8
169
170    let toc_start: usize = 8;
171    let toc_entry_size: usize = 8; // 4 bytes type + 4 bytes offset
172
173    // Use checked_mul to prevent integer overflow on multiplication
174    let toc_size = (num_components as usize)
175        .checked_mul(toc_entry_size)
176        .and_then(|size| size.checked_add(toc_start))?;
177
178    if input.len() < toc_size {
179        log::debug!(
180            "Statistics.db too small for {} TOC entries: {} bytes (need {})",
181            num_components,
182            input.len(),
183            toc_size
184        );
185        return None;
186    }
187
188    // Search for HEADER component (type 3)
189    for i in 0..num_components as usize {
190        // Use checked arithmetic to prevent overflow in entry offset calculation
191        let entry_offset = i
192            .checked_mul(toc_entry_size)
193            .and_then(|offset| offset.checked_add(toc_start))?;
194        let component_type = u32::from_be_bytes([
195            input[entry_offset],
196            input[entry_offset + 1],
197            input[entry_offset + 2],
198            input[entry_offset + 3],
199        ]);
200        let component_offset = u32::from_be_bytes([
201            input[entry_offset + 4],
202            input[entry_offset + 5],
203            input[entry_offset + 6],
204            input[entry_offset + 7],
205        ]) as usize;
206
207        log::debug!(
208            "TOC entry {}: type={} offset=0x{:x}",
209            i,
210            component_type,
211            component_offset
212        );
213
214        if component_type == METADATA_TYPE_HEADER {
215            log::debug!(
216                "Found HEADER component at offset 0x{:x} ({})",
217                component_offset,
218                component_offset
219            );
220            return Some(component_offset);
221        }
222    }
223
224    log::debug!("HEADER component not found in Statistics.db TOC");
225    None
226}
227
228/// Parse minimal nb-format statistics data for delta-coding baseline (Issue #162)
229///
230/// This implementation parses ONLY the EncodingStats fields required for delta decoding:
231/// - partitioner (string)
232/// - minTimestamp (VInt)
233/// - minLocalDeletionTime (VInt)
234/// - minTTL (VInt)
235///
236/// All other fields (histograms, column stats, etc.) are skipped to minimize complexity.
237/// This is sufficient for V5CompressedLegacy parser which needs baseline values for
238/// delta-coded timestamps and TTLs.
239///
240/// # Format (observed from real nb-format Statistics.db files)
241///
242/// After 32-byte header:
243/// - metadata_type (u32 BE) = 0x00000003 (indicates EncodingStats section)
244/// - data_length (VInt) - length of remaining data
245/// - partitioner_length (VInt) - length of partitioner class name string
246/// - partitioner (UTF-8 string) - e.g., "org.apache.cassandra.dht.Murmur3Partitioner"
247/// - additional_metadata (various VInts) - skipped
248/// - minTimestamp (VInt, microseconds)
249/// - minLocalDeletionTime (VInt, seconds)
250/// - minTTL (VInt, seconds)
251///
252/// # Returns
253///
254/// Partial statistics with only TimestampStatistics populated from real data.
255#[allow(clippy::type_complexity)]
256pub fn parse_nb_format_statistics_data(
257    input: &[u8],
258    header: &StatisticsHeader,
259    full_input: &[u8],
260    // VG3 plumbing: gates are threaded here so version-sensitive decisions in
261    // parse_encoding_stats_vuints (e.g. has_uint_deletion_time) can be flipped
262    // without re-deriving gates from the filename.
263    // Pass `None` from callers that do not have gates (standalone tools, tests).
264    gates: Option<&VersionGates>,
265) -> Result<(
266    RowStatistics,
267    TimestampStatistics,
268    TableStatistics,
269    PartitionStatistics,
270    CompressionStatistics,
271    Vec<super::header::ColumnInfo>,
272    Vec<super::header::ColumnInfo>,
273    Vec<super::header::ColumnInfo>,
274)> {
275    // Get HEADER offset from TOC (Issue #216)
276    let header_offset = parse_statistics_toc_for_header_offset(full_input);
277
278    // Parse the EncodingStats section from the data following the header
279    let result = parse_minimal_encoding_stats(input, full_input, header_offset, gates);
280
281    match result {
282        Ok((
283            _,
284            (
285                min_timestamp,
286                min_deletion_time,
287                min_ttl,
288                partition_columns,
289                clustering_columns,
290                regular_columns,
291            ),
292        )) => {
293            // Create minimal statistics with only timestamp data populated
294            let row_stats = RowStatistics {
295                total_rows: 0,
296                live_rows: 0,
297                tombstone_count: 0,
298                partition_count: 0,
299                avg_rows_per_partition: 0.0,
300                row_size_histogram: vec![],
301            };
302
303            let timestamp_stats = TimestampStatistics {
304                min_timestamp,
305                max_timestamp: min_timestamp, // Not parsed, use min as placeholder
306                min_deletion_time,
307                max_deletion_time: min_deletion_time,
308                min_ttl,
309                max_ttl: min_ttl,
310                rows_with_ttl: 0,
311            };
312
313            let table_stats = TableStatistics {
314                disk_size: 0,
315                uncompressed_size: 0,
316                compressed_size: 0,
317                compression_ratio: 1.0,
318                block_count: 0,
319                avg_block_size: 0.0,
320                index_size: 0,
321                bloom_filter_size: 0,
322                level_count: 0,
323            };
324
325            let partition_stats = PartitionStatistics {
326                avg_partition_size: 0.0,
327                min_partition_size: 0,
328                max_partition_size: 0,
329                large_partition_percentage: 0.0,
330                size_histogram: vec![],
331            };
332
333            let compression_stats = CompressionStatistics {
334                algorithm: "unknown".to_string(),
335                original_size: 0,
336                compressed_size: 0,
337                ratio: 1.0,
338                compression_speed: 0.0,
339                decompression_speed: 0.0,
340                compressed_blocks: 0,
341            };
342
343            Ok((
344                row_stats,
345                timestamp_stats,
346                table_stats,
347                partition_stats,
348                compression_stats,
349                partition_columns,
350                clustering_columns,
351                regular_columns,
352            ))
353        }
354        Err(e) => {
355            log::debug!(
356                "Failed to parse minimal EncodingStats from Statistics.db: {:?}",
357                e
358            );
359            Err(Error::UnsupportedFormat(format!(
360                "Failed to parse minimal nb-format Statistics.db EncodingStats: {:?}. \
361                         This is required for delta-coded timestamp decoding. \
362                         Header checksum: 0x{:08x}, data_length: {}",
363                e, header.checksum, header.data_length
364            )))
365        }
366    }
367}
368
369/// Parse SerializationHeader from Statistics.db (Issue #163)
370///
371/// This function locates and parses the complete SerializationHeader section including:
372/// 1. Partition key types
373/// 2. Clustering key types
374/// 3. Regular column definitions
375///
376/// Returns: (partition_key_types, clustering_key_types, regular_columns)
377fn parse_serialization_header(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
378    log::debug!(
379        "Searching for SerializationHeader in {} bytes (max search: 8KB)",
380        input.len()
381    );
382
383    // Log input buffer state at function entry
384    let preview_len = std::cmp::min(64, input.len());
385    let preview_hex: String = input[..preview_len]
386        .iter()
387        .map(|b| format!("{:02x}", b))
388        .collect::<Vec<_>>()
389        .join(" ");
390    log::debug!(
391        "Input buffer size: {} bytes, first 64 bytes: {}",
392        input.len(),
393        preview_hex
394    );
395
396    // Search for SerializationHeader start marker: VInt followed by 0x00 0x00 and '(' character
397    // This marks the beginning of the partition key type descriptor
398    let mut search_offset = 0;
399
400    // Search for SerializationHeader by finding "org.apache.cassandra.db.marshal" string
401    // and working backwards to find the 0x00 0x00 marker
402    // Format: [VInt unknown] [0x00 0x00] [VInt partition_type_len] [partition_type_string]
403    let marshal_pattern = b"org.apache.cassandra.db.marshal";
404
405    while search_offset + marshal_pattern.len() < input.len() && search_offset < 8192 {
406        if &input[search_offset..search_offset + marshal_pattern.len()] == marshal_pattern {
407            let context_start = search_offset.saturating_sub(10);
408            let context_end = (search_offset + 50).min(input.len());
409            log::debug!(
410                "Found 'org.apache.cassandra.db.marshal' at offset {}, context (offset-10 to offset+50): {:02x?}",
411                search_offset,
412                &input[context_start..context_end]
413            );
414
415            // Issue #216 fix: Look for the pattern [prev_zero] [pk_type_len] "org.apache..."
416            // where pk_type_len is a valid VInt length (0x01-0x7F for single byte, or multi-byte VInt)
417            // The prev_zero is typically the last byte of EncodingStats (minTTL=0) or another zero field.
418            //
419            // We need to find the START of the partition key type length, which is:
420            // - 1 byte before "org.apache..." for single-byte lengths (0x28 = 40 bytes for UUIDType)
421            // - 2 bytes before for two-byte VInt lengths (0x80 0xXX)
422
423            for lookback in 1..=15 {
424                if search_offset < lookback {
425                    break;
426                }
427                let type_len_offset = search_offset - lookback;
428
429                // Check if this could be a valid pk_type_len
430                // For single-byte VInt: values 0x01-0x7F
431                // For two-byte VInt: first byte has high bit set (0x80-0xFF)
432                let first_byte = input[type_len_offset];
433
434                // Common partition key type lengths:
435                // - UUIDType: 40 bytes (0x28)
436                // - UTF8Type: 40 bytes (0x28)
437                // - Int32Type: 41 bytes (0x29)
438                // - TimestampType: 45 bytes (0x2D)
439                // - CompositeType: ~80-150 bytes (0x50-0x96 or multi-byte VInt)
440
441                // Single-byte VInt: 0x20-0x7F are reasonable pk_type lengths (32-127 bytes)
442                let is_valid_single_byte_len = (0x20..=0x7F).contains(&first_byte);
443
444                // Two-byte VInt: 0x80-0xBF with continuation
445                let is_multi_byte_vint = first_byte >= 0x80;
446
447                if is_valid_single_byte_len || is_multi_byte_vint {
448                    // Try parsing from this offset using sequential parser
449                    let result = parse_serialization_header_sequential(&input[type_len_offset..]);
450                    if let Ok((remaining, (pk_types, ck_types, cols))) = result {
451                        // Validate: partition key type should contain expected substring
452                        if !pk_types.is_empty()
453                            && pk_types[0].contains("org.apache.cassandra.db.marshal")
454                        {
455                            log::debug!(
456                                "Successfully parsed SerializationHeader at offset {} (lookback: {}): pk_type={}",
457                                type_len_offset,
458                                lookback,
459                                pk_types[0]
460                            );
461                            return Ok((remaining, (pk_types, ck_types, cols)));
462                        }
463                    }
464                }
465
466                // Also try the legacy 0x00 0x00 marker for backward compatibility
467                if type_len_offset > 0 {
468                    let prev_offset = type_len_offset - 1;
469                    if input[prev_offset] == 0x00 && input[type_len_offset] == 0x00 {
470                        let result = parse_serialization_header_at_offset(&input[prev_offset..]);
471                        if result.is_ok() {
472                            log::debug!(
473                                "Successfully parsed SerializationHeader at legacy marker offset {}",
474                                prev_offset
475                            );
476                            return result;
477                        }
478                    }
479                }
480            }
481        }
482        search_offset += 1;
483    }
484
485    log::debug!(
486        "Search completed: searched {} bytes, no partition key type found",
487        search_offset
488    );
489
490    // Partition key type not found - try to find regular columns directly
491    // This handles files where SerializationHeader contains only regular columns
492    log::debug!("Attempting to parse regular columns without partition key metadata");
493    let (remaining, (partition_keys, columns)) = parse_regular_columns(input)?;
494
495    if !columns.is_empty() {
496        log::debug!(
497            "Successfully parsed {} regular columns, {} partition keys via backtracking",
498            columns.len(),
499            partition_keys.len()
500        );
501        return Ok((remaining, (partition_keys, Vec::new(), columns)));
502    }
503
504    // Nothing found - return empty results
505    log::warn!(
506        "Failed to locate SerializationHeader or regular columns: searched {} bytes",
507        search_offset
508    );
509
510    if let Some((pk_types, ck_types, cols)) = fallback_parse_serialization_header_ascii(input) {
511        log::debug!(
512            "ASCII fallback extracted SerializationHeader: {} partition keys, {} clustering keys, {} regular columns",
513            pk_types.len(),
514            ck_types.len(),
515            cols.len()
516        );
517        return Ok((input, (pk_types, ck_types, cols)));
518    }
519
520    Ok((input, (Vec::new(), Vec::new(), Vec::new())))
521}
522
523/// Parse SerializationHeader structure starting at a known offset
524fn parse_serialization_header_at_offset(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
525    use nom::bytes::complete::tag;
526    use nom::number::complete::u8 as parse_u8;
527
528    let _original_input = input;
529
530    // Step 1: Expect 0x00 0x00 marker
531    let (input, _) = tag(b"\x00\x00")(input)?;
532    log::debug!("Found 0x00 0x00 marker");
533
534    // Step 2: Parse partition key type (single byte length + string)
535    let (input, partition_type_len) = parse_u8(input)?;
536    log::debug!("Partition key type length: {} bytes", partition_type_len);
537
538    let (input, partition_type_bytes) =
539        nom::bytes::complete::take(partition_type_len as usize)(input)?;
540    let partition_key_type = std::str::from_utf8(partition_type_bytes)
541        .map_err(|_| nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify)))?
542        .to_string();
543
544    log::debug!("Partition key type: {}", partition_key_type);
545
546    // Step 3: Parse clustering key count (single byte)
547    let (input, clustering_count) = parse_u8(input)?;
548    log::debug!("Clustering key count: {}", clustering_count);
549
550    // Step 4: Parse clustering key types
551    let mut clustering_key_types = Vec::with_capacity(clustering_count as usize);
552    let mut input = input;
553
554    for idx in 0..clustering_count {
555        // Parse clustering type length (single byte)
556        let (remaining, type_len) = parse_u8(input)?;
557        log::debug!("Clustering key {} type length: {} bytes", idx, type_len);
558
559        let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
560        let clustering_type = std::str::from_utf8(type_bytes)
561            .map_err(|_| {
562                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
563            })?
564            .to_string();
565
566        log::debug!("Clustering key {} type: {}", idx, clustering_type);
567
568        clustering_key_types.push(clustering_type);
569        input = remaining;
570    }
571
572    // Step 5: Parse static column count (NOT a separator - this was the bug!)
573    // When static_count = 0, this byte is 0x00 which made simple tables work.
574    // But when static_count > 0, parsing failed.
575    let (input, static_count) = parse_u8(input)?;
576    log::debug!("Static column count: {}", static_count);
577
578    // Step 5a: Parse static columns
579    let mut static_columns = Vec::with_capacity(static_count as usize);
580    let mut input = input;
581
582    for static_idx in 0..static_count {
583        // Static column name length (single byte)
584        let (remaining, name_len) = parse_u8(input)?;
585        log::debug!(
586            "Static column {} name length: {} bytes",
587            static_idx,
588            name_len
589        );
590
591        // Validate name length (match validation in parse_regular_columns)
592        if name_len == 0 || name_len > 200 {
593            log::debug!(
594                "Static column {} name_len sanity check failed: {}",
595                static_idx,
596                name_len
597            );
598            return Err(nom::Err::Error(nom::error::Error::new(
599                input,
600                nom::error::ErrorKind::Verify,
601            )));
602        }
603
604        // Static column name (UTF-8 string)
605        let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
606        let column_name = std::str::from_utf8(name_bytes)
607            .map_err(|_| {
608                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
609            })?
610            .to_string();
611
612        // Static column type length (VInt - can exceed 127 for collection types)
613        let (remaining, type_len_u64) = parse_vuint(remaining)?;
614        log::debug!(
615            "Static column {} ('{}') type length: {} bytes",
616            static_idx,
617            column_name,
618            type_len_u64
619        );
620
621        // Validate type length (match validation in parse_regular_columns)
622        if type_len_u64 == 0 || type_len_u64 > 5000 {
623            log::debug!(
624                "Static column {} ('{}') type_len sanity check failed: {}",
625                static_idx,
626                column_name,
627                type_len_u64
628            );
629            return Err(nom::Err::Error(nom::error::Error::new(
630                input,
631                nom::error::ErrorKind::Verify,
632            )));
633        }
634        if type_len_u64 > 1000 {
635            log::warn!(
636                "Unusually long static column type string: {} bytes (typical <1000)",
637                type_len_u64
638            );
639        }
640
641        // Static column type (UTF-8 string)
642        let (remaining, type_bytes) = nom::bytes::complete::take(type_len_u64 as usize)(remaining)?;
643        let internal_type = std::str::from_utf8(type_bytes)
644            .map_err(|_| {
645                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
646            })?
647            .to_string();
648
649        let cql_type = convert_marshal_type_to_cql(&internal_type);
650
651        log::debug!(
652            "Static column {}: name='{}', type='{}' (CQL: '{}')",
653            static_idx,
654            column_name,
655            internal_type,
656            cql_type
657        );
658
659        static_columns.push(super::header::ColumnInfo {
660            name: column_name,
661            column_type: cql_type,
662            is_primary_key: false,
663            key_position: None,
664            is_static: true, // Mark as static column!
665            is_clustering: false,
666        });
667
668        input = remaining;
669    }
670
671    log::debug!("Parsed {} static columns", static_columns.len());
672
673    // Step 6: Parse regular column count (single byte)
674    let (mut input, column_count) = parse_u8(input)?;
675    log::debug!("Regular column count: {}", column_count);
676
677    // Step 7: Parse each regular column
678    let mut columns = Vec::with_capacity(column_count as usize + static_columns.len());
679
680    for col_idx in 0..column_count {
681        // Column name length (single byte)
682        let (remaining, name_len) = parse_u8(input)?;
683        log::debug!("Column {} name length: {} bytes", col_idx, name_len);
684
685        // Column name (UTF-8 string)
686        let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
687        let column_name = std::str::from_utf8(name_bytes)
688            .map_err(|_| {
689                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
690            })?
691            .to_string();
692
693        // Column type length (VInt - can exceed 127 for collection types)
694        let (remaining, type_len_u64) = parse_vuint(remaining)?;
695        log::debug!(
696            "Column {} ('{}') type length: {} bytes",
697            col_idx,
698            column_name,
699            type_len_u64
700        );
701
702        // Validate type length (consistent with parse_regular_columns and static columns)
703        if type_len_u64 == 0 || type_len_u64 > 5000 {
704            log::debug!(
705                "Column {} ('{}') type_len validation failed: {}",
706                col_idx,
707                column_name,
708                type_len_u64
709            );
710            return Err(nom::Err::Error(nom::error::Error::new(
711                input,
712                nom::error::ErrorKind::Verify,
713            )));
714        }
715        if type_len_u64 > 1000 {
716            log::warn!(
717                "Unusually long column type string: {} bytes (typical <1000)",
718                type_len_u64
719            );
720        }
721
722        // Column type (UTF-8 string)
723        let (remaining, type_bytes) = nom::bytes::complete::take(type_len_u64 as usize)(remaining)?;
724        let internal_type = std::str::from_utf8(type_bytes)
725            .map_err(|_| {
726                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
727            })?
728            .to_string();
729
730        input = remaining;
731
732        // Convert to CQL type
733        let cql_type = convert_marshal_type_to_cql(&internal_type);
734
735        log::debug!(
736            "Column {}: name='{}', type='{}' (CQL: '{}')",
737            col_idx,
738            column_name,
739            internal_type,
740            cql_type
741        );
742
743        columns.push(super::header::ColumnInfo {
744            name: column_name,
745            column_type: cql_type,
746            is_primary_key: false,
747            key_position: None,
748            is_static: false,
749            is_clustering: false,
750        });
751    }
752
753    // Merge static columns (first) with regular columns
754    // Static columns come before regular columns in the combined list
755    let mut all_columns = static_columns;
756    all_columns.append(&mut columns);
757
758    log::debug!(
759        "Successfully parsed SerializationHeader: {} partition keys, {} clustering keys, {} static columns, {} regular columns ({} total)",
760        1, // Always 1 partition key in current implementation
761        clustering_key_types.len(),
762        all_columns.iter().filter(|c| c.is_static).count(),
763        all_columns.iter().filter(|c| !c.is_static).count(),
764        all_columns.len()
765    );
766
767    Ok((
768        input,
769        (vec![partition_key_type], clustering_key_types, all_columns),
770    ))
771}
772
773/// Extract partition key type by backtracking from the `0x00 0x00` marker
774///
775/// The partition key type descriptor ends immediately before the marker.
776/// We try parsing VInt lengths at different offsets before the marker to find
777/// a valid type string that matches Cassandra marshal type patterns.
778fn extract_partition_key_before_marker(input: &[u8], marker_offset: usize) -> Option<String> {
779    if marker_offset < 3 {
780        return None;
781    }
782
783    log::debug!(
784        "Backtracking from marker at offset {} (input len: {})",
785        marker_offset,
786        input.len()
787    );
788
789    // Try parsing VInt lengths at different positions before the marker
790    // Type strings can be up to 200 bytes, and VInts can be 1-9 bytes,
791    // so we need to search back at least 209 bytes (200 + 9)
792    let max_lookback = 210;
793    let search_start = marker_offset.saturating_sub(max_lookback);
794    log::debug!(
795        "Searching for VInt from offset {} to {} ({} positions)",
796        search_start,
797        marker_offset,
798        marker_offset - search_start
799    );
800
801    for vint_start in (search_start..marker_offset).rev() {
802        // Try to parse VInt at this position
803        match parse_vuint(&input[vint_start..marker_offset]) {
804            Ok((remaining, type_len)) => {
805                // Validate type length is reasonable first (before any arithmetic)
806                if !(10..200).contains(&type_len) {
807                    continue;
808                }
809
810                // Calculate how many bytes the VInt consumed
811                let vint_len = marker_offset - vint_start - remaining.len();
812                let type_start = vint_start + vint_len;
813
814                // Bounds check before addition to prevent overflow
815                let type_len_usize = type_len as usize;
816                if type_start > input.len() || type_len_usize > input.len() - type_start {
817                    continue;
818                }
819
820                let type_end = type_start + type_len_usize;
821
822                // Validate:
823                // 1. The type string ends exactly at the marker
824                // 2. The type string is valid UTF-8
825                // 3. It matches Cassandra marshal type patterns
826                if type_end == marker_offset {
827                    if let Ok(type_str) = std::str::from_utf8(&input[type_start..type_end]) {
828                        log::debug!(
829                            "Candidate at vint_start={}: type_len={}, type_start={}, type_end={}, str={}",
830                            vint_start, type_len, type_start, type_end, type_str
831                        );
832                        // Validate it's a Cassandra marshal type
833                        // Note: Partition key types may or may not start with '('
834                        // Both "(org.apache.cassandra..." and "org.apache.cassandra..." are valid
835                        if type_str.contains("org.apache.cassandra") {
836                            log::debug!(
837                                "Found partition key type at offset {}: length={}, type={}",
838                                vint_start,
839                                type_len,
840                                type_str
841                            );
842                            return Some(type_str.to_string());
843                        } else {
844                            log::debug!(
845                                "Rejected candidate (starts_with='(': {}, contains 'org.apache.cassandra': {})",
846                                type_str.starts_with('('),
847                                type_str.contains("org.apache.cassandra")
848                            );
849                        }
850                    } else {
851                        log::debug!(
852                            "Rejected candidate at vint_start={}: not valid UTF-8",
853                            vint_start
854                        );
855                    }
856                }
857            }
858            Err(_) => continue, // Try next offset
859        }
860    }
861
862    None
863}
864
865/// Parse regular columns section from SerializationHeader
866///
867/// Returns: (partition_key_types, regular_columns)
868/// Partition key types are extracted via backtracking when found before the column section marker.
869fn parse_regular_columns(
870    input: &[u8],
871) -> IResult<&[u8], (Vec<String>, Vec<super::header::ColumnInfo>)> {
872    use super::header::ColumnInfo;
873
874    let mut search_offset = 0;
875    let mut partition_key_types = Vec::new();
876
877    while search_offset + 2 < input.len() && search_offset < 8192 {
878        if input[search_offset] == 0x00 {
879            let (marker_offset, count_offset) =
880                if search_offset + 1 < input.len() && input[search_offset + 1] == 0x00 {
881                    (search_offset, search_offset + 2)
882                } else {
883                    (search_offset, search_offset + 1)
884                };
885
886            if count_offset >= input.len() {
887                break;
888            }
889
890            let column_count = input[count_offset] as usize;
891            if column_count == 0 || column_count > 50 {
892                search_offset += 1;
893                continue;
894            }
895
896            log::debug!(
897                "Attempting to extract partition key by backtracking from marker at offset {}",
898                marker_offset
899            );
900            if let Some(pk_type) = extract_partition_key_before_marker(input, marker_offset) {
901                log::debug!("Found partition key type before marker: {}", pk_type);
902                partition_key_types.push(pk_type);
903            } else {
904                log::debug!(
905                    "No partition key type found via backtracking at offset {}",
906                    marker_offset
907                );
908            }
909
910            let mut pos = count_offset + 1;
911
912            let context_len = std::cmp::min(128, input.len() - marker_offset);
913            let context_hex: String = input[marker_offset..marker_offset + context_len]
914                .iter()
915                .map(|b| format!("{:02x}", b))
916                .collect::<Vec<_>>()
917                .join(" ");
918            log::debug!(
919                "Pattern found at offset {}: count={}, next 128 bytes: {}",
920                marker_offset,
921                column_count,
922                context_hex
923            );
924
925            // Try to parse all columns - if successful, we found the right section
926            let mut parsed_columns = Vec::with_capacity(column_count);
927            let mut parse_success = true;
928
929            for col_idx in 0..column_count {
930                if pos >= input.len() {
931                    log::debug!(
932                        "Column {} parsing failed at offset {}: position {} exceeds buffer length {}",
933                        col_idx,
934                        marker_offset,
935                        pos,
936                        input.len()
937                    );
938                    parse_success = false;
939                    break;
940                }
941
942                if pos >= input.len() {
943                    log::debug!(
944                        "Column {} parsing failed at offset {}: no data available for name length byte (pos={}, len={})",
945                        col_idx,
946                        marker_offset,
947                        pos,
948                        input.len()
949                    );
950                    parse_success = false;
951                    break;
952                }
953
954                let name_len = input[pos] as usize;
955                pos += 1;
956
957                if name_len == 0 || name_len > 200 || pos + name_len > input.len() {
958                    log::debug!(
959                        "Column {} parsing failed at offset {}: name_len sanity check failed (name_len={}, pos={}, buffer_len={})",
960                        col_idx,
961                        marker_offset,
962                        name_len,
963                        pos,
964                        input.len()
965                    );
966                    parse_success = false;
967                    break;
968                }
969
970                // Column name (UTF-8 string)
971                let name_bytes = &input[pos..pos + name_len];
972                let column_name = match std::str::from_utf8(name_bytes) {
973                    Ok(s) => s.to_string(),
974                    Err(e) => {
975                        let name_hex: String = name_bytes
976                            .iter()
977                            .map(|b| format!("{:02x}", b))
978                            .collect::<Vec<_>>()
979                            .join(" ");
980                        log::debug!(
981                            "Column {} parsing failed at offset {}: UTF-8 decode error for column name at pos {} (len={}): {:?}, bytes: {}",
982                            col_idx,
983                            marker_offset,
984                            pos,
985                            name_len,
986                            e,
987                            name_hex
988                        );
989                        parse_success = false;
990                        break;
991                    }
992                };
993                pos += name_len;
994
995                if pos >= input.len() {
996                    log::debug!(
997                        "Column {} ('{}') parsing failed at offset {}: no data available for type length byte (pos={}, len={})",
998                        col_idx,
999                        column_name,
1000                        marker_offset,
1001                        pos,
1002                        input.len()
1003                    );
1004                    parse_success = false;
1005                    break;
1006                }
1007
1008                // Parse type length as VInt (can exceed 127 for collection types)
1009                let type_len_result = parse_vuint(&input[pos..]);
1010                let (type_remaining, type_len_u64) = match type_len_result {
1011                    Ok(r) => r,
1012                    Err(_) => {
1013                        log::debug!(
1014                            "Column {} ('{}') parsing failed at offset {}: VInt parse error at pos {}",
1015                            col_idx,
1016                            column_name,
1017                            marker_offset,
1018                            pos
1019                        );
1020                        parse_success = false;
1021                        break;
1022                    }
1023                };
1024                let type_len = type_len_u64 as usize;
1025                pos = input.len() - type_remaining.len();
1026
1027                if type_len == 0 || type_len > 5000 || pos + type_len > input.len() {
1028                    log::debug!(
1029                        "Column {} ('{}') parsing failed at offset {}: type_len sanity check failed (type_len={}, pos={}, buffer_len={})",
1030                        col_idx,
1031                        column_name,
1032                        marker_offset,
1033                        type_len,
1034                        pos,
1035                        input.len()
1036                    );
1037                    parse_success = false;
1038                    break;
1039                }
1040
1041                // Column type (Cassandra internal type name)
1042                let type_bytes = &input[pos..pos + type_len];
1043                let internal_type = match std::str::from_utf8(type_bytes) {
1044                    Ok(s) => s.to_string(),
1045                    Err(e) => {
1046                        let type_hex: String = type_bytes
1047                            .iter()
1048                            .map(|b| format!("{:02x}", b))
1049                            .collect::<Vec<_>>()
1050                            .join(" ");
1051                        log::debug!(
1052                            "Column {} ('{}') parsing failed at offset {}: UTF-8 decode error for column type at pos {} (len={}): {:?}, bytes: {}",
1053                            col_idx,
1054                            column_name,
1055                            marker_offset,
1056                            pos,
1057                            type_len,
1058                            e,
1059                            type_hex
1060                        );
1061                        parse_success = false;
1062                        break;
1063                    }
1064                };
1065                pos += type_len;
1066
1067                // Convert Cassandra marshal type to CQL type
1068                let cql_type = convert_marshal_type_to_cql(&internal_type);
1069
1070                parsed_columns.push(ColumnInfo {
1071                    name: column_name,
1072                    column_type: cql_type,
1073                    is_primary_key: false, // Will be determined from partition/clustering info
1074                    key_position: None,
1075                    is_static: false,
1076                    is_clustering: false,
1077                });
1078            }
1079
1080            if parse_success && parsed_columns.len() == column_count {
1081                // Successfully parsed all columns
1082                let column_names: Vec<&str> =
1083                    parsed_columns.iter().map(|c| c.name.as_str()).collect();
1084                log::debug!(
1085                    "Successfully parsed {} columns at offset {}: {:?}",
1086                    parsed_columns.len(),
1087                    marker_offset,
1088                    column_names
1089                );
1090                if !partition_key_types.is_empty() {
1091                    log::debug!(
1092                        "Extracted {} partition key types via backtracking: {:?}",
1093                        partition_key_types.len(),
1094                        partition_key_types
1095                    );
1096                }
1097
1098                let remaining = &input[pos..];
1099                return Ok((remaining, (partition_key_types, parsed_columns)));
1100            }
1101        }
1102
1103        search_offset += 1;
1104    }
1105
1106    // Column section not found - return empty vecs (not an error, some files may have no regular columns)
1107    log::debug!(
1108        "Regular column section not found: searched {} bytes",
1109        search_offset
1110    );
1111    Ok((input, (Vec::new(), Vec::new())))
1112}
1113
1114/// ASCII fallback parser for SerializationHeader when structured parsing fails
1115fn fallback_parse_serialization_header_ascii(
1116    input: &[u8],
1117) -> Option<(Vec<String>, Vec<String>, Vec<super::header::ColumnInfo>)> {
1118    use super::header::ColumnInfo;
1119
1120    // Helper to find subsequence
1121    fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
1122        haystack
1123            .windows(needle.len())
1124            .position(|window| window == needle)
1125    }
1126
1127    let mut partition_types = Vec::new();
1128    let mut clustering_types = Vec::new();
1129    let mut columns = Vec::new();
1130
1131    // Extract partition key types from CompositeType(...)
1132    if let Some(comp_idx) = find_subsequence(input, b"CompositeType(") {
1133        let start = comp_idx + "CompositeType(".len();
1134        let mut end = start;
1135        while end < input.len() && input[end] != b')' {
1136            end += 1;
1137        }
1138        if end <= input.len() {
1139            if let Ok(inner) = std::str::from_utf8(&input[start..end]) {
1140                partition_types = inner
1141                    .split(',')
1142                    .map(|s| s.trim().to_string())
1143                    .filter(|s| !s.is_empty())
1144                    .collect();
1145            }
1146
1147            // Attempt to extract clustering types immediately after the composite type string
1148            let mut cursor = end + 1;
1149            while cursor < input.len() && input[cursor] < 0x20 {
1150                cursor += 1;
1151            }
1152            if cursor < input.len() && input[cursor] == b'(' {
1153                cursor += 1;
1154                let mut cluster_end = cursor;
1155                while cluster_end < input.len() && input[cluster_end] >= 0x20 {
1156                    cluster_end += 1;
1157                }
1158                if cluster_end > cursor {
1159                    if let Ok(cluster_str) = std::str::from_utf8(&input[cursor..cluster_end]) {
1160                        if cluster_str.contains("org.apache.cassandra.db.marshal") {
1161                            clustering_types = cluster_str
1162                                .split(',')
1163                                .map(|s| s.trim().to_string())
1164                                .filter(|s| !s.is_empty())
1165                                .collect();
1166                        }
1167                    }
1168                }
1169                // Set scan position for column parsing after clustering types/control bytes
1170                let mut scan_start = cluster_end;
1171                while scan_start < input.len() && input[scan_start] < 0x20 {
1172                    scan_start += 1;
1173                }
1174
1175                // Parse regular columns using [len][name][type] pattern with control-byte delimiters
1176                let mut idx = scan_start;
1177                while idx < input.len() {
1178                    let name_len = input[idx] as usize;
1179                    if name_len == 0 || name_len > 64 {
1180                        idx += 1;
1181                        continue;
1182                    }
1183
1184                    let name_start = idx + 1;
1185                    let name_end = name_start + name_len;
1186                    if name_end > input.len() {
1187                        break;
1188                    }
1189
1190                    let name_bytes = &input[name_start..name_end];
1191                    if !name_bytes
1192                        .iter()
1193                        .all(|b| b.is_ascii_alphanumeric() || *b == b'_')
1194                    {
1195                        idx += 1;
1196                        continue;
1197                    }
1198
1199                    if name_end >= input.len() || input[name_end] != b'(' {
1200                        idx += 1;
1201                        continue;
1202                    }
1203
1204                    let type_start = name_end + 1;
1205                    let mut type_end = type_start;
1206                    while type_end < input.len() && input[type_end] >= 0x20 {
1207                        type_end += 1;
1208                    }
1209
1210                    if type_end == type_start {
1211                        idx += 1;
1212                        continue;
1213                    }
1214
1215                    let type_bytes = &input[type_start..type_end];
1216                    if !type_bytes.windows(10).any(|w| w == b"org.apach") {
1217                        idx += 1;
1218                        continue;
1219                    }
1220
1221                    let column_name = match std::str::from_utf8(name_bytes) {
1222                        Ok(s) => s.to_string(),
1223                        Err(_) => {
1224                            idx += 1;
1225                            continue;
1226                        }
1227                    };
1228
1229                    let internal_type = match std::str::from_utf8(type_bytes) {
1230                        Ok(s) => s.trim().to_string(),
1231                        Err(_) => {
1232                            idx += 1;
1233                            continue;
1234                        }
1235                    };
1236
1237                    let cql_type = convert_marshal_type_to_cql(&internal_type);
1238                    columns.push(ColumnInfo {
1239                        name: column_name,
1240                        column_type: cql_type,
1241                        is_primary_key: false,
1242                        key_position: None,
1243                        is_static: false,
1244                        is_clustering: false,
1245                    });
1246
1247                    // Advance past control bytes to next potential column entry
1248                    idx = type_end;
1249                    while idx < input.len() && input[idx] < 0x20 {
1250                        idx += 1;
1251                    }
1252                }
1253            }
1254        }
1255    }
1256
1257    if partition_types.is_empty() && columns.is_empty() {
1258        return None;
1259    }
1260
1261    Some((partition_types, clustering_types, columns))
1262}
1263
1264/// Extract inner type from parameterized type string with proper parenthesis matching
1265///
1266/// Given a string that starts AFTER the opening parenthesis of a wrapper type,
1267/// returns the content up to (but not including) the matching closing parenthesis.
1268///
1269/// Example: For input "ListType(Int32Type))" (after stripping "FrozenType("),
1270/// returns Some("ListType(Int32Type)") - the content before the MATCHING close paren.
1271fn extract_inner_type(type_with_close_paren: &str) -> Option<&str> {
1272    let mut depth = 1; // We're already inside one opening paren (the wrapper type)
1273    for (idx, ch) in type_with_close_paren.char_indices() {
1274        match ch {
1275            '(' => depth += 1,
1276            ')' => {
1277                depth -= 1;
1278                if depth == 0 {
1279                    // Return None if extracted string is empty (malformed input like ")")
1280                    if idx == 0 {
1281                        return None;
1282                    }
1283                    return Some(&type_with_close_paren[..idx]);
1284                }
1285            }
1286            _ => {}
1287        }
1288    }
1289    None // Unmatched parentheses
1290}
1291
1292/// Split a type argument list on top-level commas, ignoring nested parentheses
1293fn split_type_arguments(input: &str) -> Vec<&str> {
1294    let mut args = Vec::new();
1295    let mut depth = 0;
1296    let mut start = 0;
1297    for (idx, ch) in input.char_indices() {
1298        match ch {
1299            '(' => depth += 1,
1300            ')' => {
1301                if depth > 0 {
1302                    depth -= 1;
1303                } else {
1304                    log::warn!(
1305                        "Unmatched closing parenthesis at position {} in type arguments: '{}'",
1306                        idx,
1307                        input
1308                    );
1309                }
1310            }
1311            ',' if depth == 0 => {
1312                let part = input[start..idx].trim();
1313                if !part.is_empty() {
1314                    args.push(part);
1315                }
1316                start = idx + ch.len_utf8();
1317            }
1318            _ => {}
1319        }
1320    }
1321
1322    let tail = input[start..].trim();
1323    if !tail.is_empty() {
1324        args.push(tail);
1325    }
1326
1327    args
1328}
1329
1330/// Convert Cassandra internal marshal type to CQL type name
1331fn convert_marshal_type_to_cql(marshal_type: &str) -> String {
1332    fn strip_wrapping_parens(mut value: &str) -> &str {
1333        loop {
1334            let trimmed = value.trim();
1335            if trimmed.starts_with('(') && trimmed.ends_with(')') && trimmed.len() > 2 {
1336                value = &trimmed[1..trimmed.len() - 1];
1337            } else {
1338                return trimmed;
1339            }
1340        }
1341    }
1342
1343    fn strip_namespace(type_name: &str) -> &str {
1344        type_name.rsplit('.').next().unwrap_or(type_name)
1345    }
1346
1347    fn strip_type_suffix(name: &str) -> &str {
1348        name.trim_end_matches("Type")
1349    }
1350
1351    let mut cleaned = strip_wrapping_parens(marshal_type);
1352
1353    // Special case: Preserve UserType definitions unchanged
1354    // UserType contains critical metadata (keyspace, type name, field definitions) that must
1355    // reach the parser intact. Converting it to a simplified CQL type would lose this information.
1356    if cleaned.contains("org.apache.cassandra.db.marshal.UserType(") {
1357        return marshal_type.to_string();
1358    }
1359
1360    // Normalize known wrappers by recursively converting inner types
1361    // Use extract_inner_type() for proper parenthesis matching (fixes nested types)
1362    for prefix in [
1363        "org.apache.cassandra.db.marshal.ReversedType(",
1364        "ReversedType(",
1365    ] {
1366        if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1367            if let Some(inner) = extract_inner_type(params_with_close) {
1368                return convert_marshal_type_to_cql(inner);
1369            }
1370        }
1371    }
1372
1373    for prefix in ["org.apache.cassandra.db.marshal.FrozenType(", "FrozenType("] {
1374        if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1375            if let Some(inner) = extract_inner_type(params_with_close) {
1376                return format!("frozen<{}>", convert_marshal_type_to_cql(inner));
1377            }
1378        }
1379    }
1380
1381    for prefix in ["org.apache.cassandra.db.marshal.ListType(", "ListType("] {
1382        if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1383            if let Some(inner) = extract_inner_type(params_with_close) {
1384                return format!("list<{}>", convert_marshal_type_to_cql(inner));
1385            }
1386        }
1387    }
1388
1389    for prefix in ["org.apache.cassandra.db.marshal.SetType(", "SetType("] {
1390        if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1391            if let Some(inner) = extract_inner_type(params_with_close) {
1392                return format!("set<{}>", convert_marshal_type_to_cql(inner));
1393            }
1394        }
1395    }
1396
1397    for prefix in ["org.apache.cassandra.db.marshal.MapType(", "MapType("] {
1398        if let Some(params_with_close) = cleaned.strip_prefix(prefix) {
1399            if let Some(inner) = extract_inner_type(params_with_close) {
1400                let args = split_type_arguments(inner);
1401                if args.len() == 2 {
1402                    let key = convert_marshal_type_to_cql(args[0]);
1403                    let value = convert_marshal_type_to_cql(args[1]);
1404                    return format!("map<{}, {}>", key, value);
1405                } else if args.len() == 1 {
1406                    let value = convert_marshal_type_to_cql(args[0]);
1407                    return format!("map<text, {}>", value);
1408                }
1409            }
1410        }
1411    }
1412
1413    cleaned = strip_wrapping_parens(cleaned);
1414    let base = strip_type_suffix(strip_namespace(cleaned)).trim_end_matches(')');
1415
1416    // Map common types to CQL equivalents
1417    match base {
1418        "UTF8" => "text".to_string(),
1419        "Int32" => "int".to_string(),
1420        "Integer" => "int".to_string(),
1421        "Long" => "bigint".to_string(),
1422        "Short" => "smallint".to_string(),
1423        "Byte" => "tinyint".to_string(),
1424        "SimpleDate" => "date".to_string(),
1425        "Timestamp" => "timestamp".to_string(),
1426        "Boolean" => "boolean".to_string(),
1427        "Decimal" => "decimal".to_string(),
1428        "Float" => "float".to_string(),
1429        "Double" => "double".to_string(),
1430        "Bytes" => "blob".to_string(),
1431        "Ascii" => "ascii".to_string(),
1432        "InetAddress" => "inet".to_string(),
1433        "UUID" => "uuid".to_string(),
1434        "TimeUUID" => "timeuuid".to_string(),
1435        "Duration" => "duration".to_string(),
1436        "Time" => "time".to_string(),
1437        "Counter" | "CounterColumn" => "counter".to_string(),
1438        other => other.to_lowercase(),
1439    }
1440}
1441
1442/// Construct ColumnInfo entries for partition key definitions found in SerializationHeader
1443fn build_partition_key_columns(partition_types: &[String]) -> Vec<super::header::ColumnInfo> {
1444    if partition_types.is_empty() {
1445        return Vec::new();
1446    }
1447
1448    let total = partition_types.len();
1449    partition_types
1450        .iter()
1451        .enumerate()
1452        .map(|(idx, marshal_type)| {
1453            let cql_type = convert_marshal_type_to_cql(marshal_type);
1454            let name = if total == 1 {
1455                match cql_type.as_str() {
1456                    "uuid" | "timeuuid" => "id".to_string(),
1457                    _ => "partition_key".to_string(),
1458                }
1459            } else {
1460                format!("partition_key_{}", idx)
1461            };
1462
1463            super::header::ColumnInfo {
1464                name,
1465                column_type: cql_type,
1466                is_primary_key: true,
1467                key_position: Some(idx as u16),
1468                is_static: false,
1469                is_clustering: false,
1470            }
1471        })
1472        .collect()
1473}
1474
1475/// Construct ColumnInfo entries for clustering key definitions found in SerializationHeader
1476fn build_clustering_key_columns(clustering_types: &[String]) -> Vec<super::header::ColumnInfo> {
1477    if clustering_types.is_empty() {
1478        return Vec::new();
1479    }
1480
1481    let total = clustering_types.len();
1482    clustering_types
1483        .iter()
1484        .enumerate()
1485        .map(|(idx, marshal_type)| {
1486            let cql_type = convert_marshal_type_to_cql(marshal_type);
1487            let name = if total == 1 {
1488                "clustering_key".to_string()
1489            } else {
1490                format!("clustering_key_{}", idx)
1491            };
1492
1493            super::header::ColumnInfo {
1494                name,
1495                column_type: cql_type,
1496                is_primary_key: true,
1497                key_position: Some(idx as u16),
1498                is_static: false,
1499                is_clustering: true,
1500            }
1501        })
1502        .collect()
1503}
1504
1505/// Parse SerializationHeader using sequential VInt parsing (Issue #216)
1506///
1507/// This function assumes the input starts EXACTLY at the SerializationHeader
1508/// (immediately after EncodingStats). It does NOT search for markers.
1509///
1510/// Format (from SerializationHeader.java):
1511/// [VInt pk_type_len] [pk_type_string]
1512/// [VInt ck_count] [for each: VInt ck_type_len, ck_type_string]
1513/// [VInt static_count] [for each: VInt name_len, name, VInt type_len, type]
1514/// [VInt regular_count] [for each: VInt name_len, name, VInt type_len, type]
1515fn parse_serialization_header_sequential(
1516    input: &[u8],
1517) -> IResult<&[u8], SerializationHeaderResult> {
1518    // Step 1: Parse partition key type (VInt length + string)
1519    let (input, pk_type_len) = parse_vuint(input)?;
1520
1521    // Validate partition key type length
1522    if pk_type_len == 0 || pk_type_len > 5000 {
1523        log::debug!(
1524            "Invalid partition key type length: {} (expected 1-2000)",
1525            pk_type_len
1526        );
1527        return Err(nom::Err::Error(nom::error::Error::new(
1528            input,
1529            nom::error::ErrorKind::Verify,
1530        )));
1531    }
1532
1533    let (input, pk_type_bytes) = nom::bytes::complete::take(pk_type_len as usize)(input)?;
1534    let partition_key_type = std::str::from_utf8(pk_type_bytes)
1535        .map_err(|_| nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify)))?
1536        .to_string();
1537
1538    log::debug!(
1539        "Sequential parser: partition key type (len={}): {}",
1540        pk_type_len,
1541        partition_key_type
1542    );
1543
1544    // Step 2: Parse clustering key count and types
1545    let (input, clustering_count) = parse_vuint(input)?;
1546    let clustering_count = clustering_count as usize;
1547
1548    if clustering_count > 100 {
1549        log::debug!(
1550            "Invalid clustering key count: {} (expected 0-100)",
1551            clustering_count
1552        );
1553        return Err(nom::Err::Error(nom::error::Error::new(
1554            input,
1555            nom::error::ErrorKind::Verify,
1556        )));
1557    }
1558
1559    log::debug!(
1560        "Sequential parser: clustering key count: {}",
1561        clustering_count
1562    );
1563
1564    let mut clustering_key_types = Vec::with_capacity(clustering_count);
1565    let mut input = input;
1566
1567    for idx in 0..clustering_count {
1568        let (remaining, type_len) = parse_vuint(input)?;
1569
1570        if type_len == 0 || type_len > 5000 {
1571            log::debug!("Invalid clustering key {} type length: {}", idx, type_len);
1572            return Err(nom::Err::Error(nom::error::Error::new(
1573                input,
1574                nom::error::ErrorKind::Verify,
1575            )));
1576        }
1577
1578        let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
1579        let clustering_type = std::str::from_utf8(type_bytes)
1580            .map_err(|_| {
1581                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1582            })?
1583            .to_string();
1584
1585        log::debug!(
1586            "Sequential parser: clustering key {} type (len={}): {}",
1587            idx,
1588            type_len,
1589            clustering_type
1590        );
1591
1592        clustering_key_types.push(clustering_type);
1593        input = remaining;
1594    }
1595
1596    // Step 3: Parse static columns
1597    let (input, static_count) = parse_vuint(input)?;
1598    let static_count = static_count as usize;
1599
1600    if static_count > 200 {
1601        log::debug!(
1602            "Invalid static column count: {} (expected 0-200)",
1603            static_count
1604        );
1605        return Err(nom::Err::Error(nom::error::Error::new(
1606            input,
1607            nom::error::ErrorKind::Verify,
1608        )));
1609    }
1610
1611    log::debug!("Sequential parser: static column count: {}", static_count);
1612
1613    let mut static_columns = Vec::with_capacity(static_count);
1614    let mut input = input;
1615
1616    for idx in 0..static_count {
1617        // Column name (VInt length + UTF-8)
1618        let (remaining, name_len) = parse_vuint(input)?;
1619
1620        if name_len == 0 || name_len > 200 {
1621            log::debug!("Invalid static column {} name length: {}", idx, name_len);
1622            return Err(nom::Err::Error(nom::error::Error::new(
1623                input,
1624                nom::error::ErrorKind::Verify,
1625            )));
1626        }
1627
1628        let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
1629        let column_name = std::str::from_utf8(name_bytes)
1630            .map_err(|_| {
1631                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1632            })?
1633            .to_string();
1634
1635        // Column type (VInt length + UTF-8)
1636        let (remaining, type_len) = parse_vuint(remaining)?;
1637
1638        if type_len == 0 || type_len > 5000 {
1639            log::debug!(
1640                "Invalid static column '{}' type length: {}",
1641                column_name,
1642                type_len
1643            );
1644            return Err(nom::Err::Error(nom::error::Error::new(
1645                input,
1646                nom::error::ErrorKind::Verify,
1647            )));
1648        }
1649
1650        let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
1651        let internal_type = std::str::from_utf8(type_bytes)
1652            .map_err(|_| {
1653                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1654            })?
1655            .to_string();
1656
1657        let cql_type = convert_marshal_type_to_cql(&internal_type);
1658
1659        log::debug!(
1660            "Sequential parser: static column {}: name='{}', type='{}'",
1661            idx,
1662            column_name,
1663            cql_type
1664        );
1665
1666        static_columns.push(super::header::ColumnInfo {
1667            name: column_name,
1668            column_type: cql_type,
1669            is_primary_key: false,
1670            key_position: None,
1671            is_static: true,
1672            is_clustering: false,
1673        });
1674
1675        input = remaining;
1676    }
1677
1678    // Step 4: Parse regular columns
1679    let (input, regular_count) = parse_vuint(input)?;
1680    let regular_count = regular_count as usize;
1681
1682    if regular_count > 500 {
1683        log::debug!(
1684            "Invalid regular column count: {} (expected 0-500)",
1685            regular_count
1686        );
1687        return Err(nom::Err::Error(nom::error::Error::new(
1688            input,
1689            nom::error::ErrorKind::Verify,
1690        )));
1691    }
1692
1693    log::debug!("Sequential parser: regular column count: {}", regular_count);
1694
1695    let mut regular_columns = Vec::with_capacity(regular_count);
1696    let mut input = input;
1697
1698    for idx in 0..regular_count {
1699        // Column name (VInt length + UTF-8)
1700        let (remaining, name_len) = parse_vuint(input)?;
1701
1702        if name_len == 0 || name_len > 200 {
1703            log::debug!("Invalid regular column {} name length: {}", idx, name_len);
1704            return Err(nom::Err::Error(nom::error::Error::new(
1705                input,
1706                nom::error::ErrorKind::Verify,
1707            )));
1708        }
1709
1710        let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
1711        let column_name = std::str::from_utf8(name_bytes)
1712            .map_err(|_| {
1713                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1714            })?
1715            .to_string();
1716
1717        // Column type (VInt length + UTF-8)
1718        let (remaining, type_len) = parse_vuint(remaining)?;
1719
1720        if type_len == 0 || type_len > 5000 {
1721            log::debug!(
1722                "Invalid regular column '{}' type length: {}",
1723                column_name,
1724                type_len
1725            );
1726            return Err(nom::Err::Error(nom::error::Error::new(
1727                input,
1728                nom::error::ErrorKind::Verify,
1729            )));
1730        }
1731
1732        let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
1733        let internal_type = std::str::from_utf8(type_bytes)
1734            .map_err(|_| {
1735                nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
1736            })?
1737            .to_string();
1738
1739        let cql_type = convert_marshal_type_to_cql(&internal_type);
1740
1741        log::debug!(
1742            "Sequential parser: regular column {}: name='{}', type='{}'",
1743            idx,
1744            column_name,
1745            cql_type
1746        );
1747
1748        regular_columns.push(super::header::ColumnInfo {
1749            name: column_name,
1750            column_type: cql_type,
1751            is_primary_key: false,
1752            key_position: None,
1753            is_static: false,
1754            is_clustering: false,
1755        });
1756
1757        input = remaining;
1758    }
1759
1760    // Combine static and regular columns (static columns first)
1761    let mut all_columns = static_columns;
1762    all_columns.extend(regular_columns);
1763
1764    log::debug!(
1765        "Sequential parser complete: partition_key='{}', {} clustering keys, {} total columns",
1766        partition_key_type,
1767        clustering_key_types.len(),
1768        all_columns.len()
1769    );
1770
1771    Ok((
1772        input,
1773        (vec![partition_key_type], clustering_key_types, all_columns),
1774    ))
1775}
1776
1777/// Parse the schema portion of a SerializationHeader (after EncodingStats have been consumed).
1778///
1779/// Format:
1780/// 1. keyType (VInt length + UTF-8 type string)
1781/// 2. clusteringTypes (VInt count + [VInt type_len + type]*)
1782/// 3. staticColumns (VInt count + [VInt name_len + name + VInt type_len + type]*)
1783/// 4. regularColumns (VInt count + [VInt name_len + name + VInt type_len + type]*)
1784fn parse_serialization_header_schema(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
1785    // Parse keyType (partition key type)
1786    let (input, pk_type_len) = parse_vuint(input)?;
1787    if pk_type_len == 0 || pk_type_len > 5000 {
1788        log::debug!("Invalid pk_type_len: {}", pk_type_len);
1789        return Err(nom::Err::Error(nom::error::Error::new(
1790            input,
1791            nom::error::ErrorKind::Verify,
1792        )));
1793    }
1794    if pk_type_len > 1000 {
1795        log::warn!(
1796            "Unusually long partition key type string: {} bytes (typical <1000)",
1797            pk_type_len
1798        );
1799    }
1800
1801    let (input, pk_type_bytes) = take(pk_type_len as usize)(input)?;
1802    let partition_key_type = match std::str::from_utf8(pk_type_bytes) {
1803        Ok(s) => convert_marshal_type_to_cql(s),
1804        Err(_) => {
1805            log::debug!("Invalid UTF-8 in partition key type");
1806            return Err(nom::Err::Error(nom::error::Error::new(
1807                input,
1808                nom::error::ErrorKind::Verify,
1809            )));
1810        }
1811    };
1812
1813    log::debug!(
1814        "HEADER: Partition key type: {} ({} bytes)",
1815        partition_key_type,
1816        pk_type_len
1817    );
1818
1819    // Step 3: Parse clusteringTypes
1820    let (input, clustering_count) = parse_vuint(input)?;
1821    // Sanity check: Cassandra tables rarely have >100 clustering keys
1822    if clustering_count > 1000 {
1823        log::warn!(
1824            "Suspicious clustering_count={} in SerializationHeader (expected <100)",
1825            clustering_count
1826        );
1827        return Err(nom::Err::Error(nom::error::Error::new(
1828            input,
1829            nom::error::ErrorKind::Verify,
1830        )));
1831    }
1832    log::debug!("HEADER: {} clustering key types", clustering_count);
1833
1834    let mut input = input;
1835    let mut clustering_key_types = Vec::with_capacity(clustering_count as usize);
1836
1837    for i in 0..clustering_count {
1838        let (remaining, ck_type_len) = parse_vuint(input)?;
1839        if ck_type_len == 0 || ck_type_len > 5000 {
1840            log::debug!("Invalid clustering key type length: {}", ck_type_len);
1841            return Err(nom::Err::Error(nom::error::Error::new(
1842                input,
1843                nom::error::ErrorKind::Verify,
1844            )));
1845        }
1846        if ck_type_len > 1000 {
1847            log::warn!(
1848                "Unusually long clustering key type string: {} bytes (typical <1000)",
1849                ck_type_len
1850            );
1851        }
1852
1853        let (remaining, ck_type_bytes) = take(ck_type_len as usize)(remaining)?;
1854        let ck_type = match std::str::from_utf8(ck_type_bytes) {
1855            Ok(s) => convert_marshal_type_to_cql(s),
1856            Err(_) => {
1857                log::debug!("Invalid UTF-8 in clustering key type {}", i);
1858                return Err(nom::Err::Error(nom::error::Error::new(
1859                    input,
1860                    nom::error::ErrorKind::Verify,
1861                )));
1862            }
1863        };
1864
1865        log::debug!(
1866            "HEADER: Clustering key {}: {} ({} bytes)",
1867            i,
1868            ck_type,
1869            ck_type_len
1870        );
1871        clustering_key_types.push(ck_type);
1872        input = remaining;
1873    }
1874
1875    // Step 4: Parse staticColumns
1876    let (input, static_count) = parse_vuint(input)?;
1877    // Sanity check: Cassandra tables rarely have >1000 static columns
1878    if static_count > 10000 {
1879        log::warn!(
1880            "Suspicious static_count={} in SerializationHeader (expected <1000)",
1881            static_count
1882        );
1883        return Err(nom::Err::Error(nom::error::Error::new(
1884            input,
1885            nom::error::ErrorKind::Verify,
1886        )));
1887    }
1888    log::debug!("HEADER: {} static columns", static_count);
1889
1890    let mut input = input;
1891    let mut static_columns = Vec::with_capacity(static_count as usize);
1892
1893    for i in 0..static_count {
1894        // Column name
1895        let (remaining, name_len) = parse_vuint(input)?;
1896        if name_len == 0 || name_len > 200 {
1897            log::debug!("Invalid static column name length: {}", name_len);
1898            return Err(nom::Err::Error(nom::error::Error::new(
1899                input,
1900                nom::error::ErrorKind::Verify,
1901            )));
1902        }
1903
1904        let (remaining, name_bytes) = take(name_len as usize)(remaining)?;
1905        let column_name = match std::str::from_utf8(name_bytes) {
1906            Ok(s) => s.to_string(),
1907            Err(_) => {
1908                log::debug!("Invalid UTF-8 in static column name {}", i);
1909                return Err(nom::Err::Error(nom::error::Error::new(
1910                    input,
1911                    nom::error::ErrorKind::Verify,
1912                )));
1913            }
1914        };
1915
1916        // Column type
1917        let (remaining, type_len) = parse_vuint(remaining)?;
1918        if type_len == 0 || type_len > 5000 {
1919            log::debug!("Invalid static column type length: {}", type_len);
1920            return Err(nom::Err::Error(nom::error::Error::new(
1921                input,
1922                nom::error::ErrorKind::Verify,
1923            )));
1924        }
1925        if type_len > 1000 {
1926            log::warn!(
1927                "Unusually long static column type string: {} bytes (typical <1000)",
1928                type_len
1929            );
1930        }
1931
1932        let (remaining, type_bytes) = take(type_len as usize)(remaining)?;
1933        let cql_type = match std::str::from_utf8(type_bytes) {
1934            Ok(s) => convert_marshal_type_to_cql(s),
1935            Err(_) => {
1936                log::debug!("Invalid UTF-8 in static column type {}", i);
1937                return Err(nom::Err::Error(nom::error::Error::new(
1938                    input,
1939                    nom::error::ErrorKind::Verify,
1940                )));
1941            }
1942        };
1943
1944        log::debug!(
1945            "HEADER: Static column '{}': {} ({} bytes)",
1946            column_name,
1947            cql_type,
1948            type_len
1949        );
1950
1951        static_columns.push(super::header::ColumnInfo {
1952            name: column_name,
1953            column_type: cql_type,
1954            is_primary_key: false,
1955            key_position: None,
1956            is_static: true,
1957            is_clustering: false,
1958        });
1959
1960        input = remaining;
1961    }
1962
1963    // Step 5: Parse regularColumns
1964    let (input, regular_count) = parse_vuint(input)?;
1965    // Sanity check: Cassandra tables rarely have >1000 regular columns
1966    if regular_count > 10000 {
1967        log::warn!(
1968            "Suspicious regular_count={} in SerializationHeader (expected <1000)",
1969            regular_count
1970        );
1971        return Err(nom::Err::Error(nom::error::Error::new(
1972            input,
1973            nom::error::ErrorKind::Verify,
1974        )));
1975    }
1976    log::debug!("HEADER: {} regular columns", regular_count);
1977
1978    let mut input = input;
1979    let mut regular_columns = Vec::with_capacity(regular_count as usize);
1980
1981    for i in 0..regular_count {
1982        // Column name
1983        let (remaining, name_len) = parse_vuint(input)?;
1984        if name_len == 0 || name_len > 200 {
1985            log::debug!("Invalid regular column name length: {}", name_len);
1986            return Err(nom::Err::Error(nom::error::Error::new(
1987                input,
1988                nom::error::ErrorKind::Verify,
1989            )));
1990        }
1991
1992        let (remaining, name_bytes) = take(name_len as usize)(remaining)?;
1993        let column_name = match std::str::from_utf8(name_bytes) {
1994            Ok(s) => s.to_string(),
1995            Err(_) => {
1996                log::debug!("Invalid UTF-8 in regular column name {}", i);
1997                return Err(nom::Err::Error(nom::error::Error::new(
1998                    input,
1999                    nom::error::ErrorKind::Verify,
2000                )));
2001            }
2002        };
2003
2004        // Column type
2005        let (remaining, type_len) = parse_vuint(remaining)?;
2006        if type_len == 0 || type_len > 5000 {
2007            log::debug!("Invalid regular column type length: {}", type_len);
2008            return Err(nom::Err::Error(nom::error::Error::new(
2009                input,
2010                nom::error::ErrorKind::Verify,
2011            )));
2012        }
2013        if type_len > 1000 {
2014            log::warn!(
2015                "Unusually long regular column type string: {} bytes (typical <1000)",
2016                type_len
2017            );
2018        }
2019
2020        let (remaining, type_bytes) = take(type_len as usize)(remaining)?;
2021        let cql_type = match std::str::from_utf8(type_bytes) {
2022            Ok(s) => convert_marshal_type_to_cql(s),
2023            Err(_) => {
2024                log::debug!("Invalid UTF-8 in regular column type {}", i);
2025                return Err(nom::Err::Error(nom::error::Error::new(
2026                    input,
2027                    nom::error::ErrorKind::Verify,
2028                )));
2029            }
2030        };
2031
2032        log::debug!(
2033            "HEADER: Regular column '{}': {} ({} bytes)",
2034            column_name,
2035            cql_type,
2036            type_len
2037        );
2038
2039        regular_columns.push(super::header::ColumnInfo {
2040            name: column_name,
2041            column_type: cql_type,
2042            is_primary_key: false,
2043            key_position: None,
2044            is_static: false,
2045            is_clustering: false,
2046        });
2047
2048        input = remaining;
2049    }
2050
2051    // Combine static and regular columns
2052    let mut all_columns = static_columns;
2053    all_columns.extend(regular_columns);
2054
2055    log::debug!(
2056        "HEADER parsing complete: partition_key='{}', {} clustering keys, {} total columns",
2057        partition_key_type,
2058        clustering_key_types.len(),
2059        all_columns.len()
2060    );
2061
2062    Ok((
2063        input,
2064        (vec![partition_key_type], clustering_key_types, all_columns),
2065    ))
2066}
2067
2068/// Parse minimal EncodingStats section from nb-format Statistics.db
2069///
2070/// Returns: (min_timestamp, min_deletion_time, min_ttl, partition_keys, clustering_keys, columns)
2071///
2072/// # Arguments
2073/// * `input` - The data starting at the STATS component
2074/// * `full_input` - The complete Statistics.db content (needed for TOC-based HEADER lookup)
2075/// * `header_offset` - Optional offset to SerializationHeader from TOC (Issue #216)
2076/// * `gates` - Optional VersionGates for VG3 version-sensitive decoding decisions.
2077///   Pass `None` from standalone tools/tests to use nb-compatible defaults.
2078fn parse_minimal_encoding_stats<'a>(
2079    input: &'a [u8],
2080    full_input: &'a [u8],
2081    header_offset: Option<usize>,
2082    gates: Option<&VersionGates>,
2083) -> IResult<&'a [u8], EncodingStatsResult> {
2084    // The SERIALIZATION_HEADER component (type 3) starts with EncodingStats:
2085    //   [vuint minTimestamp_delta] [vuint minLocalDeletionTime_delta] [vuint minTTL_delta]
2086    // These are unsigned VInt deltas from epoch constants (see EncodingStats.Serializer).
2087    // Use the TOC-based offset to read from the correct location.
2088
2089    let Some(offset) = header_offset else {
2090        log::debug!("No HEADER TOC offset, using fallback EncodingStats parsing");
2091        return parse_encoding_stats_fallback(input, gates);
2092    };
2093
2094    if offset >= full_input.len() {
2095        log::warn!(
2096            "TOC offset 0x{:x} exceeds input length {}, using fallback",
2097            offset,
2098            full_input.len()
2099        );
2100        return parse_encoding_stats_fallback(input, gates);
2101    }
2102
2103    let header_data = &full_input[offset..];
2104    log::debug!(
2105        "Parsing EncodingStats + SerializationHeader at TOC offset 0x{:x} ({} bytes available)",
2106        offset,
2107        header_data.len()
2108    );
2109
2110    // Parse EncodingStats (3 unsigned VInts at start of SERIALIZATION_HEADER)
2111    let (rest, (min_timestamp, min_deletion_time, min_ttl)) =
2112        parse_encoding_stats_vuints(header_data, gates)?;
2113
2114    log::debug!(
2115        "EncodingStats from HEADER: min_timestamp={}, min_deletion_time={}, min_ttl={:?}",
2116        min_timestamp,
2117        min_deletion_time,
2118        min_ttl
2119    );
2120
2121    // Parse the rest of the SerializationHeader (schema info)
2122    let (partition_types, clustering_types, columns) = match parse_serialization_header_schema(rest)
2123    {
2124        Ok((_, result)) => result,
2125        Err(e) => {
2126            log::warn!(
2127                "Schema parsing after EncodingStats failed: {:?}, falling back to marker search",
2128                e
2129            );
2130            parse_serialization_header(input)?.1
2131        }
2132    };
2133
2134    let (partition_key_columns, clustering_key_columns) =
2135        build_column_infos(&partition_types, &clustering_types);
2136
2137    Ok((
2138        input,
2139        (
2140            min_timestamp,
2141            min_deletion_time,
2142            min_ttl,
2143            partition_key_columns,
2144            clustering_key_columns,
2145            columns,
2146        ),
2147    ))
2148}
2149
2150/// Parse 3 EncodingStats unsigned VInt deltas and convert to absolute values by adding epochs.
2151/// Returns (min_timestamp, min_deletion_time, min_ttl).
2152///
2153/// # VG3 authority note
2154///
2155/// The `EncodingStats.Serializer` (EncodingStats.java:274-276) uses the SAME unsigned-VInt
2156/// + epoch-offset format for **both** `nb` and `oa`:
2157///
2158/// ```text
2159/// out.writeUnsignedVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
2160/// out.writeUnsignedVInt32((int)(stats.minLocalDeletionTime - DELETION_TIME_EPOCH))
2161/// out.writeUnsignedVInt32(stats.minTTL - TTL_EPOCH)
2162/// ```
2163///
2164/// The `hasUIntDeletionTime` gate (BigFormat.java:409) affects only the **StatsMetadata**
2165/// (STATS component in Statistics.db), not the SerializationHeader component where
2166/// EncodingStats lives.  The epoch-relative decoding here is correct for both nb and oa.
2167/// `gates` is accepted (not consumed) for API completeness; `None` is fine too.
2168fn parse_encoding_stats_vuints<'a>(
2169    input: &'a [u8],
2170    // VG3: gates threaded here for authority completeness.
2171    // Authority investigation: the EncodingStats.Serializer (EncodingStats.java:274-276)
2172    // uses the SAME unsigned VInt + epoch format for both nb and oa:
2173    //   out.writeUnsignedVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
2174    //   out.writeUnsignedVInt32((int)(stats.minLocalDeletionTime - DELETION_TIME_EPOCH))
2175    //   out.writeUnsignedVInt32(stats.minTTL - TTL_EPOCH)
2176    // The `hasUIntDeletionTime` gate (BigFormat.java:409) affects ONLY the
2177    // StatsMetadata section (Statistics.db STATS component), NOT the
2178    // SerializationHeader component where EncodingStats lives.  No decode
2179    // difference applies here.  Gates accepted but not consumed.
2180    _gates: Option<&VersionGates>,
2181) -> IResult<&'a [u8], (i64, i64, Option<i64>)> {
2182    let (rest, min_ts_delta) = parse_vuint(input)?;
2183    let (rest, min_ldt_delta) = parse_vuint(rest)?;
2184    let (rest, min_ttl_delta) = parse_vuint(rest)?;
2185
2186    Ok((
2187        rest,
2188        (
2189            min_ts_delta as i64 + TIMESTAMP_EPOCH,
2190            // EncodingStats.java:289: `long minLocalDeletionTime = in.readUnsignedVInt32() + DELETION_TIME_EPOCH`
2191            // Same formula for nb and oa — DELETION_TIME_EPOCH is always added back.
2192            min_ldt_delta as i64 + DELETION_TIME_EPOCH,
2193            Some(min_ttl_delta as i64 + TTL_EPOCH),
2194        ),
2195    ))
2196}
2197
2198/// Build ColumnInfo vectors from parsed type strings.
2199fn build_column_infos(
2200    partition_types: &[String],
2201    clustering_types: &[String],
2202) -> (
2203    Vec<super::header::ColumnInfo>,
2204    Vec<super::header::ColumnInfo>,
2205) {
2206    let partition_key_columns = build_partition_key_columns(partition_types);
2207    let clustering_key_columns = build_clustering_key_columns(clustering_types);
2208
2209    log::debug!(
2210        "Constructed ColumnInfo entries from SerializationHeader: {} partition keys, {} clustering keys",
2211        partition_key_columns.len(),
2212        clustering_key_columns.len()
2213    );
2214
2215    (partition_key_columns, clustering_key_columns)
2216}
2217
2218/// Fallback EncodingStats parser for when no TOC HEADER offset is available.
2219/// Uses ad-hoc parsing from the data following the file header.
2220fn parse_encoding_stats_fallback<'a>(
2221    input: &'a [u8],
2222    gates: Option<&VersionGates>,
2223) -> IResult<&'a [u8], EncodingStatsResult> {
2224    // Skip metadata_type (u32 BE) at start of data section
2225    let (rest, _metadata_type) = be_u32(input)?;
2226
2227    // Parse data section length (VInt)
2228    let (rest, _data_length) = parse_vuint(rest)?;
2229
2230    // Parse partitioner string length (VInt)
2231    let (rest, partitioner_len) = parse_vuint(rest)?;
2232
2233    // Skip partitioner string
2234    let (rest, _) = take(partitioner_len as usize)(rest)?;
2235
2236    // Skip additional metadata (observed: ~2 VInts before timestamp fields)
2237    let (rest, _metadata1) = parse_vuint(rest)?;
2238    let (rest, _metadata2) = parse_vuint(rest)?;
2239
2240    // Parse EncodingStats fields (unsigned VInt deltas from epoch)
2241    let (rest, (min_timestamp, min_deletion_time, min_ttl)) =
2242        parse_encoding_stats_vuints(rest, gates)?;
2243
2244    // Fall back to marker-based header search for schema
2245    let (_, (partition_types, clustering_types, columns)) = parse_serialization_header(rest)?;
2246
2247    let (partition_key_columns, clustering_key_columns) =
2248        build_column_infos(&partition_types, &clustering_types);
2249
2250    Ok((
2251        input,
2252        (
2253            min_timestamp,
2254            min_deletion_time,
2255            min_ttl,
2256            partition_key_columns,
2257            clustering_key_columns,
2258            columns,
2259        ),
2260    ))
2261}
2262
2263/// Main enhanced parser for real Statistics.db files (minimal implementation for Issue #162)
2264///
2265/// This function parses the header and minimal EncodingStats fields from nb-format
2266/// Statistics.db files. Only timestamp-related fields are extracted; all other
2267/// statistics (histograms, column stats, etc.) are populated with placeholder values.
2268///
2269/// This is sufficient for V5CompressedLegacy parser which requires min_timestamp,
2270/// min_local_deletion_time, and min_ttl for delta decoding baseline.
2271///
2272/// # Arguments
2273///
2274/// * `gates` - Optional [`VersionGates`] for version-sensitive decoding decisions
2275///   (VG1 plumbing).  Pass `None` from standalone tools/tests to use nb-compatible
2276///   defaults; pass `Some(&gates)` from `SSTableReader` to enable VG3 gating.
2277///
2278/// # Returns
2279///
2280/// SSTableStatistics with only header and timestamp_stats populated from real data.
2281pub fn parse_enhanced_statistics_file<'a>(
2282    input: &'a [u8],
2283    gates: Option<&VersionGates>,
2284) -> IResult<&'a [u8], SSTableStatistics> {
2285    // Parse the 32-byte header
2286    let (remaining, header) = parse_nb_format_header(input)?;
2287
2288    // Parse minimal statistics data (EncodingStats + SerializationHeader columns)
2289    // Pass full input for TOC-based HEADER offset lookup (Issue #216)
2290    let result = parse_nb_format_statistics_data(remaining, &header, input, gates);
2291
2292    match result {
2293        Ok((
2294            row_stats,
2295            timestamp_stats,
2296            table_stats,
2297            partition_stats,
2298            compression_stats,
2299            partition_columns,
2300            clustering_columns,
2301            columns,
2302        )) => {
2303            log::debug!(
2304                "Successfully parsed Statistics.db serialization header: {} partition keys, {} clustering keys, {} regular columns",
2305                partition_columns.len(),
2306                clustering_columns.len(),
2307                columns.len()
2308            );
2309
2310            let statistics = SSTableStatistics {
2311                header,
2312                row_stats,
2313                timestamp_stats,
2314                column_stats: vec![],
2315                table_stats,
2316                partition_stats,
2317                compression_stats,
2318                metadata: std::collections::HashMap::new(),
2319                serialization_header_columns: columns,
2320                serialization_header_partition_keys: partition_columns,
2321                serialization_header_clustering_keys: clustering_columns,
2322            };
2323
2324            Ok((remaining, statistics))
2325        }
2326        Err(e) => {
2327            // Convert Error to nom::Err
2328            log::warn!("Failed to parse nb-format Statistics.db: {}", e);
2329            Err(nom::Err::Error(nom::error::Error::new(
2330                input,
2331                nom::error::ErrorKind::Verify,
2332            )))
2333        }
2334    }
2335}
2336
2337/// Enhanced statistics reader with fallback (minimal implementation for Issue #162)
2338///
2339/// Attempts to parse nb-format Statistics.db with minimal EncodingStats extraction.
2340/// This provides the minimum fields needed for delta-coded timestamp decoding.
2341///
2342/// # Arguments
2343///
2344/// * `gates` - Optional [`VersionGates`] threaded from `SSTableReader` for VG3
2345///   version-sensitive decoding.  Pass `None` from standalone tools/tests; the
2346///   nb-compatible behaviour is used when `gates` is `None`.
2347///
2348/// # Returns
2349///
2350/// SSTableStatistics with minimal fields populated, or error if parsing fails.
2351pub fn parse_statistics_with_fallback<'a>(
2352    input: &'a [u8],
2353    gates: Option<&VersionGates>,
2354) -> IResult<&'a [u8], SSTableStatistics> {
2355    // Try the minimal enhanced parser
2356    parse_enhanced_statistics_file(input, gates)
2357}
2358
2359#[cfg(test)]
2360mod tests {
2361    use super::*;
2362
2363    #[test]
2364    fn test_serialization_header_with_no_clustering_keys() {
2365        // Test SerializationHeader with partition key and regular columns, no clustering keys
2366        // Format: [VInt partition_type_len] [0x00 0x00] [partition_type] [clustering_count=0] [0x00 0x00 column_count] [columns...]
2367
2368        let mut test_data = vec![];
2369
2370        // Partition key type: 41 bytes "(org.apache.cassandra.db.marshal.UUIDType"
2371        let partition_type = b"(org.apache.cassandra.db.marshal.UUIDType";
2372        test_data.extend_from_slice(&[0x00, 0x00]); // Marker
2373        test_data.push(partition_type.len() as u8);
2374        test_data.extend_from_slice(partition_type);
2375
2376        // Clustering key count = 0
2377        test_data.push(0x00);
2378
2379        // Regular columns section: separator (0x00) + count
2380        test_data.push(0x00); // section separator
2381        test_data.push(0x02); // column count
2382
2383        // Column 1: "id" (UUID)
2384        test_data.push(0x02); // name length = 2
2385        test_data.extend_from_slice(b"id");
2386        test_data.push(0x28); // type length = 40
2387        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UUIDType");
2388
2389        // Column 2: "name" (UTF8/text)
2390        test_data.push(0x04); // name length = 4
2391        test_data.extend_from_slice(b"name");
2392        test_data.push(0x28); // type length = 40
2393        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2394
2395        // Add some garbage data before the SerializationHeader
2396        let mut full_data = vec![0xFF; 100];
2397        full_data.extend_from_slice(&test_data);
2398
2399        let result = parse_serialization_header(&full_data);
2400        assert!(
2401            result.is_ok(),
2402            "Failed to parse SerializationHeader: {:?}",
2403            result.as_ref().err()
2404        );
2405
2406        let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();
2407
2408        // Verify partition key
2409        assert_eq!(partition_types.len(), 1, "Expected 1 partition key");
2410        assert!(partition_types[0].contains("UUIDType"));
2411
2412        // Verify clustering keys (should be none)
2413        assert_eq!(clustering_types.len(), 0, "Expected 0 clustering keys");
2414
2415        // Verify regular columns
2416        assert_eq!(columns.len(), 2, "Expected 2 columns");
2417        assert_eq!(columns[0].name, "id");
2418        assert_eq!(columns[0].column_type, "uuid");
2419        assert_eq!(columns[1].name, "name");
2420        assert_eq!(columns[1].column_type, "text");
2421    }
2422
2423    #[test]
2424    fn test_serialization_header_with_clustering_keys() {
2425        // Test SerializationHeader with partition key, 2 clustering keys, and regular columns
2426
2427        let mut test_data = vec![];
2428
2429        // Partition key type: 41 bytes
2430        let partition_type = b"(org.apache.cassandra.db.marshal.UUIDType";
2431        test_data.extend_from_slice(&[0x00, 0x00]); // Marker
2432        test_data.push(partition_type.len() as u8);
2433        test_data.extend_from_slice(partition_type);
2434
2435        // Clustering key count = 2
2436        test_data.push(0x02);
2437
2438        // Clustering key 1: ReversedType(TimestampType)
2439        let ck1 =
2440            b"[org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType)";
2441        test_data.push(ck1.len() as u8);
2442        test_data.extend_from_slice(ck1);
2443
2444        // Clustering key 2: UTF8Type
2445        let ck2 = b"(org.apache.cassandra.db.marshal.UTF8Type)";
2446        test_data.push(ck2.len() as u8);
2447        test_data.extend_from_slice(ck2);
2448
2449        // Regular columns section
2450        test_data.push(0x00); // separator
2451        test_data.push(0x02); // count
2452
2453        // Column 1: "data" (UTF8)
2454        test_data.push(0x04); // name length
2455        test_data.extend_from_slice(b"data");
2456        test_data.push(0x28); // type length
2457        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2458
2459        // Column 2: "value" (Int32)
2460        test_data.push(0x05); // name length
2461        test_data.extend_from_slice(b"value");
2462        test_data.push(0x29); // type length
2463        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.Int32Type");
2464
2465        // Add garbage data before SerializationHeader
2466        let mut full_data = vec![0xFF; 100];
2467        full_data.extend_from_slice(&test_data);
2468
2469        let result = parse_serialization_header(&full_data);
2470        assert!(
2471            result.is_ok(),
2472            "Failed to parse SerializationHeader with clustering keys: {:?}",
2473            result.err()
2474        );
2475
2476        let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();
2477
2478        // Verify partition key
2479        assert_eq!(partition_types.len(), 1);
2480        assert!(partition_types[0].contains("UUIDType"));
2481
2482        // Verify clustering keys
2483        assert_eq!(clustering_types.len(), 2, "Expected 2 clustering keys");
2484        assert!(clustering_types[0].contains("ReversedType"));
2485        assert!(clustering_types[0].contains("TimestampType"));
2486        assert!(clustering_types[1].contains("UTF8Type"));
2487
2488        // Verify regular columns
2489        assert_eq!(columns.len(), 2);
2490        assert_eq!(columns[0].name, "data");
2491        assert_eq!(columns[0].column_type, "text");
2492        assert_eq!(columns[1].name, "value");
2493        assert_eq!(columns[1].column_type, "int");
2494    }
2495
2496    #[test]
2497    fn test_serialization_header_with_static_columns() {
2498        // Test SerializationHeader with static columns (Issue #210)
2499        // Schema: partition key (uuid), clustering key (timestamp),
2500        //         static column (text), regular columns (text, int)
2501
2502        let mut test_data = vec![];
2503
2504        // Marker
2505        test_data.extend_from_slice(&[0x00, 0x00]);
2506
2507        // Partition key type: UUIDType (40 bytes)
2508        let partition_type = b"org.apache.cassandra.db.marshal.UUIDType";
2509        test_data.push(partition_type.len() as u8);
2510        test_data.extend_from_slice(partition_type);
2511
2512        // Clustering key count = 1
2513        test_data.push(0x01);
2514
2515        // Clustering key 1: TimestampType (45 bytes)
2516        let ck1 = b"org.apache.cassandra.db.marshal.TimestampType";
2517        test_data.push(ck1.len() as u8);
2518        test_data.extend_from_slice(ck1);
2519
2520        // Static column count = 1 (NOT a separator - this is the key fix!)
2521        test_data.push(0x01);
2522
2523        // Static column 1: "static_data" (UTF8Type)
2524        test_data.push(0x0b); // name length = 11
2525        test_data.extend_from_slice(b"static_data");
2526        test_data.push(0x28); // type length = 40
2527        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2528
2529        // Regular column count = 2
2530        test_data.push(0x02);
2531
2532        // Regular column 1: "row_data" (UTF8)
2533        test_data.push(0x08); // name length
2534        test_data.extend_from_slice(b"row_data");
2535        test_data.push(0x28); // type length = 40
2536        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2537
2538        // Regular column 2: "row_value" (Int32)
2539        test_data.push(0x09); // name length
2540        test_data.extend_from_slice(b"row_value");
2541        test_data.push(0x29); // type length = 41
2542        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.Int32Type");
2543
2544        // Add garbage data before SerializationHeader
2545        let mut full_data = vec![0xFF; 100];
2546        full_data.extend_from_slice(&test_data);
2547
2548        let result = parse_serialization_header(&full_data);
2549        assert!(
2550            result.is_ok(),
2551            "Failed to parse SerializationHeader with static columns: {:?}",
2552            result.err()
2553        );
2554
2555        let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();
2556
2557        // Verify partition key
2558        assert_eq!(partition_types.len(), 1);
2559        assert!(partition_types[0].contains("UUIDType"));
2560
2561        // Verify clustering keys
2562        assert_eq!(clustering_types.len(), 1);
2563        assert!(clustering_types[0].contains("TimestampType"));
2564
2565        // Verify columns (static + regular = 3 total)
2566        assert_eq!(
2567            columns.len(),
2568            3,
2569            "Expected 3 columns (1 static + 2 regular)"
2570        );
2571
2572        // Static column should be first and marked as static
2573        assert_eq!(columns[0].name, "static_data");
2574        assert_eq!(columns[0].column_type, "text");
2575        assert!(
2576            columns[0].is_static,
2577            "static_data should be marked as static"
2578        );
2579
2580        // Regular columns should NOT be static
2581        assert_eq!(columns[1].name, "row_data");
2582        assert_eq!(columns[1].column_type, "text");
2583        assert!(
2584            !columns[1].is_static,
2585            "row_data should NOT be marked as static"
2586        );
2587
2588        assert_eq!(columns[2].name, "row_value");
2589        assert_eq!(columns[2].column_type, "int");
2590        assert!(
2591            !columns[2].is_static,
2592            "row_value should NOT be marked as static"
2593        );
2594    }
2595
2596    #[test]
2597    fn test_marshal_type_conversion() {
2598        // Simple types should be converted to CQL names
2599        assert_eq!(
2600            convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.Int32Type"),
2601            "int"
2602        );
2603        assert_eq!(
2604            convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.UTF8Type"),
2605            "text"
2606        );
2607        assert_eq!(
2608            convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.UUIDType"),
2609            "uuid"
2610        );
2611        assert_eq!(
2612            convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.TimestampType"),
2613            "timestamp"
2614        );
2615        assert_eq!(
2616            convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.DecimalType"),
2617            "decimal"
2618        );
2619        assert_eq!(
2620            convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.SimpleDataType"),
2621            "simpledata"
2622        );
2623
2624        // UserType should be preserved unchanged (contains critical metadata)
2625        let udt = "org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type,63697479:org.apache.cassandra.db.marshal.UTF8Type)";
2626        assert_eq!(
2627            convert_marshal_type_to_cql(udt),
2628            udt,
2629            "UserType definitions must be preserved to retain keyspace, type name, and field metadata"
2630        );
2631
2632        // Frozen UserType should also be preserved
2633        let frozen_udt = "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type))";
2634        assert!(
2635            convert_marshal_type_to_cql(frozen_udt).contains("UserType("),
2636            "UserType inside FrozenType should be preserved"
2637        );
2638
2639        // List of frozen UDT should preserve the UserType
2640        let list_udt = "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type)))";
2641        assert!(
2642            convert_marshal_type_to_cql(list_udt).contains("UserType("),
2643            "UserType inside List should be preserved"
2644        );
2645    }
2646
2647    #[test]
2648    fn test_nb_format_header_parsing() {
2649        // Test data based on real file hex dump
2650        let test_data = vec![
2651            0x00, 0x00, 0x00, 0x04, // version_type = 4
2652            0x26, 0x29, 0x1b, 0x05, // statistics_kind
2653            0x00, 0x00, 0x00, 0x00, // reserved
2654            0x00, 0x00, 0x00, 0x2c, // data_length = 44
2655            0x00, 0x00, 0x00, 0x01, // metadata1 = 1
2656            0x00, 0x00, 0x00, 0x65, // metadata2 = 101
2657            0x00, 0x00, 0x00, 0x02, // metadata3 = 2
2658            0x00, 0x00, 0x14, 0xd4, // checksum/more = 5332
2659        ];
2660
2661        let result = parse_nb_format_header(&test_data);
2662        assert!(result.is_ok());
2663
2664        let (_, header) = result.unwrap();
2665        assert_eq!(header.version, 4);
2666        assert_eq!(header.statistics_kind, 0x2629_1b05);
2667        assert_eq!(header.data_length, 44);
2668        assert_eq!(header.metadata1, 1);
2669        assert_eq!(header.metadata2, 101);
2670        assert_eq!(header.metadata3, 2);
2671        assert_eq!(header.checksum, 0x14d4);
2672    }
2673
2674    #[test]
2675    fn test_statistics_data_extraction_with_invalid_data() {
2676        // Test with insufficient/invalid data - should fail to parse VInts
2677        let header = StatisticsHeader {
2678            version: 4,
2679            statistics_kind: 0x2629_1b05,
2680            data_length: 44,
2681            metadata1: 1,
2682            metadata2: 101,
2683            metadata3: 2,
2684            checksum: 0x14d4,
2685            table_id: None,
2686        };
2687
2688        let dummy_data = vec![0xFF; 10]; // Too short to parse properly
2689        let result = parse_nb_format_statistics_data(&dummy_data, &header, &dummy_data, None);
2690
2691        // Should return error because data is too short for VInt parsing
2692        assert!(result.is_err());
2693    }
2694
2695    #[test]
2696    fn test_enhanced_statistics_file_with_incomplete_data() {
2697        // Test data with valid header but missing data section
2698        let test_data = vec![
2699            0x00, 0x00, 0x00, 0x04, // version = 4
2700            0x26, 0x29, 0x1b, 0x05, // statistics_kind
2701            0x00, 0x00, 0x00, 0x00, // reserved
2702            0x00, 0x00, 0x00, 0x2c, // data_length = 44
2703            0x00, 0x00, 0x00, 0x01, // metadata1 = 1
2704            0x00, 0x00, 0x00, 0x65, // metadata2 = 101
2705            0x00, 0x00, 0x00, 0x02, // metadata3 = 2
2706            0x00, 0x00, 0x14,
2707            0xd4, // checksum = 5332
2708                  // No data section - should fail parsing
2709        ];
2710
2711        let result = parse_enhanced_statistics_file(&test_data, None);
2712
2713        // Should fail since there's no data section to parse
2714        assert!(result.is_err());
2715    }
2716
2717    #[test]
2718    fn test_parser_fallback_with_incomplete_data() {
2719        // Test with valid header but incomplete data
2720        let test_data = vec![
2721            0x00, 0x00, 0x00, 0x04, // version = 4
2722            0x26, 0x29, 0x1b, 0x05, // statistics_kind
2723            0x00, 0x00, 0x00, 0x00, // reserved
2724            0x00, 0x00, 0x00, 0x2c, // data_length = 44
2725            0x00, 0x00, 0x00, 0x01, // metadata1 = 1
2726            0x00, 0x00, 0x00, 0x65, // metadata2 = 101
2727            0x00, 0x00, 0x00, 0x02, // metadata3 = 2
2728            0x00, 0x00, 0x14, 0xd4, // checksum = 5332
2729        ];
2730
2731        let result = parse_statistics_with_fallback(&test_data, None);
2732
2733        // Should fail - incomplete data
2734        assert!(result.is_err());
2735    }
2736
2737    #[test]
2738    fn test_invalid_data_returns_error() {
2739        // Test with insufficient data
2740        let invalid_data = vec![0xFF; 10];
2741        let result = parse_statistics_with_fallback(&invalid_data, None);
2742        assert!(result.is_err(), "Invalid data should fail to parse");
2743    }
2744
2745    #[test]
2746    fn test_partition_key_extraction_via_backtracking() {
2747        // Test the backtracking logic to extract partition key type before the column marker
2748        // This simulates the real ttl_test_table case where we have:
2749        // VInt(40) + "org.apache.cassandra.db.marshal.UUIDType" + 0x00 0x00 + [count]
2750        // Note: Real files use 2-byte VInt: 0x80 0x28 for length 40
2751
2752        let mut test_data = vec![];
2753
2754        // Add some garbage data before the partition key
2755        test_data.extend_from_slice(&[0xFF; 50]);
2756
2757        // Partition key type: 40 bytes "org.apache.cassandra.db.marshal.UUIDType"
2758        test_data.extend_from_slice(&[0x80, 0x28]); // VInt: 40 (2-byte encoding)
2759        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UUIDType");
2760
2761        // Marker: 0x00 0x00 followed by column count
2762        // NOTE: In SerializationHeader, partition keys are NOT in the regular columns section
2763        // Only regular (non-key) columns are listed here
2764        test_data.push(0x00); // separator
2765        test_data.push(0x02); // 2 regular columns
2766
2767        // Regular Column 1: "expiring_value" (Int32)
2768        test_data.push(0x0E); // name length = 14
2769        test_data.extend_from_slice(b"expiring_value");
2770        test_data.push(0x29); // type length = 41
2771        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.Int32Type");
2772
2773        // Regular Column 2: "session_info" (UTF8)
2774        test_data.push(0x0C); // name length = 12
2775        test_data.extend_from_slice(b"session_info");
2776        test_data.push(0x28); // type length = 40
2777        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2778
2779        // Parse the regular columns section which should extract partition key via backtracking
2780        let result = parse_regular_columns(&test_data);
2781        assert!(
2782            result.is_ok(),
2783            "Failed to parse columns with backtracking: {:?}",
2784            result.err()
2785        );
2786
2787        let (_remaining, (partition_keys, columns)) = result.unwrap();
2788
2789        // Verify partition key was extracted
2790        assert_eq!(
2791            partition_keys.len(),
2792            1,
2793            "Expected 1 partition key via backtracking"
2794        );
2795        assert_eq!(
2796            partition_keys[0],
2797            "org.apache.cassandra.db.marshal.UUIDType"
2798        );
2799
2800        // Verify regular columns
2801        assert_eq!(columns.len(), 2, "Expected 2 regular columns");
2802        assert_eq!(columns[0].name, "expiring_value");
2803        assert_eq!(columns[0].column_type, "int");
2804        assert!(!columns[0].is_primary_key);
2805        assert_eq!(columns[1].name, "session_info");
2806        assert_eq!(columns[1].column_type, "text");
2807        assert!(!columns[1].is_primary_key);
2808    }
2809
2810    #[test]
2811    fn test_partition_key_extraction_with_longer_type() {
2812        // Test with a composite partition key type (longer type string)
2813        let mut test_data = vec![0xFF; 100]; // Garbage prefix
2814
2815        // CompositeType with multiple components: 75 bytes
2816        let composite_type =
2817            "(org.apache.cassandra.db.marshal.CompositeType(UTF8Type,Int32Type,UUIDType)";
2818        let type_len = composite_type.len() as u8;
2819
2820        // VInt encode the length (75 = 0x4B, fits in single byte)
2821        test_data.push(type_len);
2822        test_data.extend_from_slice(composite_type.as_bytes());
2823
2824        // Marker + column count
2825        test_data.push(0x00); // separator
2826        test_data.push(0x01); // column count
2827
2828        // Single column: "data" (UTF8)
2829        test_data.push(0x04);
2830        test_data.extend_from_slice(b"data");
2831        test_data.push(0x28);
2832        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2833
2834        let result = parse_regular_columns(&test_data);
2835        assert!(result.is_ok(), "Failed to parse: {:?}", result.err());
2836
2837        let (_remaining, (partition_keys, columns)) = result.unwrap();
2838
2839        assert_eq!(partition_keys.len(), 1);
2840        assert_eq!(partition_keys[0], composite_type);
2841
2842        // Expect 1 regular column
2843        assert_eq!(columns.len(), 1);
2844        assert_eq!(columns[0].name, "data");
2845        assert!(!columns[0].is_primary_key);
2846    }
2847
2848    #[test]
2849    fn test_backtracking_with_no_partition_key() {
2850        // Test case where there's no partition key before the marker
2851        // This should still parse columns successfully but return empty partition key list
2852
2853        let mut test_data = vec![];
2854
2855        // Just the marker and columns, no partition key type before
2856        test_data.push(0x00); // separator
2857        test_data.push(0x01); // count
2858
2859        // Column: "name" (UTF8)
2860        test_data.push(0x04);
2861        test_data.extend_from_slice(b"name");
2862        test_data.push(0x28);
2863        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2864
2865        let result = parse_regular_columns(&test_data);
2866        assert!(result.is_ok());
2867
2868        let (_remaining, (partition_keys, columns)) = result.unwrap();
2869
2870        assert_eq!(partition_keys.len(), 0, "Should have no partition keys");
2871        assert_eq!(columns.len(), 1);
2872        assert_eq!(columns[0].name, "name");
2873    }
2874
2875    #[test]
2876    fn test_backtracking_rejects_invalid_types() {
2877        // Test that backtracking rejects strings that don't match Cassandra type patterns
2878        let mut test_data = vec![0xFF; 50];
2879
2880        // Invalid type: doesn't start with '(' and doesn't contain "org.apache.cassandra"
2881        test_data.push(0x15); // VInt: 21 bytes
2882        test_data.extend_from_slice(b"InvalidTypeDescriptor");
2883
2884        // Marker + column count
2885        test_data.extend_from_slice(&[0x00, 0x00, 0x01]);
2886
2887        // Column
2888        test_data.push(0x04);
2889        test_data.extend_from_slice(b"test");
2890        test_data.push(0x28);
2891        test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
2892
2893        let result = parse_regular_columns(&test_data);
2894        assert!(result.is_ok());
2895
2896        let (_remaining, (partition_keys, _columns)) = result.unwrap();
2897
2898        // Should not extract the invalid type
2899        assert_eq!(
2900            partition_keys.len(),
2901            0,
2902            "Should reject invalid type pattern"
2903        );
2904    }
2905}