// mysql_binlog_connector_rust/event/table_map/table_metadata.rs

1use std::io::{Cursor, Read};
2
3use byteorder::ReadBytesExt;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    binlog_error::BinlogError, column::column_type::ColumnType, ext::cursor_ext::CursorExt,
8};
9
10use super::{default_charset::DefaultCharset, metadata_type::MetadataType};
11
12/// Contains metadata for a single table column.
13#[derive(Clone, Debug, Deserialize, Serialize, Default)]
14pub struct ColumnMetadata {
15    /// Column name.
16    pub column_name: Option<String>,
17
18    /// Signedness of numeric column.
19    pub is_signed: Option<bool>,
20
21    /// Charset collation for character column.
22    pub charset_collation: Option<u32>,
23
24    /// String values for ENUM column.
25    pub enum_string_values: Option<Vec<String>>,
26
27    /// String values for SET column.
28    pub set_string_values: Option<Vec<String>>,
29
30    /// Real type for geometry column.
31    pub geometry_type: Option<u32>,
32
33    /// Whether this column is a simple primary key.
34    pub is_simple_primary_key: Option<bool>,
35
36    /// Primary key prefix length if this is a prefixed primary key.
37    pub primary_key_prefix: Option<u32>,
38
39    /// Charset collation for ENUM/SET column.
40    pub enum_and_set_charset_collation: Option<u32>,
41
42    /// Column visibility (for MySQL 8.0+ invisible columns).
43    pub is_visible: Option<bool>,
44}
45
46/// Contains metadata for table columns.
47/// Reference: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Table__map__event.html
48#[derive(Clone, Debug, Deserialize, Serialize)]
49pub struct TableMetadata {
50    /// Default charset for character columns.
51    pub default_charset: Option<DefaultCharset>,
52
53    /// Default charset for ENUM and SET columns.
54    pub enum_and_set_default_charset: Option<DefaultCharset>,
55
56    /// Per-column metadata.
57    pub columns: Vec<ColumnMetadata>,
58}
59
60impl TableMetadata {
61    pub fn parse(
62        cursor: &mut Cursor<&Vec<u8>>,
63        column_types: &[u8],
64        column_metas: &[u16],
65    ) -> Result<Self, BinlogError> {
66        // Initialize columns with default values
67        let mut columns: Vec<ColumnMetadata> = (0..column_types.len())
68            .map(|_| ColumnMetadata::default())
69            .collect();
70
71        let mut default_charset = None;
72        let mut enum_and_set_default_charset = None;
73
74        while cursor.available() > 0 {
75            let metadata_type = MetadataType::from_code(cursor.read_u8()?)?;
76            let metadata_length = cursor.read_packed_number()?;
77
78            let mut metadata = vec![0u8; metadata_length];
79            cursor.read_exact(&mut metadata)?;
80
81            let mut buffer = Cursor::new(&metadata);
82            match metadata_type {
83                MetadataType::Signedness => {
84                    let signedness_values = read_signedness_bitmap(&mut buffer, column_types)?;
85                    apply_signedness_to_columns(&mut columns, column_types, &signedness_values);
86                }
87                MetadataType::DefaultCharset => {
88                    default_charset = Some(parse_default_charset(&mut buffer)?);
89                }
90                MetadataType::ColumnCharset => {
91                    parse_column_charsets(&mut columns, &mut buffer)?;
92                }
93                MetadataType::ColumnName => {
94                    parse_column_names(&mut columns, &mut buffer)?;
95                }
96                MetadataType::SetStrValue => {
97                    parse_set_string_values(&mut columns, &mut buffer, column_types, column_metas)?;
98                }
99                MetadataType::EnumStrValue => {
100                    parse_enum_string_values(
101                        &mut columns,
102                        &mut buffer,
103                        column_types,
104                        column_metas,
105                    )?;
106                }
107                MetadataType::GeometryType => {
108                    parse_geometry_types(&mut columns, &mut buffer)?;
109                }
110                MetadataType::SimplePrimaryKey => {
111                    parse_simple_primary_keys(&mut columns, &mut buffer)?;
112                }
113                MetadataType::PrimaryKeyWithPrefix => {
114                    parse_primary_key_prefixes(&mut columns, &mut buffer)?;
115                }
116                MetadataType::EnumAndSetDefaultCharset => {
117                    enum_and_set_default_charset = Some(parse_default_charset(&mut buffer)?);
118                }
119                MetadataType::EnumAndSetColumnCharset => {
120                    parse_enum_set_charsets(&mut columns, &mut buffer)?;
121                }
122                MetadataType::ColumnVisibility => {
123                    let visibility = read_bitmap_reverted(&mut buffer, column_types.len())?;
124                    apply_column_visibility(&mut columns, &visibility);
125                }
126            }
127        }
128
129        Ok(Self {
130            default_charset,
131            enum_and_set_default_charset,
132            columns,
133        })
134    }
135}
136
137fn parse_default_charset(cursor: &mut Cursor<&Vec<u8>>) -> Result<DefaultCharset, BinlogError> {
138    let default_collation = cursor.read_packed_number()?;
139    let mut charset_collations = Vec::new();
140    while cursor.available() > 0 {
141        let key = cursor.read_packed_number()? as u32;
142        let value = cursor.read_packed_number()? as u32;
143        charset_collations.push((key, value));
144    }
145    Ok(DefaultCharset::new(
146        default_collation as u32,
147        charset_collations,
148    ))
149}
150
151pub(crate) fn read_bitmap_reverted(
152    cursor: &mut Cursor<&Vec<u8>>,
153    bits_number: usize,
154) -> Result<Vec<bool>, BinlogError> {
155    let mut result = vec![false; bits_number];
156    let bytes_number = (bits_number + 7) / 8;
157    for i in 0..bytes_number {
158        let value = cursor.read_u8()?;
159        for y in 0..8 {
160            let index = (i << 3) + y;
161            if index == bits_number {
162                break;
163            }
164
165            // The difference from read_bits is that bits are reverted
166            result[index] = (value & (1 << (7 - y))) > 0;
167        }
168    }
169    Ok(result)
170}
171
// Helpers that decode individual metadata payloads and apply them to columns
173
174fn read_signedness_bitmap(
175    cursor: &mut Cursor<&Vec<u8>>,
176    column_types: &[u8],
177) -> Result<Vec<bool>, BinlogError> {
178    let count = get_numeric_column_count(column_types)?;
179    read_bitmap_reverted(cursor, count)
180}
181
182fn apply_signedness_to_columns(
183    columns: &mut [ColumnMetadata],
184    column_types: &[u8],
185    signedness_values: &[bool],
186) {
187    let mut signedness_index = 0;
188    for (i, &column_type_code) in column_types.iter().enumerate() {
189        let column_type = ColumnType::from_code(column_type_code);
190        if is_numeric_type(column_type) && signedness_index < signedness_values.len() {
191            columns[i].is_signed = Some(signedness_values[signedness_index]);
192            signedness_index += 1;
193        }
194    }
195}
196
197fn parse_column_charsets(
198    columns: &mut [ColumnMetadata],
199    cursor: &mut Cursor<&Vec<u8>>,
200) -> Result<(), BinlogError> {
201    let mut index = 0;
202    while cursor.available() > 0 && index < columns.len() {
203        let charset = cursor.read_packed_number()? as u32;
204        columns[index].charset_collation = Some(charset);
205        index += 1;
206    }
207    Ok(())
208}
209
210fn parse_column_names(
211    columns: &mut [ColumnMetadata],
212    cursor: &mut Cursor<&Vec<u8>>,
213) -> Result<(), BinlogError> {
214    let mut index = 0;
215    while cursor.available() > 0 && index < columns.len() {
216        let length = cursor.read_packed_number()?;
217        let name = cursor.read_string(length)?;
218        columns[index].column_name = Some(name);
219        index += 1;
220    }
221    Ok(())
222}
223
224fn parse_set_string_values(
225    columns: &mut [ColumnMetadata],
226    cursor: &mut Cursor<&Vec<u8>>,
227    column_types: &[u8],
228    column_metas: &[u16],
229) -> Result<(), BinlogError> {
230    let mut set_column_index = 0;
231
232    while cursor.available() > 0 {
233        let length = cursor.read_packed_number()?;
234        let mut values = Vec::new();
235        for _ in 0..length {
236            let str_length = cursor.read_packed_number()?;
237            let value = cursor.read_string(str_length)?;
238            values.push(value);
239        }
240
241        // Find the set_column_index-th SET column and apply values to it
242        // SetStrValue metadata is provided in the order of columns that are actual SETs
243        let mut current_set_index = 0;
244        for i in 0..column_types.len() {
245            if is_set_column(column_types[i], column_metas[i]) {
246                if current_set_index == set_column_index {
247                    columns[i].set_string_values = Some(values);
248                    break;
249                }
250                current_set_index += 1;
251            }
252        }
253        set_column_index += 1;
254    }
255    Ok(())
256}
257
258fn parse_enum_string_values(
259    columns: &mut [ColumnMetadata],
260    cursor: &mut Cursor<&Vec<u8>>,
261    column_types: &[u8],
262    column_metas: &[u16],
263) -> Result<(), BinlogError> {
264    let mut enum_column_index = 0;
265
266    while cursor.available() > 0 {
267        let length = cursor.read_packed_number()?;
268        let mut values = Vec::new();
269        for _ in 0..length {
270            let str_length = cursor.read_packed_number()?;
271            let value = cursor.read_string(str_length)?;
272            values.push(value);
273        }
274
275        // Find the enum_column_index-th ENUM column and apply values to it
276        // EnumStrValue metadata is provided in the order of columns that are actual ENUMs
277        let mut current_enum_index = 0;
278        for i in 0..column_types.len() {
279            if is_enum_column(column_types[i], column_metas[i]) {
280                if current_enum_index == enum_column_index {
281                    columns[i].enum_string_values = Some(values);
282                    break;
283                }
284                current_enum_index += 1;
285            }
286        }
287        enum_column_index += 1;
288    }
289    Ok(())
290}
291
292fn parse_geometry_types(
293    columns: &mut [ColumnMetadata],
294    cursor: &mut Cursor<&Vec<u8>>,
295) -> Result<(), BinlogError> {
296    let mut index = 0;
297    while cursor.available() > 0 && index < columns.len() {
298        let geometry_type = cursor.read_packed_number()? as u32;
299        columns[index].geometry_type = Some(geometry_type);
300        index += 1;
301    }
302    Ok(())
303}
304
305fn parse_simple_primary_keys(
306    columns: &mut [ColumnMetadata],
307    cursor: &mut Cursor<&Vec<u8>>,
308) -> Result<(), BinlogError> {
309    while cursor.available() > 0 {
310        let pk_column = cursor.read_packed_number()?;
311        if let Some(column) = columns.get_mut(pk_column) {
312            column.is_simple_primary_key = Some(true);
313        }
314    }
315    Ok(())
316}
317
318fn parse_primary_key_prefixes(
319    columns: &mut [ColumnMetadata],
320    cursor: &mut Cursor<&Vec<u8>>,
321) -> Result<(), BinlogError> {
322    while cursor.available() > 0 {
323        let column_index = cursor.read_packed_number()?;
324        let prefix_length = cursor.read_packed_number()? as u32;
325        if let Some(column) = columns.get_mut(column_index) {
326            column.primary_key_prefix = Some(prefix_length);
327        }
328    }
329    Ok(())
330}
331
332fn parse_enum_set_charsets(
333    columns: &mut [ColumnMetadata],
334    cursor: &mut Cursor<&Vec<u8>>,
335) -> Result<(), BinlogError> {
336    let mut index = 0;
337    while cursor.available() > 0 && index < columns.len() {
338        let charset = cursor.read_packed_number()? as u32;
339        columns[index].enum_and_set_charset_collation = Some(charset);
340        index += 1;
341    }
342    Ok(())
343}
344
345fn apply_column_visibility(columns: &mut [ColumnMetadata], visibility: &[bool]) {
346    for (i, &is_visible) in visibility.iter().enumerate().take(columns.len()) {
347        columns[i].is_visible = Some(is_visible);
348    }
349}
350
351fn is_numeric_type(column_type: ColumnType) -> bool {
352    matches!(
353        column_type,
354        ColumnType::Tiny
355            | ColumnType::Short
356            | ColumnType::Int24
357            | ColumnType::Long
358            | ColumnType::LongLong
359            | ColumnType::Float
360            | ColumnType::Double
361            | ColumnType::NewDecimal
362    )
363}
364
365fn is_enum_column(column_type_code: u8, column_meta: u16) -> bool {
366    if column_type_code == ColumnType::String as u8 {
367        if let Ok((real_column_type, _)) =
368            ColumnType::parse_string_column_meta(column_meta, column_type_code)
369        {
370            return real_column_type == ColumnType::Enum as u8;
371        }
372    }
373    false
374}
375
376fn is_set_column(column_type_code: u8, column_meta: u16) -> bool {
377    if column_type_code == ColumnType::String as u8 {
378        if let Ok((real_column_type, _)) =
379            ColumnType::parse_string_column_meta(column_meta, column_type_code)
380        {
381            return real_column_type == ColumnType::Set as u8;
382        }
383    }
384    false
385}
386
387pub(crate) fn get_numeric_column_count(column_types: &[u8]) -> Result<usize, BinlogError> {
388    let mut count = 0;
389    for &column_type_code in column_types {
390        let column_type = ColumnType::from_code(column_type_code);
391        if is_numeric_type(column_type) {
392            count += 1;
393        }
394    }
395    Ok(count)
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Type codes of the seven-column layout shared by most tests.
    fn create_test_column_types() -> Vec<u8> {
        vec![
            1,   // MYSQL_TYPE_TINY (numeric, index 0)
            3,   // MYSQL_TYPE_LONG (numeric, index 1)
            4,   // MYSQL_TYPE_FLOAT (numeric, index 2)
            5,   // MYSQL_TYPE_DOUBLE (numeric, index 3)
            246, // MYSQL_TYPE_NEWDECIMAL (numeric, index 4)
            253, // MYSQL_TYPE_VAR_STRING (non-numeric, index 5)
            254, // MYSQL_TYPE_STRING (non-numeric, index 6)
        ]
    }

    /// Type metadata for the shared layout: none of the seven columns carries any.
    fn create_test_column_metas() -> Vec<u16> {
        vec![0; 7]
    }

    /// Parses `payload` against the shared seven-column layout.
    fn parse_with_test_columns(payload: Vec<u8>) -> TableMetadata {
        let column_types = create_test_column_types();
        let column_metas = create_test_column_metas();
        let mut cursor = Cursor::new(&payload);
        TableMetadata::parse(&mut cursor, &column_types, &column_metas).unwrap()
    }

    /// Asserts that no metadata field of `column` has been filled in.
    fn assert_no_metadata(column: &ColumnMetadata) {
        assert!(column.column_name.is_none());
        assert!(column.is_signed.is_none());
        assert!(column.charset_collation.is_none());
        assert!(column.enum_string_values.is_none());
        assert!(column.set_string_values.is_none());
        assert!(column.geometry_type.is_none());
        assert!(column.is_simple_primary_key.is_none());
        assert!(column.primary_key_prefix.is_none());
        assert!(column.enum_and_set_charset_collation.is_none());
        assert!(column.is_visible.is_none());
    }

    #[test]
    fn test_get_numeric_column_count() {
        // TINY, LONG, FLOAT, DOUBLE and NEWDECIMAL are the five numeric columns
        let numeric_columns = get_numeric_column_count(&create_test_column_types()).unwrap();
        assert_eq!(numeric_columns, 5);
    }

    #[test]
    fn test_parse_signedness_metadata() {
        // NOTE(review): MySQL documents a set SIGNEDNESS bit as "unsigned". This test
        // pins the current behaviour, where a set bit yields `is_signed == Some(true)`.
        // Confirm the intended polarity.
        let result = parse_with_test_columns(vec![
            1,           // MetadataType::Signedness
            1,           // Payload length (1 byte)
            0b1101_0000, // Bits 7, 6 and 4 set -> numeric columns 0, 1 and 3
        ]);

        assert_eq!(result.columns.len(), 7);

        let flags: Vec<Option<bool>> = result.columns.iter().map(|c| c.is_signed).collect();
        assert_eq!(
            flags,
            vec![
                Some(true),  // TINY - bit 7
                Some(true),  // LONG - bit 6
                Some(false), // FLOAT - bit 5
                Some(true),  // DOUBLE - bit 4
                Some(false), // NEWDECIMAL - bit 3
                None,        // VAR_STRING - not numeric
                None,        // STRING - not numeric
            ]
        );
    }

    #[test]
    fn test_parse_column_names_metadata() {
        let result = parse_with_test_columns(vec![
            4, // MetadataType::ColumnName
            3, // Payload length (1 length byte + 2 bytes of text)
            2, b'i', b'd', // First column name: "id"
        ]);

        assert_eq!(result.columns.len(), 7);

        // Only the first column receives a name
        assert_eq!(result.columns[0].column_name.as_deref(), Some("id"));
        assert!(result.columns[1..7]
            .iter()
            .all(|column| column.column_name.is_none()));
    }

    #[test]
    fn test_parse_default_charset_metadata() {
        let result = parse_with_test_columns(vec![
            2,  // MetadataType::DefaultCharset
            1,  // Payload length (just the default collation)
            33, // Default collation (utf8_general_ci = 33)
        ]);

        let default_charset = result
            .default_charset
            .expect("default charset should have been parsed");
        assert_eq!(default_charset.default_charset_collation, 33);
        assert_eq!(default_charset.charset_collations.len(), 0);
    }

    #[test]
    fn test_parse_enum_string_values_metadata() {
        let payload = vec![
            6, // MetadataType::EnumStrValue
            7, // Payload length (1 + 1 + 5 = 7)
            1, // Number of values of the first ENUM
            5, b's', b'm', b'a', b'l', b'l', // "small"
        ];

        // The ENUM is transported as a STRING column whose metadata names the real type
        let column_types = vec![
            1,   // MYSQL_TYPE_TINY (numeric, index 0)
            254, // MYSQL_TYPE_STRING - decoded as ENUM (index 1)
            3,   // MYSQL_TYPE_LONG (numeric, index 2)
        ];

        // For ENUM: high byte = 247 (ColumnType::Enum), low byte = number of values
        let column_metas: Vec<u16> = vec![
            0,              // TINY - no metadata
            (247 << 8) | 1, // STRING with ENUM metadata (247 = ENUM type, 1 value)
            0,              // LONG - no metadata
        ];

        let mut cursor = Cursor::new(&payload);
        let result = TableMetadata::parse(&mut cursor, &column_types, &column_metas).unwrap();

        assert_eq!(result.columns.len(), 3);

        // Only the ENUM column in the middle receives the values
        assert!(result.columns[0].enum_string_values.is_none());
        assert!(result.columns[2].enum_string_values.is_none());

        let enum_values = result.columns[1]
            .enum_string_values
            .as_ref()
            .expect("ENUM column should carry its values");
        assert_eq!(enum_values.len(), 1);
        assert_eq!(enum_values[0], "small");
    }

    #[test]
    fn test_parse_multiple_metadata_types() {
        let result = parse_with_test_columns(vec![
            // Signedness field
            1,           // MetadataType::Signedness
            1,           // Payload length
            0b1010_0000, // Bits 7 and 5 set
            // Column name field
            4, // MetadataType::ColumnName
            3, // Payload length (1 length byte + 2 bytes of text)
            2, b'i', b'd', // "id"
        ]);

        assert_eq!(result.columns.len(), 7);

        // Signedness was applied
        assert_eq!(result.columns[0].is_signed, Some(true)); // TINY - bit 7 set
        assert_eq!(result.columns[1].is_signed, Some(false)); // LONG - bit 6 not set
        assert_eq!(result.columns[2].is_signed, Some(true)); // FLOAT - bit 5 set

        // The column name was applied
        assert_eq!(result.columns[0].column_name.as_deref(), Some("id"));
        assert!(result.columns[1].column_name.is_none());
    }

    #[test]
    fn test_parse_empty_metadata() {
        let result = parse_with_test_columns(Vec::new());

        // All columns exist, but nothing has been filled in
        assert_eq!(result.columns.len(), 7);
        assert!(result.default_charset.is_none());
        assert!(result.enum_and_set_default_charset.is_none());
        result.columns.iter().for_each(assert_no_metadata);
    }

    #[test]
    fn test_read_bitmap_reverted() {
        let test_data = vec![0b1101_0000];
        let mut cursor = Cursor::new(&test_data);
        let bits = read_bitmap_reverted(&mut cursor, 5).unwrap();

        // Bits 7, 6, 5, 4 and 3 of the byte, in that order
        assert_eq!(bits, vec![true, true, false, true, false]);
    }

    #[test]
    fn test_column_metadata_creation() {
        assert_no_metadata(&ColumnMetadata::default());
    }
}