Skip to main content

idb/innodb/
field_decode.rs

1//! Field-level value decoding for InnoDB records.
2//!
3//! Decodes raw bytes from InnoDB compact-format records into typed values
4//! using column metadata from SDI (MySQL 8.0+ data dictionary). Handles
5//! InnoDB's internal storage encodings: big-endian integers with XOR'd
6//! sign bit, IEEE 754 floats with sign-bit manipulation, packed DATE and
7//! DATETIME fields, and UTF-8 string fields.
8//!
9//! # Supported types
10//!
11//! | SQL Type | InnoDB encoding | Decoder |
12//! |----------|----------------|---------|
13//! | TINYINT–BIGINT | Big-endian, XOR high bit for signed | `decode_int` |
14//! | FLOAT | 4-byte IEEE 754 with sign handling | `decode_float` |
15//! | DOUBLE | 8-byte IEEE 754 with sign handling | `decode_double` |
16//! | DECIMAL | Packed BCD with sign in first nibble | `decode_decimal` |
17//! | DATE | 3-byte packed year/month/day | `decode_date` |
18//! | DATETIME | 5+fsp bytes packed bit-field | `decode_datetime` |
19//! | TIMESTAMP | 4+fsp bytes UTC seconds | `decode_timestamp` |
20//! | TIME | 3+fsp bytes packed bit-field | `decode_time` |
21//! | YEAR | 1 byte + 1900 | `decode_year` |
22//! | VARCHAR/CHAR | UTF-8 with lossy fallback | `decode_string` |
23//! | ENUM | 1-2 byte index into element list | `decode_enum` |
24//! | SET | Bitmask into element list | `decode_set` |
25//! | BLOB/TEXT | Inline bytes or hex | `decode_string` |
26//! | JSON | Inline bytes as hex | `decode_hex` |
27//! | GEOMETRY | Inline bytes as hex (WKB) | `decode_hex` |
28//! | Others | Raw hex | `decode_hex` |
29
30use serde::Serialize;
31
32use crate::innodb::schema::DdTable;
33
34/// Decoded field value from an InnoDB record.
35#[derive(Debug, Clone, Serialize)]
36#[serde(untagged)]
37pub enum FieldValue {
38    /// SQL NULL.
39    Null,
40    /// Signed integer (TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT).
41    Int(i64),
42    /// Unsigned integer.
43    Uint(u64),
44    /// Single-precision float.
45    Float(f32),
46    /// Double-precision float.
47    Double(f64),
48    /// String value (VARCHAR, CHAR, DATE, DATETIME, etc.).
49    Str(String),
50    /// Hex-encoded bytes for unsupported types.
51    Hex(String),
52}
53
/// Everything the decoder needs to know about how one column is laid out
/// inside a record.
#[derive(Debug, Clone)]
pub struct ColumnStorageInfo {
    /// Name of the column.
    pub name: String,
    /// Data-dictionary type code (`dd_type`); 0 for synthesized system columns.
    pub dd_type: u64,
    /// Human-readable SQL type, e.g. "int" or "varchar(255)".
    pub column_type: String,
    /// True when the column may hold NULL.
    pub is_nullable: bool,
    /// True for UNSIGNED numeric columns.
    pub is_unsigned: bool,
    /// Byte length for fixed-size columns; 0 when the column is variable-length.
    pub fixed_len: usize,
    /// True when the column is stored with a variable length.
    pub is_variable: bool,
    /// Upper bound on bytes per character for the column's character set.
    pub charset_max_bytes: usize,
    /// Fractional-seconds precision of DATETIME/TIMESTAMP/TIME columns.
    pub datetime_precision: u64,
    /// True for system columns (DB_TRX_ID, DB_ROLL_PTR, DB_ROW_ID).
    pub is_system_column: bool,
    /// Element names of ENUM/SET columns, taken from SDI metadata.
    pub elements: Vec<String>,
    /// Total number of digits of a DECIMAL column.
    pub numeric_precision: u64,
    /// Number of fractional digits of a DECIMAL column.
    pub numeric_scale: u64,
}
84
// MySQL data-dictionary column type codes: `dd::enum_column_types` in
// sql/dd/types/column.h. These are the values of the SDI column "type" field.
// The enum starts at DECIMAL = 1, so the codes are NOT the MYSQL_TYPE_* values.
//
// Several codes previously listed here did not match the server enum
// (e.g. FLOAT, DOUBLE, INT24, NEWDECIMAL, the temporal "2" types, CHAR, BLOB,
// JSON, BIT, GEOMETRY); they are corrected below.
//
// Not covered by a constant: TINY_BLOB = 24, MEDIUM_BLOB = 25, LONG_BLOB = 26,
// VAR_STRING = 28 and the legacy temporal types (8, 11, 12, 13).
const DD_TYPE_TINY: u64 = 2; // TINYINT
const DD_TYPE_SHORT: u64 = 3; // SMALLINT
const DD_TYPE_INT24: u64 = 10; // MEDIUMINT
const DD_TYPE_LONG: u64 = 4; // INT
const DD_TYPE_LONGLONG: u64 = 9; // BIGINT
const DD_TYPE_FLOAT: u64 = 5; // FLOAT
const DD_TYPE_DOUBLE: u64 = 6; // DOUBLE
const DD_TYPE_NEWDECIMAL: u64 = 21; // DECIMAL
const DD_TYPE_DATE: u64 = 15; // DATE (NEWDATE)
const DD_TYPE_DATETIME: u64 = 19; // DATETIME2
const DD_TYPE_TIMESTAMP: u64 = 18; // TIMESTAMP2
const DD_TYPE_YEAR: u64 = 14; // YEAR
const DD_TYPE_VARCHAR: u64 = 16; // VARCHAR
const DD_TYPE_STRING: u64 = 29; // CHAR
const DD_TYPE_BLOB: u64 = 27; // BLOB/TEXT
const DD_TYPE_JSON: u64 = 31; // JSON
const DD_TYPE_ENUM: u64 = 22; // ENUM
const DD_TYPE_SET: u64 = 23; // SET
const DD_TYPE_BIT: u64 = 17; // BIT
const DD_TYPE_GEOMETRY: u64 = 30; // GEOMETRY
const DD_TYPE_TIME2: u64 = 20; // TIME2
107
108/// Build a column layout from SDI table metadata.
109///
110/// Maps SDI columns to physical InnoDB storage order:
111/// 1. Primary key columns (from clustered index)
112/// 2. DB_TRX_ID (6 bytes) — system column
113/// 3. DB_ROLL_PTR (7 bytes) — system column
114/// 4. Remaining user columns in ordinal order
115///
116/// Hidden columns (hidden == 2, i.e., SE-hidden) are included as system columns.
117/// Virtual/generated columns (is_virtual) are excluded.
118pub fn build_column_layout(dd_table: &DdTable) -> Vec<ColumnStorageInfo> {
119    let mut layout = Vec::new();
120
121    // Find the PRIMARY/clustered index
122    let primary_idx = dd_table.indexes.iter().find(|i| i.index_type == 1);
123
124    // Collect PK column ordinal positions
125    let mut pk_col_positions: Vec<u64> = Vec::new();
126    if let Some(pk) = primary_idx {
127        for elem in &pk.elements {
128            if !elem.hidden {
129                pk_col_positions.push(elem.column_opx);
130            }
131        }
132    }
133
134    // Build visible user columns sorted by ordinal_position
135    let mut user_columns: Vec<&crate::innodb::schema::DdColumn> = dd_table
136        .columns
137        .iter()
138        .filter(|c| !c.is_virtual && (c.hidden == 1 || c.hidden == 4)) // HT_VISIBLE + HT_HIDDEN_USER only
139        .collect();
140    user_columns.sort_by_key(|c| c.ordinal_position);
141
142    // PK columns first
143    for &pk_opx in &pk_col_positions {
144        if let Some(col) = dd_table.columns.get(pk_opx as usize) {
145            if !col.is_virtual && (col.hidden == 1 || col.hidden == 4) {
146                layout.push(column_to_storage_info(col, false));
147            }
148        }
149    }
150
151    // System columns
152    layout.push(ColumnStorageInfo {
153        name: "DB_TRX_ID".to_string(),
154        dd_type: 0,
155        column_type: "system".to_string(),
156        is_nullable: false,
157        is_unsigned: true,
158        fixed_len: 6,
159        is_variable: false,
160        charset_max_bytes: 0,
161        datetime_precision: 0,
162        is_system_column: true,
163        elements: Vec::new(),
164        numeric_precision: 0,
165        numeric_scale: 0,
166    });
167    layout.push(ColumnStorageInfo {
168        name: "DB_ROLL_PTR".to_string(),
169        dd_type: 0,
170        column_type: "system".to_string(),
171        is_nullable: false,
172        is_unsigned: true,
173        fixed_len: 7,
174        is_variable: false,
175        charset_max_bytes: 0,
176        datetime_precision: 0,
177        is_system_column: true,
178        elements: Vec::new(),
179        numeric_precision: 0,
180        numeric_scale: 0,
181    });
182
183    // Remaining non-PK user columns
184    for col in &user_columns {
185        let col_opx = dd_table.columns.iter().position(|c| std::ptr::eq(c, *col));
186        if let Some(opx) = col_opx {
187            if !pk_col_positions.contains(&(opx as u64)) {
188                layout.push(column_to_storage_info(col, false));
189            }
190        }
191    }
192
193    layout
194}
195
196/// Convert a DdColumn to a ColumnStorageInfo.
197fn column_to_storage_info(
198    col: &crate::innodb::schema::DdColumn,
199    is_system: bool,
200) -> ColumnStorageInfo {
201    let (fixed_len, is_variable) = compute_storage_size(col);
202    let charset_max_bytes = charset_max_bytes_from_collation(col.collation_id);
203    let elements: Vec<String> = col.elements.iter().map(|e| e.name.clone()).collect();
204
205    ColumnStorageInfo {
206        name: col.name.clone(),
207        dd_type: col.dd_type,
208        column_type: col.column_type_utf8.clone(),
209        is_nullable: col.is_nullable,
210        is_unsigned: col.is_unsigned,
211        fixed_len,
212        is_variable,
213        charset_max_bytes,
214        datetime_precision: col.datetime_precision,
215        is_system_column: is_system,
216        elements,
217        numeric_precision: col.numeric_precision,
218        numeric_scale: col.numeric_scale,
219    }
220}
221
222/// Compute the fixed storage size and variable-length flag for a column.
223fn compute_storage_size(col: &crate::innodb::schema::DdColumn) -> (usize, bool) {
224    match col.dd_type {
225        DD_TYPE_TINY => (1, false),
226        DD_TYPE_SHORT => (2, false),
227        DD_TYPE_INT24 => (3, false),
228        DD_TYPE_LONG => (4, false),
229        DD_TYPE_LONGLONG => (8, false),
230        DD_TYPE_FLOAT => (4, false),
231        DD_TYPE_DOUBLE => (8, false),
232        DD_TYPE_YEAR => (1, false),
233        DD_TYPE_DATE => (3, false),
234        DD_TYPE_DATETIME | DD_TYPE_TIMESTAMP | DD_TYPE_TIME2 => {
235            // Base size + fractional seconds storage
236            let base = match col.dd_type {
237                DD_TYPE_DATETIME => 5,
238                DD_TYPE_TIMESTAMP => 4,
239                DD_TYPE_TIME2 => 3,
240                _ => unreachable!(),
241            };
242            let fsp_bytes = fsp_storage_bytes(col.datetime_precision);
243            (base + fsp_bytes, false)
244        }
245        DD_TYPE_VARCHAR | DD_TYPE_BLOB | DD_TYPE_JSON | DD_TYPE_GEOMETRY => {
246            (0, true) // variable-length
247        }
248        DD_TYPE_STRING => {
249            // CHAR: fixed-length = char_length * charset_max_bytes
250            let max_bytes = charset_max_bytes_from_collation(col.collation_id);
251            if max_bytes > 1 {
252                // Multi-byte CHAR is stored as variable-length in compact format
253                (0, true)
254            } else {
255                (col.char_length as usize, false)
256            }
257        }
258        DD_TYPE_ENUM => {
259            // ENUM: 1 byte for <=255 values, 2 bytes otherwise
260            let n = col.elements.len();
261            if n <= 255 {
262                (1, false)
263            } else {
264                (2, false)
265            }
266        }
267        DD_TYPE_SET => {
268            // SET: ceil(n_elements / 8) bytes
269            let n = col.elements.len();
270            let bytes = n.div_ceil(8).max(1);
271            (bytes, false)
272        }
273        DD_TYPE_BIT => {
274            // BIT(M): ceil(M / 8) bytes
275            let bits = col.char_length as usize;
276            let bytes = bits.div_ceil(8).max(1);
277            (bytes, false)
278        }
279        DD_TYPE_NEWDECIMAL => {
280            // DECIMAL: complex packed BCD, approximate
281            let precision = col.numeric_precision as usize;
282            let scale = col.numeric_scale as usize;
283            let intg = precision - scale;
284            let intg_bytes = (intg / 9) * 4 + decimal_leftover_bytes(intg % 9);
285            let frac_bytes = (scale / 9) * 4 + decimal_leftover_bytes(scale % 9);
286            (intg_bytes + frac_bytes, false)
287        }
288        _ => (0, true), // Unknown: treat as variable
289    }
290}
291
/// Number of bytes used to store a partial DECIMAL digit group.
///
/// `digits` is the count of leftover digits (normally 0–8); any count above
/// 9 is answered with 4 bytes, the same as a full group.
fn decimal_leftover_bytes(digits: usize) -> usize {
    // Index = digit count, value = bytes required.
    const BYTES_FOR_DIGITS: [usize; 10] = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
    BYTES_FOR_DIGITS.get(digits).copied().unwrap_or(4)
}
303
/// Bytes appended to a temporal value for its fractional seconds.
///
/// Precisions 1–6 need one byte per two digits (rounded up); 0 and any
/// out-of-range precision need none.
fn fsp_storage_bytes(fsp: u64) -> usize {
    if fsp > 6 {
        return 0;
    }
    fsp.div_ceil(2) as usize
}
314
/// Map a collation ID to the maximum number of bytes per character.
///
/// Recognized IDs:
///   - latin1 (5, 8, 15, 31, 47, 48, 49, 94), binary (63), ascii (11, 65): 1 byte
///   - 3-byte utf8 (33, 83, 192–215): 3 bytes
///   - everything else, including the utf8mb4 collations (45, 46, 224–247,
///     255–310): 4 bytes, used as an upper bound
fn charset_max_bytes_from_collation(collation_id: u64) -> usize {
    const SINGLE_BYTE: [u64; 11] = [5, 8, 15, 31, 47, 48, 49, 94, 63, 11, 65];

    if SINGLE_BYTE.contains(&collation_id) {
        1
    } else if collation_id == 33 || collation_id == 83 || (192..=215).contains(&collation_id) {
        3
    } else {
        // utf8mb4 and every unrecognized collation share the 4-byte bound.
        4
    }
}
338
339/// Decode a field value from raw bytes based on column storage info.
340pub fn decode_field(data: &[u8], col: &ColumnStorageInfo) -> FieldValue {
341    if data.is_empty() {
342        return FieldValue::Null;
343    }
344
345    match col.dd_type {
346        DD_TYPE_TINY => decode_int(data, 1, col.is_unsigned),
347        DD_TYPE_SHORT => decode_int(data, 2, col.is_unsigned),
348        DD_TYPE_INT24 => decode_int(data, 3, col.is_unsigned),
349        DD_TYPE_LONG => decode_int(data, 4, col.is_unsigned),
350        DD_TYPE_LONGLONG => decode_int(data, 8, col.is_unsigned),
351        DD_TYPE_FLOAT => decode_float(data),
352        DD_TYPE_DOUBLE => decode_double(data),
353        DD_TYPE_NEWDECIMAL => decode_decimal(data, col.numeric_precision, col.numeric_scale),
354        DD_TYPE_DATE => decode_date(data),
355        DD_TYPE_DATETIME => decode_datetime(data, col.datetime_precision),
356        DD_TYPE_TIMESTAMP => decode_timestamp(data, col.datetime_precision),
357        DD_TYPE_TIME2 => decode_time(data, col.datetime_precision),
358        DD_TYPE_YEAR => decode_year(data),
359        DD_TYPE_VARCHAR | DD_TYPE_STRING => decode_string(data),
360        DD_TYPE_BLOB => decode_string(data),
361        DD_TYPE_ENUM => decode_enum(data, &col.elements),
362        DD_TYPE_SET => decode_set(data, &col.elements),
363        DD_TYPE_JSON | DD_TYPE_GEOMETRY => decode_hex(data),
364        // System columns: decode as unsigned int
365        0 if col.is_system_column => decode_int(data, data.len(), true),
366        // Everything else: hex fallback
367        _ => decode_hex(data),
368    }
369}
370
371/// Decode a big-endian integer with XOR'd sign bit.
372///
373/// InnoDB stores signed integers with the high bit XOR'd so that
374/// memcmp ordering matches numeric ordering.
375fn decode_int(data: &[u8], size: usize, unsigned: bool) -> FieldValue {
376    if data.len() < size {
377        return decode_hex(data);
378    }
379
380    // Read big-endian unsigned value
381    let mut val: u64 = 0;
382    for &b in &data[..size] {
383        val = (val << 8) | b as u64;
384    }
385
386    // XOR the high bit (InnoDB encoding for memcmp ordering)
387    let sign_bit: u64 = 1 << (size * 8 - 1);
388    val ^= sign_bit;
389
390    if unsigned {
391        FieldValue::Uint(val)
392    } else {
393        // Convert to signed: if original high bit was 0 (now 1 after XOR),
394        // value is positive. If original was 1 (now 0), value is negative.
395        let max_unsigned: u64 = if size == 8 {
396            u64::MAX
397        } else {
398            (1u64 << (size * 8)) - 1
399        };
400        if val > (max_unsigned >> 1) {
401            // Negative: val is in [sign_bit..max], map to negative range
402            // For size bytes, negative range is [-(sign_bit)..-1]
403            let signed = (val as i64).wrapping_sub(1i64.wrapping_shl((size * 8) as u32));
404            FieldValue::Int(signed)
405        } else {
406            FieldValue::Int(val as i64)
407        }
408    }
409}
410
411/// Decode a 4-byte InnoDB float.
412///
413/// InnoDB float encoding: if high bit is set, XOR all bits (positive).
414/// If high bit is clear, XOR only high bit (negative, to flip sign for ordering).
415fn decode_float(data: &[u8]) -> FieldValue {
416    if data.len() < 4 {
417        return decode_hex(data);
418    }
419
420    let mut bytes = [data[0], data[1], data[2], data[3]];
421    if bytes[0] & 0x80 != 0 {
422        // Positive: XOR all bits
423        for b in &mut bytes {
424            *b ^= 0xFF;
425        }
426    } else {
427        // Negative: XOR high bit only
428        bytes[0] ^= 0x80;
429    }
430    // Now reverse the byte order — InnoDB stores float in big-endian
431    bytes.reverse();
432    let f = f32::from_le_bytes(bytes);
433    FieldValue::Float(f)
434}
435
436/// Decode an 8-byte InnoDB double.
437///
438/// Same encoding as float but 8 bytes.
439fn decode_double(data: &[u8]) -> FieldValue {
440    if data.len() < 8 {
441        return decode_hex(data);
442    }
443
444    let mut bytes = [
445        data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
446    ];
447    if bytes[0] & 0x80 != 0 {
448        for b in &mut bytes {
449            *b ^= 0xFF;
450        }
451    } else {
452        bytes[0] ^= 0x80;
453    }
454    bytes.reverse();
455    let d = f64::from_le_bytes(bytes);
456    FieldValue::Double(d)
457}
458
459/// Decode a 3-byte InnoDB DATE (newdate format).
460///
461/// Stored as: day(5) | month(4) | year(15) packed into 3 bytes big-endian.
462/// Actually stored as little-endian 3-byte integer in InnoDB.
463fn decode_date(data: &[u8]) -> FieldValue {
464    if data.len() < 3 {
465        return decode_hex(data);
466    }
467
468    // 3-byte big-endian packed date
469    let val = ((data[0] as u32) << 16) | ((data[1] as u32) << 8) | data[2] as u32;
470
471    // XOR high bit for signed ordering
472    let val = val ^ (1 << 23);
473
474    let day = val & 0x1F;
475    let month = (val >> 5) & 0x0F;
476    let year = val >> 9;
477
478    if year == 0 && month == 0 && day == 0 {
479        FieldValue::Str("0000-00-00".to_string())
480    } else {
481        FieldValue::Str(format!("{:04}-{:02}-{:02}", year, month, day))
482    }
483}
484
485/// Decode a DATETIME2 (5 + fsp bytes).
486///
487/// Packed as big-endian integer with XOR'd sign bit:
488/// - year_month (17 bits): year * 13 + month
489/// - day (5 bits)
490/// - hour (5 bits)
491/// - minute (6 bits)
492/// - second (6 bits)
493///
494/// Total: 40 bits = 5 bytes
495fn decode_datetime(data: &[u8], fsp: u64) -> FieldValue {
496    let base_len = 5;
497    let fsp_bytes = fsp_storage_bytes(fsp);
498    if data.len() < base_len + fsp_bytes {
499        return decode_hex(data);
500    }
501
502    // Read 5-byte big-endian
503    let mut val: u64 = 0;
504    for &b in &data[..5] {
505        val = (val << 8) | b as u64;
506    }
507
508    // XOR high bit
509    val ^= 1 << 39;
510
511    let second = val & 0x3F;
512    let minute = (val >> 6) & 0x3F;
513    let hour = (val >> 12) & 0x1F;
514    let day = (val >> 17) & 0x1F;
515    let year_month = val >> 22;
516
517    let year = year_month / 13;
518    let month = year_month % 13;
519
520    if fsp > 0 && fsp_bytes > 0 {
521        // Read fractional seconds
522        let mut frac: u32 = 0;
523        for &b in &data[5..5 + fsp_bytes] {
524            frac = (frac << 8) | b as u32;
525        }
526        // fsp 1-2: 1 byte, fsp 3-4: 2 bytes, fsp 5-6: 3 bytes
527        // Adjust to microseconds
528        let micros = match fsp {
529            1 | 2 => frac as u64 * 10000,
530            3 | 4 => frac as u64 * 100,
531            5 | 6 => frac as u64,
532            _ => 0,
533        };
534        let frac_str = format!("{:06}", micros);
535        let frac_trimmed = &frac_str[..fsp as usize];
536        FieldValue::Str(format!(
537            "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{}",
538            year, month, day, hour, minute, second, frac_trimmed
539        ))
540    } else {
541        FieldValue::Str(format!(
542            "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
543            year, month, day, hour, minute, second
544        ))
545    }
546}
547
548/// Decode a TIMESTAMP2 (4 + fsp bytes).
549///
550/// 4-byte big-endian UTC seconds since epoch.
551fn decode_timestamp(data: &[u8], fsp: u64) -> FieldValue {
552    if data.len() < 4 {
553        return decode_hex(data);
554    }
555
556    let secs = ((data[0] as u32) << 24)
557        | ((data[1] as u32) << 16)
558        | ((data[2] as u32) << 8)
559        | data[3] as u32;
560
561    if secs == 0 {
562        return FieldValue::Str("0000-00-00 00:00:00".to_string());
563    }
564
565    // Convert to date/time components (UTC)
566    // Simple UTC conversion without timezone support
567    let days_since_epoch = secs / 86400;
568    let time_of_day = secs % 86400;
569    let hour = time_of_day / 3600;
570    let minute = (time_of_day % 3600) / 60;
571    let second = time_of_day % 60;
572
573    // Days since 1970-01-01 to Y-M-D
574    let (year, month, day) = days_to_ymd(days_since_epoch);
575
576    let fsp_bytes = fsp_storage_bytes(fsp);
577    if fsp > 0 && fsp_bytes > 0 && data.len() >= 4 + fsp_bytes {
578        let mut frac: u32 = 0;
579        for &b in &data[4..4 + fsp_bytes] {
580            frac = (frac << 8) | b as u32;
581        }
582        let micros = match fsp {
583            1 | 2 => frac as u64 * 10000,
584            3 | 4 => frac as u64 * 100,
585            5 | 6 => frac as u64,
586            _ => 0,
587        };
588        let frac_str = format!("{:06}", micros);
589        let frac_trimmed = &frac_str[..fsp as usize];
590        FieldValue::Str(format!(
591            "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{}",
592            year, month, day, hour, minute, second, frac_trimmed
593        ))
594    } else {
595        FieldValue::Str(format!(
596            "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
597            year, month, day, hour, minute, second
598        ))
599    }
600}
601
/// Turn a day count since 1970-01-01 into `(year, month, day)`.
///
/// Uses the civil-from-days algorithm described at
/// https://howardhinnant.github.io/date_algorithms.html, which works on
/// 400-year eras with the year starting in March.
fn days_to_ymd(days: u32) -> (u32, u32, u32) {
    const DAYS_PER_ERA: u32 = 146_097;

    // Shift the epoch to 0000-03-01.
    let shifted = days + 719_468;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0) through February (11).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;

    // January and February belong to the following calendar year.
    let (month, year_bump) = if month_index < 10 {
        (month_index + 3, 0)
    } else {
        (month_index - 9, 1)
    };

    (year_of_era + era * 400 + year_bump, month, day)
}
617
618/// Decode a 1-byte YEAR field.
619fn decode_year(data: &[u8]) -> FieldValue {
620    if data.is_empty() {
621        return FieldValue::Null;
622    }
623    let val = data[0];
624    if val == 0 {
625        FieldValue::Str("0000".to_string())
626    } else {
627        FieldValue::Uint(1900 + val as u64)
628    }
629}
630
631/// Decode a string field (VARCHAR or CHAR).
632fn decode_string(data: &[u8]) -> FieldValue {
633    // CHAR fields have trailing padding (0x20 for ASCII/UTF-8)
634    let trimmed = trim_trailing_spaces(data);
635    FieldValue::Str(String::from_utf8_lossy(trimmed).to_string())
636}
637
/// Return `data` without its trailing run of 0x20 (space) bytes.
fn trim_trailing_spaces(data: &[u8]) -> &[u8] {
    match data.iter().rposition(|&byte| byte != 0x20) {
        Some(last) => &data[..=last],
        None => &data[..0],
    }
}
646
647/// Decode InnoDB packed BCD DECIMAL.
648///
649/// InnoDB DECIMAL storage (packed BCD):
650/// - Digits are grouped into groups of 9, stored as 4-byte big-endian ints
651/// - Leftover digits (< 9) use 1-4 bytes depending on count
652/// - The first byte has the sign bit XOR'd for memcmp ordering
653/// - Positive values: high bit set in stored form
654/// - Negative values: all bytes XOR'd with 0xFF
655fn decode_decimal(data: &[u8], precision: u64, scale: u64) -> FieldValue {
656    if precision == 0 {
657        return decode_hex(data);
658    }
659
660    let intg = (precision - scale) as usize;
661    let frac = scale as usize;
662    let intg_full = intg / 9;
663    let intg_left = intg % 9;
664    let frac_full = frac / 9;
665    let frac_left = frac % 9;
666
667    let expected_len = intg_full * 4
668        + decimal_leftover_bytes(intg_left)
669        + frac_full * 4
670        + decimal_leftover_bytes(frac_left);
671
672    if data.len() < expected_len {
673        return decode_hex(data);
674    }
675
676    // Copy and handle sign
677    let mut buf = data[..expected_len].to_vec();
678    let negative = buf[0] & 0x80 == 0;
679
680    // For negative values, XOR all bytes with 0xFF to restore original BCD.
681    if negative {
682        for b in &mut buf {
683            *b ^= 0xFF;
684        }
685    }
686    // Clear sign bit (high bit of first byte) in all cases
687    buf[0] &= 0x7F;
688
689    let mut result = String::new();
690    if negative {
691        result.push('-');
692    }
693
694    let mut pos = 0;
695
696    // Decode integer leftover digits
697    if intg_left > 0 {
698        let bytes = decimal_leftover_bytes(intg_left);
699        let mut val: u32 = 0;
700        for &b in &buf[pos..pos + bytes] {
701            val = (val << 8) | b as u32;
702        }
703        result.push_str(&val.to_string());
704        pos += bytes;
705    }
706
707    // Decode full integer groups
708    for i in 0..intg_full {
709        let val = ((buf[pos] as u32) << 24)
710            | ((buf[pos + 1] as u32) << 16)
711            | ((buf[pos + 2] as u32) << 8)
712            | buf[pos + 3] as u32;
713        if i == 0 && intg_left == 0 {
714            result.push_str(&val.to_string());
715        } else {
716            result.push_str(&format!("{:09}", val));
717        }
718        pos += 4;
719    }
720
721    // If no integer part was written, write "0"
722    if intg == 0 || (intg_full == 0 && intg_left == 0) {
723        result.push('0');
724    }
725
726    // Decode fractional part
727    if frac > 0 {
728        result.push('.');
729
730        for _ in 0..frac_full {
731            let val = ((buf[pos] as u32) << 24)
732                | ((buf[pos + 1] as u32) << 16)
733                | ((buf[pos + 2] as u32) << 8)
734                | buf[pos + 3] as u32;
735            result.push_str(&format!("{:09}", val));
736            pos += 4;
737        }
738
739        if frac_left > 0 {
740            let bytes = decimal_leftover_bytes(frac_left);
741            let mut val: u32 = 0;
742            for &b in &buf[pos..pos + bytes] {
743                val = (val << 8) | b as u32;
744            }
745            let formatted = format!("{:0width$}", val, width = frac_left);
746            result.push_str(&formatted);
747        }
748    }
749
750    FieldValue::Str(result)
751}
752
753/// Decode a TIME2 field (3 + fsp bytes).
754///
755/// Stored as big-endian 3-byte integer with offset encoding:
756/// stored_value = signed_value + 0x800000.
757/// The signed value packs:
758/// - hours (10 bits)
759/// - minutes (6 bits)
760/// - seconds (6 bits)
761///
762/// Total: 24 bits = 3 bytes, plus FSP bytes for fractional seconds.
763fn decode_time(data: &[u8], fsp: u64) -> FieldValue {
764    let base_len = 3;
765    let fsp_bytes = fsp_storage_bytes(fsp);
766    if data.len() < base_len + fsp_bytes {
767        return decode_hex(data);
768    }
769
770    // Read 3-byte big-endian unsigned value
771    let stored: u32 = ((data[0] as u32) << 16) | ((data[1] as u32) << 8) | data[2] as u32;
772
773    // InnoDB TIME2 uses offset encoding: stored = signed_value + 0x800000
774    let signed_val: i32 = stored as i32 - 0x800000;
775    let negative = signed_val < 0;
776    let abs_val = signed_val.unsigned_abs();
777
778    let second = abs_val & 0x3F;
779    let minute = (abs_val >> 6) & 0x3F;
780    let hour = (abs_val >> 12) & 0x3FF;
781
782    let sign = if negative { "-" } else { "" };
783
784    if fsp > 0 && fsp_bytes > 0 {
785        let mut frac: u32 = 0;
786        for &b in &data[3..3 + fsp_bytes] {
787            frac = (frac << 8) | b as u32;
788        }
789        let micros = match fsp {
790            1 | 2 => frac as u64 * 10000,
791            3 | 4 => frac as u64 * 100,
792            5 | 6 => frac as u64,
793            _ => 0,
794        };
795        let frac_str = format!("{:06}", micros);
796        let frac_trimmed = &frac_str[..fsp as usize];
797        FieldValue::Str(format!(
798            "{}{:02}:{:02}:{:02}.{}",
799            sign, hour, minute, second, frac_trimmed
800        ))
801    } else {
802        FieldValue::Str(format!("{}{:02}:{:02}:{:02}", sign, hour, minute, second))
803    }
804}
805
806/// Decode an ENUM field (1 or 2 byte index).
807///
808/// ENUM stores a 1-based index into the element list.
809/// Index 0 means empty string ('').
810fn decode_enum(data: &[u8], elements: &[String]) -> FieldValue {
811    let idx = match data.len() {
812        1 => data[0] as usize,
813        2 => ((data[0] as usize) << 8) | data[1] as usize,
814        _ => return decode_hex(data),
815    };
816
817    if idx == 0 {
818        return FieldValue::Str(String::new());
819    }
820
821    // ENUM index is 1-based
822    if idx <= elements.len() {
823        FieldValue::Str(elements[idx - 1].clone())
824    } else {
825        FieldValue::Uint(idx as u64)
826    }
827}
828
829/// Decode a SET field (1-8 byte bitmask).
830///
831/// SET stores a bitmask where each bit corresponds to an element.
832fn decode_set(data: &[u8], elements: &[String]) -> FieldValue {
833    let mut bitmask: u64 = 0;
834    for (i, &b) in data.iter().enumerate() {
835        bitmask |= (b as u64) << (i * 8);
836    }
837
838    if bitmask == 0 {
839        return FieldValue::Str(String::new());
840    }
841
842    let mut selected = Vec::new();
843    for (i, elem) in elements.iter().enumerate() {
844        if bitmask & (1 << i) != 0 {
845            selected.push(elem.as_str());
846        }
847    }
848
849    FieldValue::Str(selected.join(","))
850}
851
852/// Hex-encode bytes as a fallback.
853fn decode_hex(data: &[u8]) -> FieldValue {
854    let hex: String = data.iter().map(|b| format!("{:02x}", b)).collect();
855    FieldValue::Hex(format!("0x{}", hex))
856}
857
#[cfg(test)]
mod tests {
    use super::*;

    // Decodes `$data` against `$col` and asserts that the result is
    // `FieldValue::$variant` holding `$expected`.
    //
    // This is a macro rather than a function so the payload comparison works
    // for every single-payload variant (`Int`, `Uint`, `Str`, `Hex`) without
    // having to name the payload type. When the decoder returns a different
    // variant, the panic message includes the value that was actually decoded.
    macro_rules! assert_decodes {
        ($data:expr, $col:expr, $variant:ident($expected:expr)) => {
            match decode_field($data, $col) {
                FieldValue::$variant(actual) => assert_eq!(actual, $expected),
                other => panic!(
                    "Expected {}({}), got {:?}",
                    stringify!($variant),
                    stringify!($expected),
                    other
                ),
            }
        };
    }

    /// Builds a minimal column fixture named `"test"`: non-nullable,
    /// fixed-length (4 bytes), single-byte charset, no ENUM/SET elements and
    /// zero precision/scale. Only the dictionary type and the signedness vary;
    /// tests override further fields with struct-update syntax.
    fn make_col(dd_type: u64, unsigned: bool) -> ColumnStorageInfo {
        ColumnStorageInfo {
            name: "test".to_string(),
            dd_type,
            column_type: "test".to_string(),
            is_nullable: false,
            is_unsigned: unsigned,
            fixed_len: 4,
            is_variable: false,
            charset_max_bytes: 1,
            datetime_precision: 0,
            is_system_column: false,
            elements: Vec::new(),
            numeric_precision: 0,
            numeric_scale: 0,
        }
    }

    // -----------------------------------------------------------------------
    // Integer decoder tests
    // -----------------------------------------------------------------------
    //
    // Every fixture below carries the value with its most significant bit
    // flipped, for unsigned as well as signed columns, and the assertions pin
    // that convention.
    //
    // NOTE(review): MySQL's InnoDB is documented to flip the top bit only for
    // signed integer columns and to store unsigned values as plain big-endian
    // (the module header also says "XOR high bit for signed"). Confirm the
    // unsigned fixtures against a real tablespace before treating them as a
    // description of the on-disk format.

    #[test]
    fn test_decode_int_unsigned_zero() {
        // 0x80000000 with the top bit flipped back is 0.
        let data = [0x80, 0x00, 0x00, 0x00];
        let col = make_col(DD_TYPE_LONG, true);
        assert_decodes!(&data, &col, Uint(0));
    }

    #[test]
    fn test_decode_int_unsigned_one() {
        let data = [0x80, 0x00, 0x00, 0x01];
        let col = make_col(DD_TYPE_LONG, true);
        assert_decodes!(&data, &col, Uint(1));
    }

    #[test]
    fn test_decode_int_signed_zero() {
        let data = [0x80, 0x00, 0x00, 0x00];
        let col = make_col(DD_TYPE_LONG, false);
        assert_decodes!(&data, &col, Int(0));
    }

    #[test]
    fn test_decode_int_signed_positive() {
        // Signed 42 is 0x0000002A; with the high bit flipped: 0x8000002A.
        let data = [0x80, 0x00, 0x00, 0x2A];
        let col = make_col(DD_TYPE_LONG, false);
        assert_decodes!(&data, &col, Int(42));
    }

    #[test]
    fn test_decode_int_signed_negative() {
        // Signed -1 is 0xFFFFFFFF in two's complement; with the high bit
        // flipped: 0x7FFFFFFF.
        let data = [0x7F, 0xFF, 0xFF, 0xFF];
        let col = make_col(DD_TYPE_LONG, false);
        assert_decodes!(&data, &col, Int(-1));
    }

    #[test]
    fn test_decode_int_signed_min() {
        // INT minimum (-2147483648) is 0x80000000; with the high bit flipped
        // the stored bytes are all zero.
        let data = [0x00, 0x00, 0x00, 0x00];
        let col = make_col(DD_TYPE_LONG, false);
        assert_decodes!(&data, &col, Int(-2147483648));
    }

    #[test]
    fn test_decode_tinyint() {
        // One-byte unsigned column. Under the convention pinned by this
        // module the decoded value is `stored ^ 0x80`:
        //   0x80 -> 0, 0xFF -> 127, 0x00 -> 128, 0x7F -> 255
        let data = [0x7F];
        let col = make_col(DD_TYPE_TINY, true);
        assert_decodes!(&data, &col, Uint(255));
    }

    #[test]
    fn test_decode_bigint() {
        // Eight-byte unsigned column: 0x8000000000000001 decodes to 1.
        let data = [0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01];
        let col = make_col(DD_TYPE_LONGLONG, true);
        assert_decodes!(&data, &col, Uint(1));
    }

    // -----------------------------------------------------------------------
    // YEAR decoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_year() {
        // The stored byte is an offset from 1900: 1900 + 126 = 2026.
        let data = [126];
        let col = make_col(DD_TYPE_YEAR, false);
        assert_decodes!(&data, &col, Uint(2026));
    }

    #[test]
    fn test_decode_year_zero() {
        // A stored 0 is the special zero year and is rendered as a string,
        // not as 1900.
        let data = [0];
        let col = make_col(DD_TYPE_YEAR, false);
        assert_decodes!(&data, &col, Str("0000"));
    }

    // -----------------------------------------------------------------------
    // String decoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_string_varchar() {
        let data = b"hello";
        let col = ColumnStorageInfo {
            column_type: "varchar(255)".to_string(),
            fixed_len: 0,
            is_variable: true,
            charset_max_bytes: 4,
            ..make_col(DD_TYPE_VARCHAR, false)
        };
        assert_decodes!(data, &col, Str("hello"));
    }

    #[test]
    fn test_decode_string_char_padded() {
        // CHAR(10) holding "hi" followed by 8 padding spaces; the padding
        // must not appear in the decoded value.
        let data = b"hi        ";
        let col = ColumnStorageInfo {
            column_type: "char(10)".to_string(),
            fixed_len: 10,
            ..make_col(DD_TYPE_STRING, false)
        };
        assert_decodes!(data, &col, Str("hi"));
    }

    // -----------------------------------------------------------------------
    // Fallback and edge-case tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_hex_fallback() {
        let data = [0xDE, 0xAD, 0xBE, 0xEF];
        // 255 is not a recognized dd_type, so decoding falls through to hex.
        let col = make_col(255, false);
        assert_decodes!(&data, &col, Hex("0xdeadbeef"));
    }

    #[test]
    fn test_decode_empty_data() {
        // An empty slice for a fixed-width INT column decodes to NULL.
        let data: &[u8] = &[];
        let col = make_col(DD_TYPE_LONG, false);
        let decoded = decode_field(data, &col);
        assert!(
            matches!(decoded, FieldValue::Null),
            "Expected Null, got {:?}",
            decoded
        );
    }

    // -----------------------------------------------------------------------
    // Column layout tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_build_column_layout_ordering() {
        // Minimal DdTable with 3 visible columns and a PRIMARY KEY on the
        // first one (column_opx 0).
        let dd_table = DdTable {
            name: "test".to_string(),
            columns: vec![
                crate::innodb::schema::DdColumn {
                    name: "id".to_string(),
                    dd_type: DD_TYPE_LONG,
                    ordinal_position: 1,
                    is_unsigned: true,
                    hidden: 1, // HT_VISIBLE
                    ..Default::default()
                },
                crate::innodb::schema::DdColumn {
                    name: "name".to_string(),
                    dd_type: DD_TYPE_VARCHAR,
                    ordinal_position: 2,
                    column_type_utf8: "varchar(100)".to_string(),
                    hidden: 1, // HT_VISIBLE
                    ..Default::default()
                },
                crate::innodb::schema::DdColumn {
                    name: "age".to_string(),
                    dd_type: DD_TYPE_LONG,
                    ordinal_position: 3,
                    hidden: 1, // HT_VISIBLE
                    ..Default::default()
                },
            ],
            indexes: vec![crate::innodb::schema::DdIndex {
                name: "PRIMARY".to_string(),
                index_type: 1,
                elements: vec![crate::innodb::schema::DdIndexElement {
                    column_opx: 0,
                    length: 4294967295,
                    order: 2,
                    hidden: false,
                }],
                ..Default::default()
            }],
            ..Default::default()
        };

        let layout = build_column_layout(&dd_table);

        // Expected physical order: primary-key column first, then the two
        // system columns, then the remaining user columns in ordinal order.
        assert_eq!(layout.len(), 5);
        assert_eq!(layout[0].name, "id");
        assert_eq!(layout[1].name, "DB_TRX_ID");
        assert!(layout[1].is_system_column);
        assert_eq!(layout[2].name, "DB_ROLL_PTR");
        assert!(layout[2].is_system_column);
        assert_eq!(layout[3].name, "name");
        assert_eq!(layout[4].name, "age");
    }

    // -----------------------------------------------------------------------
    // Storage-size helper tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_fsp_storage_bytes() {
        // (fractional-seconds precision, expected extra bytes)
        let cases = [(0, 0), (1, 1), (2, 1), (3, 2), (4, 2), (5, 3), (6, 3)];
        for &(fsp, expected) in &cases {
            assert_eq!(fsp_storage_bytes(fsp), expected, "fsp = {}", fsp);
        }
    }

    #[test]
    fn test_charset_max_bytes() {
        // (collation id, expected max bytes per character)
        let cases = [
            (8, 1),   // latin1
            (63, 1),  // binary
            (33, 3),  // utf8
            (255, 4), // utf8mb4
        ];
        for &(collation, expected) in &cases {
            assert_eq!(
                charset_max_bytes_from_collation(collation),
                expected,
                "collation = {}",
                collation
            );
        }
    }

    // -----------------------------------------------------------------------
    // DECIMAL decoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_decimal_positive() {
        // DECIMAL(10,2) value 12345.67
        // precision=10, scale=2 -> 8 integer digits, 2 fractional digits
        // integer:    0 full 9-digit groups, 8 leftover digits -> 4 bytes
        // fractional: 0 full 9-digit groups, 2 leftover digits -> 1 byte
        // Total: 5 bytes
        let col = ColumnStorageInfo {
            numeric_precision: 10,
            numeric_scale: 2,
            ..make_col(DD_TYPE_NEWDECIMAL, false)
        };

        // Integer part 12345 = 0x00003039; a positive value has the high bit
        // of the first byte set: 0x80003039. Fractional part 67 = 0x43.
        let data = [0x80, 0x00, 0x30, 0x39, 0x43];
        assert_decodes!(&data, &col, Str("12345.67"));
    }

    #[test]
    fn test_decode_decimal_zero() {
        // DECIMAL(5,2) value 0.00
        // 3 integer digits -> 2 bytes, 2 fractional digits -> 1 byte = 3 bytes
        let col = ColumnStorageInfo {
            numeric_precision: 5,
            numeric_scale: 2,
            ..make_col(DD_TYPE_NEWDECIMAL, false)
        };

        // Positive zero: only the sign bit is set.
        let data = [0x80, 0x00, 0x00];
        assert_decodes!(&data, &col, Str("0.00"));
    }

    #[test]
    fn test_decode_decimal_negative() {
        // DECIMAL(5,2) value -1.23
        // 3 integer digits -> 2 bytes, 2 fractional digits -> 1 byte = 3 bytes
        //
        // Positive 1.23 is [0x80, 0x01, 0x17] (1 in two bytes with the sign
        // bit set, 23 in one byte). The negative fixture is the bytewise
        // complement of that encoding:
        //   0x80 ^ 0xFF = 0x7F, 0x01 ^ 0xFF = 0xFE, 0x17 ^ 0xFF = 0xE8
        // so the high bit of the first byte ends up clear.
        let col = ColumnStorageInfo {
            numeric_precision: 5,
            numeric_scale: 2,
            ..make_col(DD_TYPE_NEWDECIMAL, false)
        };

        let data = [0x7F, 0xFE, 0xE8];
        assert_decodes!(&data, &col, Str("-1.23"));
    }

    #[test]
    fn test_decode_decimal_multi_group() {
        // DECIMAL(20,4) value 1234567890.1234
        // 16 integer digits: 7 leftover digits (4 bytes) + 1 full group (4 bytes)
        // 4 fractional digits: 0 full groups + 4 leftover digits (2 bytes)
        // Total: 4 + 4 + 2 = 10 bytes
        //
        // Integer part, zero-padded to 16 digits: 0000001 | 234567890
        //   leftover group: 1         -> [0x00, 0x00, 0x00, 0x01]
        //                                with sign bit: [0x80, 0x00, 0x00, 0x01]
        //   full group:     234567890 = 0x0DFB38D2 -> [0x0D, 0xFB, 0x38, 0xD2]
        // Fractional part:
        //   leftover group: 1234 = 0x04D2 -> [0x04, 0xD2]
        let col = ColumnStorageInfo {
            numeric_precision: 20,
            numeric_scale: 4,
            ..make_col(DD_TYPE_NEWDECIMAL, false)
        };

        let data = [0x80, 0x00, 0x00, 0x01, 0x0D, 0xFB, 0x38, 0xD2, 0x04, 0xD2];
        assert_decodes!(&data, &col, Str("1234567890.1234"));
    }

    // -----------------------------------------------------------------------
    // TIME decoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_time_positive() {
        // TIME value 12:30:45 with no fractional seconds
        // packed = (12 << 12) | (30 << 6) | 45 = 0xC7AD = 51117
        // stored = packed + 0x800000 = 0x80C7AD
        let col = make_col(DD_TYPE_TIME2, false); // datetime_precision is 0
        let data = [0x80, 0xC7, 0xAD];
        assert_decodes!(&data, &col, Str("12:30:45"));
    }

    #[test]
    fn test_decode_time_zero() {
        // TIME value 00:00:00
        // packed = 0, stored = 0 + 0x800000 = 0x800000
        let col = make_col(DD_TYPE_TIME2, false); // datetime_precision is 0
        let data = [0x80, 0x00, 0x00];
        assert_decodes!(&data, &col, Str("00:00:00"));
    }

    // -----------------------------------------------------------------------
    // ENUM decoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_enum_valid_index() {
        let col = ColumnStorageInfo {
            elements: vec!["red".to_string(), "green".to_string(), "blue".to_string()],
            fixed_len: 1,
            ..make_col(DD_TYPE_ENUM, false)
        };

        // Stored indexes are 1-based: 2 selects "green".
        let data = [0x02];
        assert_decodes!(&data, &col, Str("green"));
    }

    #[test]
    fn test_decode_enum_zero_index() {
        let col = ColumnStorageInfo {
            elements: vec!["red".to_string(), "green".to_string()],
            fixed_len: 1,
            ..make_col(DD_TYPE_ENUM, false)
        };

        // Index 0 selects no element and decodes to the empty string.
        let data = [0x00];
        assert_decodes!(&data, &col, Str(""));
    }

    #[test]
    fn test_decode_enum_two_byte() {
        // More than 255 elements, so the index needs two bytes.
        let mut elements = vec!["a".to_string(); 300];
        elements[255] = "found_it".to_string();
        let col = ColumnStorageInfo {
            elements,
            fixed_len: 2,
            ..make_col(DD_TYPE_ENUM, false)
        };

        // Big-endian 0x0100 = 256, i.e. the 256th element (0-based index 255).
        let data = [0x01, 0x00];
        assert_decodes!(&data, &col, Str("found_it"));
    }

    // -----------------------------------------------------------------------
    // SET decoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_set_single() {
        let col = ColumnStorageInfo {
            elements: vec![
                "read".to_string(),
                "write".to_string(),
                "execute".to_string(),
            ],
            fixed_len: 1,
            ..make_col(DD_TYPE_SET, false)
        };

        // One-byte bitmask 0x05 = bits 0 and 2 -> "read,execute"
        let data = [0x05];
        assert_decodes!(&data, &col, Str("read,execute"));
    }

    #[test]
    fn test_decode_set_all() {
        let col = ColumnStorageInfo {
            elements: vec!["a".to_string(), "b".to_string(), "c".to_string()],
            fixed_len: 1,
            ..make_col(DD_TYPE_SET, false)
        };

        // Bitmask 0x07 selects all three members, joined in element order.
        let data = [0x07];
        assert_decodes!(&data, &col, Str("a,b,c"));
    }

    #[test]
    fn test_decode_set_empty() {
        let col = ColumnStorageInfo {
            elements: vec!["a".to_string(), "b".to_string()],
            fixed_len: 1,
            ..make_col(DD_TYPE_SET, false)
        };

        // No bits set decodes to the empty string.
        let data = [0x00];
        assert_decodes!(&data, &col, Str(""));
    }

    // -----------------------------------------------------------------------
    // BLOB/TEXT decoder test
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_blob_inline() {
        // Nullable, variable-length TEXT column whose bytes are stored inline.
        let col = ColumnStorageInfo {
            name: "data".to_string(),
            column_type: "text".to_string(),
            is_nullable: true,
            fixed_len: 0,
            is_variable: true,
            charset_max_bytes: 4,
            ..make_col(DD_TYPE_BLOB, false)
        };

        let data = b"hello world";
        assert_decodes!(data, &col, Str("hello world"));
    }

    // -----------------------------------------------------------------------
    // JSON/GEOMETRY decoder tests (hex fallback)
    // -----------------------------------------------------------------------

    #[test]
    fn test_decode_json_hex() {
        let col = make_col(DD_TYPE_JSON, false);
        let data = [0x01, 0x02, 0x03];
        assert_decodes!(&data, &col, Hex("0x010203"));
    }

    #[test]
    fn test_decode_geometry_hex() {
        let col = make_col(DD_TYPE_GEOMETRY, false);
        let data = [0xAB, 0xCD];
        assert_decodes!(&data, &col, Hex("0xabcd"));
    }

    // -----------------------------------------------------------------------
    // DECIMAL storage size tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_decimal_storage_size() {
        // (leftover decimal digits, bytes needed), covering the whole 0..=9
        // range of MySQL's packed-decimal table. The decoder tests above
        // already rely on the 2, 3, 4, 7 and 8 digit entries.
        let cases = [
            (0, 0),
            (1, 1),
            (2, 1),
            (3, 2),
            (4, 2),
            (5, 3),
            (6, 3),
            (7, 4),
            (8, 4),
            (9, 4),
        ];
        for &(digits, expected) in &cases {
            assert_eq!(
                decimal_leftover_bytes(digits),
                expected,
                "leftover digits = {}",
                digits
            );
        }
    }

    // -----------------------------------------------------------------------
    // TIME2 storage size tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_time2_storage_size() {
        // Storage size reported for a TIME2 column with the given
        // fractional-seconds precision.
        let size_for = |fsp| {
            compute_storage_size(&crate::innodb::schema::DdColumn {
                dd_type: DD_TYPE_TIME2,
                datetime_precision: fsp,
                ..Default::default()
            })
        };

        // 3 base bytes plus the fractional-seconds bytes. The second tuple
        // element is expected to be `false` in every case (presumably the
        // variable-length flag; confirm against `compute_storage_size`).
        assert_eq!(size_for(0), (3, false));
        assert_eq!(size_for(3), (5, false));
        assert_eq!(size_for(6), (6, false));
    }
}