zero_mysql/protocol/
value.rs

//! MySQL Binary Protocol Value Types
2use crate::constant::{ColumnFlags, ColumnType};
3use crate::error::{Error, Result, eyre};
4use crate::protocol::command::ColumnTypeAndFlags;
5use crate::protocol::primitive::*;
6use zerocopy::byteorder::little_endian::{U16 as U16LE, U32 as U32LE};
7use zerocopy::{FromBytes, Immutable, KnownLayout};
8
/// A single column value decoded from the MySQL binary protocol.
///
/// Variants that carry a reference (`Timestamp*`, `Time8`, `Time12`, `Byte`)
/// borrow from the buffer handed to [`Value::parse`], so a `Value<'a>` cannot
/// outlive that buffer. The enum itself is `Copy`: copying it only copies the
/// reference, never the underlying bytes.
#[derive(Debug, Clone, Copy)]
pub enum Value<'a> {
    /// NULL value
    Null,
    /// Signed integer (TINYINT, SMALLINT, INT, BIGINT), widened to `i64`
    SignedInt(i64),
    /// Unsigned integer (TINYINT UNSIGNED, SMALLINT UNSIGNED, INT UNSIGNED, BIGINT UNSIGNED),
    /// widened to `u64`
    UnsignedInt(u64),
    /// FLOAT - 4-byte floating point
    Float(f32),
    /// DOUBLE - 8-byte floating point
    Double(f64),
    /// DATE/DATETIME/TIMESTAMP - 0 bytes (0000-00-00 00:00:00)
    Timestamp0,
    /// DATE/DATETIME/TIMESTAMP - 4 bytes (ymd)
    Timestamp4(&'a Timestamp4),
    /// DATE/DATETIME/TIMESTAMP - 7 bytes (ymd + hms)
    Timestamp7(&'a Timestamp7),
    /// DATE/DATETIME/TIMESTAMP - 11 bytes (ymd + hms + microseconds)
    Timestamp11(&'a Timestamp11),
    /// TIME - 0 bytes (00:00:00)
    Time0,
    /// TIME - 8 bytes (without microseconds)
    Time8(&'a Time8),
    /// TIME - 12 bytes (with microseconds)
    Time12(&'a Time12),
    /// BLOB, GEOMETRY, STRING, VARCHAR, VAR_STRING, .. - the raw length-encoded
    /// payload; no character-set decoding is applied
    Byte(&'a [u8]),
}
38
39impl<'a> Value<'a> {
40    /// Parse a single binary protocol value based on column type and flags
41    ///
42    /// Returns the parsed value and the remaining bytes
43    pub fn parse(type_and_flags: &ColumnTypeAndFlags, data: &'a [u8]) -> Result<(Self, &'a [u8])> {
44        let is_unsigned = type_and_flags.flags.contains(ColumnFlags::UNSIGNED_FLAG);
45
46        match type_and_flags.column_type {
47            ColumnType::MYSQL_TYPE_NULL => Ok((Value::Null, data)),
48
49            // Integer types
50            ColumnType::MYSQL_TYPE_TINY => {
51                let (val, rest) = read_int_1(data)?;
52                let value = if is_unsigned {
53                    Value::UnsignedInt(val as u64)
54                } else {
55                    Value::SignedInt(val as i8 as i64)
56                };
57                Ok((value, rest))
58            }
59
60            ColumnType::MYSQL_TYPE_SHORT | ColumnType::MYSQL_TYPE_YEAR => {
61                let (val, rest) = read_int_2(data)?;
62                let value = if is_unsigned {
63                    Value::UnsignedInt(val as u64)
64                } else {
65                    Value::SignedInt(val as i16 as i64)
66                };
67                Ok((value, rest))
68            }
69
70            ColumnType::MYSQL_TYPE_INT24 | ColumnType::MYSQL_TYPE_LONG => {
71                let (val, rest) = read_int_4(data)?;
72                let value = if is_unsigned {
73                    Value::UnsignedInt(val as u64)
74                } else {
75                    Value::SignedInt(val as i32 as i64)
76                };
77                Ok((value, rest))
78            }
79
80            ColumnType::MYSQL_TYPE_LONGLONG => {
81                let (val, rest) = read_int_8(data)?;
82                let value = if is_unsigned {
83                    Value::UnsignedInt(val)
84                } else {
85                    Value::SignedInt(val as i64)
86                };
87                Ok((value, rest))
88            }
89
90            // Floating point types
91            ColumnType::MYSQL_TYPE_FLOAT => {
92                let (val, rest) = read_int_4(data)?;
93                Ok((Value::Float(f32::from_bits(val)), rest))
94            }
95
96            ColumnType::MYSQL_TYPE_DOUBLE => {
97                let (val, rest) = read_int_8(data)?;
98                Ok((Value::Double(f64::from_bits(val)), rest))
99            }
100
101            // Temporal types - use length-encoded format
102            ColumnType::MYSQL_TYPE_DATE
103            | ColumnType::MYSQL_TYPE_DATETIME
104            | ColumnType::MYSQL_TYPE_TIMESTAMP
105            | ColumnType::MYSQL_TYPE_TIMESTAMP2
106            | ColumnType::MYSQL_TYPE_DATETIME2
107            | ColumnType::MYSQL_TYPE_NEWDATE => {
108                let (len, mut rest) = read_int_1(data)?;
109                match len {
110                    0 => Ok((Value::Timestamp0, rest)),
111                    4 => {
112                        let ts = Timestamp4::ref_from_bytes(&rest[..4])?;
113                        rest = &rest[4..];
114                        Ok((Value::Timestamp4(ts), rest))
115                    }
116                    7 => {
117                        let ts = Timestamp7::ref_from_bytes(&rest[..7])?;
118                        rest = &rest[7..];
119                        Ok((Value::Timestamp7(ts), rest))
120                    }
121                    11 => {
122                        let ts = Timestamp11::ref_from_bytes(&rest[..11])?;
123                        rest = &rest[11..];
124                        Ok((Value::Timestamp11(ts), rest))
125                    }
126                    _ => Err(Error::LibraryBug(eyre!(
127                        "invalid timestamp length: {}",
128                        len
129                    ))),
130                }
131            }
132
133            // TIME types
134            ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => {
135                let (len, mut rest) = read_int_1(data)?;
136                match len {
137                    0 => Ok((Value::Time0, rest)),
138                    8 => {
139                        let time = Time8::ref_from_bytes(&rest[..8])?;
140                        rest = &rest[8..];
141                        Ok((Value::Time8(time), rest))
142                    }
143                    12 => {
144                        let time = Time12::ref_from_bytes(&rest[..12])?;
145                        rest = &rest[12..];
146                        Ok((Value::Time12(time), rest))
147                    }
148                    _ => Err(Error::LibraryBug(eyre!("invalid time length: {}", len))),
149                }
150            }
151
152            // String and BLOB types - length-encoded
153            ColumnType::MYSQL_TYPE_VARCHAR
154            | ColumnType::MYSQL_TYPE_VAR_STRING
155            | ColumnType::MYSQL_TYPE_STRING
156            | ColumnType::MYSQL_TYPE_BLOB
157            | ColumnType::MYSQL_TYPE_TINY_BLOB
158            | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
159            | ColumnType::MYSQL_TYPE_LONG_BLOB
160            | ColumnType::MYSQL_TYPE_GEOMETRY
161            | ColumnType::MYSQL_TYPE_JSON
162            | ColumnType::MYSQL_TYPE_DECIMAL
163            | ColumnType::MYSQL_TYPE_NEWDECIMAL
164            | ColumnType::MYSQL_TYPE_ENUM
165            | ColumnType::MYSQL_TYPE_SET
166            | ColumnType::MYSQL_TYPE_BIT
167            | ColumnType::MYSQL_TYPE_TYPED_ARRAY => {
168                let (bytes, rest) = read_string_lenenc(data)?;
169                Ok((Value::Byte(bytes), rest))
170            }
171        }
172    }
173}
174
175// ============================================================================
176// Temporal Types
177// ============================================================================
178
179/// TIMESTAMP - 4 bytes (DATE/DATETIME/TIMESTAMP with date only)
180#[repr(C, packed)]
181#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable)]
182pub struct Timestamp4 {
183    pub year: U16LE,
184    pub month: u8,
185    pub day: u8,
186}
187
188impl Timestamp4 {
189    pub fn year(&self) -> u16 {
190        self.year.get()
191    }
192}
193
194/// TIMESTAMP - 7 bytes (DATE/DATETIME/TIMESTAMP without microseconds)
195#[repr(C, packed)]
196#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable)]
197pub struct Timestamp7 {
198    pub year: U16LE,
199    pub month: u8,
200    pub day: u8,
201    pub hour: u8,
202    pub minute: u8,
203    pub second: u8,
204}
205
206impl Timestamp7 {
207    pub fn year(&self) -> u16 {
208        self.year.get()
209    }
210}
211
212/// TIMESTAMP - 11 bytes (DATE/DATETIME/TIMESTAMP with microseconds)
213#[repr(C, packed)]
214#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable)]
215pub struct Timestamp11 {
216    pub year: U16LE,
217    pub month: u8,
218    pub day: u8,
219    pub hour: u8,
220    pub minute: u8,
221    pub second: u8,
222    pub microsecond: U32LE,
223}
224
225impl Timestamp11 {
226    pub fn year(&self) -> u16 {
227        self.year.get()
228    }
229
230    pub fn microsecond(&self) -> u32 {
231        self.microsecond.get()
232    }
233}
234
235/// TIME - 8 bytes
236#[repr(C, packed)]
237#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable)]
238pub struct Time8 {
239    pub is_negative: u8,
240    pub days: U32LE,
241    pub hour: u8,
242    pub minute: u8,
243    pub second: u8,
244}
245
246impl Time8 {
247    pub fn is_negative(&self) -> bool {
248        self.is_negative != 0
249    }
250
251    pub fn days(&self) -> u32 {
252        self.days.get()
253    }
254}
255
256/// TIME - 12 bytesative (1), days (4 LE), hour (1), minute (1), second (1), microsecond (4 LE)
257#[repr(C, packed)]
258#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable)]
259pub struct Time12 {
260    pub is_negative: u8,
261    pub days: U32LE,
262    pub hour: u8,
263    pub minute: u8,
264    pub second: u8,
265    pub microsecond: U32LE,
266}
267
268impl Time12 {
269    pub fn is_negative(&self) -> bool {
270        self.is_negative != 0
271    }
272
273    pub fn days(&self) -> u32 {
274        self.days.get()
275    }
276
277    pub fn microsecond(&self) -> u32 {
278        self.microsecond.get()
279    }
280}
281
282// ============================================================================
283// NULL Bitmap
284// ============================================================================
285
/// NULL bitmap for binary protocol
///
/// In MySQL binary protocol, NULL values are indicated by a bitmap where each bit
/// represents whether a column is NULL (1 = NULL, 0 = not NULL).
///
/// For result sets (COM_STMT_EXECUTE response), the bitmap has an offset of 2 bits.
/// For prepared statement parameters, the offset is 0 bits.
///
/// The bitmap borrows the underlying bytes; it never copies or mutates them.
#[derive(Debug, Clone, Copy)]
pub struct NullBitmap<'a> {
    /// Raw bitmap bytes; bit `n` lives in byte `n / 8` at position `n % 8`
    /// (least-significant bit first).
    bitmap: &'a [u8],
    /// Number of leading bits to skip before the bit for index 0.
    offset: usize,
}

impl<'a> NullBitmap<'a> {
    /// Create a NULL bitmap for result sets (offset = 2)
    pub fn for_result_set(bitmap: &'a [u8]) -> Self {
        Self { bitmap, offset: 2 }
    }

    /// Create a NULL bitmap for parameters (offset = 0)
    pub fn for_parameters(bitmap: &'a [u8]) -> Self {
        Self { bitmap, offset: 0 }
    }

    /// Check if the column at the given index is NULL
    ///
    /// # Arguments
    /// * `idx` - Column index (0-based)
    ///
    /// # Returns
    /// `true` if the column is NULL, `false` otherwise. An index that falls
    /// outside the bitmap (including one so large that adding the offset would
    /// overflow `usize`) is reported as not NULL; this method never panics.
    pub fn is_null(&self, idx: usize) -> bool {
        // `idx + offset` could overflow for absurdly large indices: that would
        // panic in debug builds and wrap to an unrelated bit in release builds.
        // Such an index cannot be inside any bitmap, so treat it as out of range.
        let bit_pos = match idx.checked_add(self.offset) {
            Some(pos) => pos,
            None => return false,
        };

        // `get` folds the bounds check into the lookup.
        match self.bitmap.get(bit_pos >> 3) {
            Some(byte) => (*byte & (1u8 << (bit_pos & 7))) != 0,
            None => false,
        }
    }

    /// Get the raw bitmap bytes
    pub fn as_bytes(&self) -> &'a [u8] {
        self.bitmap
    }
}
334
335// ============================================================================
336// Tests
337// ============================================================================
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the column descriptor handed to `Value::parse`.
    fn column(column_type: ColumnType, flags: ColumnFlags) -> ColumnTypeAndFlags {
        ColumnTypeAndFlags { column_type, flags }
    }

    /// Column descriptor with no flags set (i.e. signed for integer types).
    fn plain(column_type: ColumnType) -> ColumnTypeAndFlags {
        column(column_type, ColumnFlags::empty())
    }

    #[test]
    fn test_value_parse_signed_integers() {
        // TINYINT (-42)
        let data = (-42i8).to_le_bytes();
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_TINY), &data).unwrap();
        assert!(matches!(value, Value::SignedInt(-42)));
        assert!(rest.is_empty());

        // SMALLINT (-1000)
        let data = (-1000i16).to_le_bytes();
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_SHORT), &data).unwrap();
        assert!(matches!(value, Value::SignedInt(-1000)));
        assert!(rest.is_empty());

        // INT (-100000)
        let data = (-100_000i32).to_le_bytes();
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_LONG), &data).unwrap();
        assert!(matches!(value, Value::SignedInt(-100_000)));
        assert!(rest.is_empty());

        // BIGINT (i64::MIN) - the full 8-byte width must survive unchanged
        let data = i64::MIN.to_le_bytes();
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_LONGLONG), &data).unwrap();
        assert!(matches!(value, Value::SignedInt(i64::MIN)));
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_unsigned_integers() {
        // TINYINT UNSIGNED (200) - would be negative if misread as signed
        let type_and_flags = column(ColumnType::MYSQL_TYPE_TINY, ColumnFlags::UNSIGNED_FLAG);
        let data = [200_u8];
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        assert!(matches!(value, Value::UnsignedInt(200)));
        assert!(rest.is_empty());

        // BIGINT UNSIGNED (u64::MAX) - only representable on the unsigned path
        let type_and_flags = column(ColumnType::MYSQL_TYPE_LONGLONG, ColumnFlags::UNSIGNED_FLAG);
        let data = u64::MAX.to_le_bytes();
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        assert!(matches!(value, Value::UnsignedInt(u64::MAX)));
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_float_double() {
        // FLOAT: the decoded value must be bit-identical to what was encoded.
        let expected = 1.5f32;
        let data = expected.to_le_bytes();
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_FLOAT), &data).unwrap();
        match value {
            Value::Float(f) => assert_eq!(f.to_bits(), expected.to_bits()),
            other => panic!("Expected Float value, got {:?}", other),
        }
        assert!(rest.is_empty());

        // DOUBLE (pi)
        let expected = std::f64::consts::PI;
        let data = expected.to_le_bytes();
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_DOUBLE), &data).unwrap();
        match value {
            Value::Double(d) => assert_eq!(d.to_bits(), expected.to_bits()),
            other => panic!("Expected Double value, got {:?}", other),
        }
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_timestamp() {
        let type_and_flags = plain(ColumnType::MYSQL_TYPE_DATETIME);

        // Timestamp0 (0000-00-00 00:00:00)
        let data = [0_u8]; // length = 0
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        assert!(matches!(value, Value::Timestamp0));
        assert!(rest.is_empty());

        // Timestamp4 (2024-12-25)
        let mut data = vec![4u8]; // length = 4
        data.extend_from_slice(&2024u16.to_le_bytes()); // year
        data.extend_from_slice(&[12, 25]); // month, day
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        match value {
            Value::Timestamp4(ts) => {
                assert_eq!(ts.year(), 2024);
                assert_eq!(ts.month, 12);
                assert_eq!(ts.day, 25);
            }
            other => panic!("Expected Timestamp4 value, got {:?}", other),
        }
        assert!(rest.is_empty());

        // Timestamp7 (2024-12-25 15:30:45)
        let mut data = vec![7u8]; // length = 7
        data.extend_from_slice(&2024u16.to_le_bytes()); // year
        data.extend_from_slice(&[12, 25, 15, 30, 45]); // month, day, hour, minute, second
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        match value {
            Value::Timestamp7(ts) => {
                assert_eq!(ts.year(), 2024);
                assert_eq!(ts.month, 12);
                assert_eq!(ts.day, 25);
                assert_eq!(ts.hour, 15);
                assert_eq!(ts.minute, 30);
                assert_eq!(ts.second, 45);
            }
            other => panic!("Expected Timestamp7 value, got {:?}", other),
        }
        assert!(rest.is_empty());

        // Timestamp11 (2024-12-25 15:30:45.123456)
        let mut data = vec![11u8]; // length = 11
        data.extend_from_slice(&2024u16.to_le_bytes()); // year
        data.extend_from_slice(&[12, 25, 15, 30, 45]); // month, day, hour, minute, second
        data.extend_from_slice(&123_456u32.to_le_bytes()); // microsecond
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        match value {
            Value::Timestamp11(ts) => {
                assert_eq!(ts.year(), 2024);
                assert_eq!(ts.month, 12);
                assert_eq!(ts.day, 25);
                assert_eq!(ts.hour, 15);
                assert_eq!(ts.minute, 30);
                assert_eq!(ts.second, 45);
                assert_eq!(ts.microsecond(), 123_456);
            }
            other => panic!("Expected Timestamp11 value, got {:?}", other),
        }
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_time() {
        let type_and_flags = plain(ColumnType::MYSQL_TYPE_TIME);

        // Time0 (00:00:00)
        let data = [0_u8]; // length = 0
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        assert!(matches!(value, Value::Time0));
        assert!(rest.is_empty());

        // Time8 (negative, 1 day 12:30:45)
        let mut data = vec![8u8]; // length = 8
        data.push(1); // is_negative
        data.extend_from_slice(&1u32.to_le_bytes()); // days
        data.extend_from_slice(&[12, 30, 45]); // hour, minute, second
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        match value {
            Value::Time8(time) => {
                assert!(time.is_negative());
                assert_eq!(time.days(), 1);
                assert_eq!(time.hour, 12);
                assert_eq!(time.minute, 30);
                assert_eq!(time.second, 45);
            }
            other => panic!("Expected Time8 value, got {:?}", other),
        }
        assert!(rest.is_empty());

        // Time12 (positive, 2 days 01:02:03.000004)
        let mut data = vec![12u8]; // length = 12
        data.push(0); // is_negative
        data.extend_from_slice(&2u32.to_le_bytes()); // days
        data.extend_from_slice(&[1, 2, 3]); // hour, minute, second
        data.extend_from_slice(&4u32.to_le_bytes()); // microsecond
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        match value {
            Value::Time12(time) => {
                assert!(!time.is_negative());
                assert_eq!(time.days(), 2);
                assert_eq!(time.hour, 1);
                assert_eq!(time.minute, 2);
                assert_eq!(time.second, 3);
                assert_eq!(time.microsecond(), 4);
            }
            other => panic!("Expected Time12 value, got {:?}", other),
        }
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_rejects_unknown_temporal_length() {
        // 5 is not one of the DATETIME layouts (0/4/7/11)
        let data = [5u8, 0, 0, 0, 0, 0];
        assert!(Value::parse(&plain(ColumnType::MYSQL_TYPE_DATETIME), &data).is_err());

        // 3 is not one of the TIME layouts (0/8/12)
        let data = [3u8, 0, 0, 0];
        assert!(Value::parse(&plain(ColumnType::MYSQL_TYPE_TIME), &data).is_err());
    }

    #[test]
    fn test_value_parse_string() {
        // Length-encoded string "Hello"
        let mut data = vec![5u8]; // length = 5
        data.extend_from_slice(b"Hello");
        let (value, rest) =
            Value::parse(&plain(ColumnType::MYSQL_TYPE_VAR_STRING), &data).unwrap();
        match value {
            Value::Byte(bytes) => assert_eq!(bytes, b"Hello"),
            other => panic!("Expected Byte value, got {:?}", other),
        }
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_blob() {
        // Length-encoded binary data
        let mut data = vec![4u8]; // length = 4
        data.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_BLOB), &data).unwrap();
        match value {
            Value::Byte(bytes) => assert_eq!(bytes, &[0xDE, 0xAD, 0xBE, 0xEF]),
            other => panic!("Expected Byte value, got {:?}", other),
        }
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_null() {
        let data = []; // NULL takes no bytes
        let (value, rest) = Value::parse(&plain(ColumnType::MYSQL_TYPE_NULL), &data).unwrap();
        assert!(matches!(value, Value::Null));
        assert!(rest.is_empty());
    }

    #[test]
    fn test_value_parse_with_remaining_data() {
        let type_and_flags = column(ColumnType::MYSQL_TYPE_TINY, ColumnFlags::UNSIGNED_FLAG);

        let data = [42u8, 0xFF, 0xFF]; // 42 followed by extra data
        let (value, rest) = Value::parse(&type_and_flags, &data).unwrap();
        assert!(matches!(value, Value::UnsignedInt(42)));
        assert_eq!(rest, &[0xFF, 0xFF]);
    }

    #[test]
    fn test_null_bitmap_result_set() {
        // Two bitmap bytes with offset=2 cover columns 0..=13.
        // Bitmap bytes: [0b00000100, 0b00010000]
        // - Bit 0 (reserved, skipped by the offset) = 0
        // - Bit 1 (reserved, skipped by the offset) = 0
        // - Bit 2 (column 0) = 1 -> NULL
        // - Bit 3 (column 1) = 0
        // - Bit 4 (column 2) = 0
        // - ...
        // - Bit 12 (column 10) = 1 -> NULL
        let bitmap = [0b00000100, 0b00010000];
        let null_bitmap = NullBitmap::for_result_set(&bitmap);

        assert!(null_bitmap.is_null(0)); // Bit 2
        assert!(!null_bitmap.is_null(1)); // Bit 3
        assert!(!null_bitmap.is_null(2)); // Bit 4
        assert!(null_bitmap.is_null(10)); // Bit 12
        assert!(!null_bitmap.is_null(14)); // Bit 16 - past the end of the bitmap
    }

    #[test]
    fn test_null_bitmap_parameters() {
        // Example bitmap for parameters with offset=0
        // Bitmap: [0b00000101]
        // - Bit 0 (param 0) = 1 -> NULL
        // - Bit 1 (param 1) = 0
        // - Bit 2 (param 2) = 1 -> NULL
        let bitmap = [0b00000101];
        let null_bitmap = NullBitmap::for_parameters(&bitmap);

        assert!(null_bitmap.is_null(0)); // Bit 0
        assert!(!null_bitmap.is_null(1)); // Bit 1
        assert!(null_bitmap.is_null(2)); // Bit 2
        assert!(!null_bitmap.is_null(3)); // Bit 3
    }
}