1use std::io::{Cursor, Read};
2
3use byteorder::ReadBytesExt;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 binlog_error::BinlogError, column::column_type::ColumnType, ext::cursor_ext::CursorExt,
8};
9
10use super::{default_charset::DefaultCharset, metadata_type::MetadataType};
11
/// Optional per-column attributes filled in by [`TableMetadata::parse`].
///
/// Every field defaults to `None` and is only populated when the matching
/// metadata field is present in the parsed payload.
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct ColumnMetadata {
    /// Column name, taken from the `ColumnName` metadata field.
    pub column_name: Option<String>,

    /// Bit read from the `Signedness` bitmap; only set for numeric columns.
    ///
    /// NOTE(review): the raw bit is stored unchanged. MySQL documents a set
    /// bit as "unsigned", so the polarity of this flag should be confirmed.
    pub is_signed: Option<bool>,

    /// Collation id taken from the `ColumnCharset` metadata field.
    pub charset_collation: Option<u32>,

    /// Permitted values of an ENUM column, from the `EnumStrValue` field.
    pub enum_string_values: Option<Vec<String>>,

    /// Permitted values of a SET column, from the `SetStrValue` field.
    pub set_string_values: Option<Vec<String>>,

    /// Geometry type code taken from the `GeometryType` metadata field.
    pub geometry_type: Option<u32>,

    /// `Some(true)` when the column index is listed in the `SimplePrimaryKey`
    /// field; never set to `Some(false)` by the parser.
    pub is_simple_primary_key: Option<bool>,

    /// Prefix length recorded for this column in the `PrimaryKeyWithPrefix`
    /// field.
    pub primary_key_prefix: Option<u32>,

    /// Collation id taken from the `EnumAndSetColumnCharset` metadata field.
    pub enum_and_set_charset_collation: Option<u32>,

    /// Bit read from the `ColumnVisibility` bitmap.
    pub is_visible: Option<bool>,
}
45
/// Table-level result of parsing the optional metadata fields, together with
/// one [`ColumnMetadata`] entry per column.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TableMetadata {
    /// Parsed `DefaultCharset` field, if it was present.
    pub default_charset: Option<DefaultCharset>,

    /// Parsed `EnumAndSetDefaultCharset` field, if it was present.
    pub enum_and_set_default_charset: Option<DefaultCharset>,

    /// Per-column metadata; holds exactly as many entries as the
    /// `column_types` slice handed to `parse`.
    pub columns: Vec<ColumnMetadata>,
}
59
60impl TableMetadata {
61 pub fn parse(
62 cursor: &mut Cursor<&Vec<u8>>,
63 column_types: &[u8],
64 column_metas: &[u16],
65 ) -> Result<Self, BinlogError> {
66 let mut columns: Vec<ColumnMetadata> = (0..column_types.len())
68 .map(|_| ColumnMetadata::default())
69 .collect();
70
71 let mut default_charset = None;
72 let mut enum_and_set_default_charset = None;
73
74 while cursor.available() > 0 {
75 let metadata_type = MetadataType::from_code(cursor.read_u8()?)?;
76 let metadata_length = cursor.read_packed_number()?;
77
78 let mut metadata = vec![0u8; metadata_length];
79 cursor.read_exact(&mut metadata)?;
80
81 let mut buffer = Cursor::new(&metadata);
82 match metadata_type {
83 MetadataType::Signedness => {
84 let signedness_values = read_signedness_bitmap(&mut buffer, column_types)?;
85 apply_signedness_to_columns(&mut columns, column_types, &signedness_values);
86 }
87 MetadataType::DefaultCharset => {
88 default_charset = Some(parse_default_charset(&mut buffer)?);
89 }
90 MetadataType::ColumnCharset => {
91 parse_column_charsets(&mut columns, &mut buffer)?;
92 }
93 MetadataType::ColumnName => {
94 parse_column_names(&mut columns, &mut buffer)?;
95 }
96 MetadataType::SetStrValue => {
97 parse_set_string_values(&mut columns, &mut buffer, column_types, column_metas)?;
98 }
99 MetadataType::EnumStrValue => {
100 parse_enum_string_values(
101 &mut columns,
102 &mut buffer,
103 column_types,
104 column_metas,
105 )?;
106 }
107 MetadataType::GeometryType => {
108 parse_geometry_types(&mut columns, &mut buffer)?;
109 }
110 MetadataType::SimplePrimaryKey => {
111 parse_simple_primary_keys(&mut columns, &mut buffer)?;
112 }
113 MetadataType::PrimaryKeyWithPrefix => {
114 parse_primary_key_prefixes(&mut columns, &mut buffer)?;
115 }
116 MetadataType::EnumAndSetDefaultCharset => {
117 enum_and_set_default_charset = Some(parse_default_charset(&mut buffer)?);
118 }
119 MetadataType::EnumAndSetColumnCharset => {
120 parse_enum_set_charsets(&mut columns, &mut buffer)?;
121 }
122 MetadataType::ColumnVisibility => {
123 let visibility = read_bitmap_reverted(&mut buffer, column_types.len())?;
124 apply_column_visibility(&mut columns, &visibility);
125 }
126 }
127 }
128
129 Ok(Self {
130 default_charset,
131 enum_and_set_default_charset,
132 columns,
133 })
134 }
135}
136
137fn parse_default_charset(cursor: &mut Cursor<&Vec<u8>>) -> Result<DefaultCharset, BinlogError> {
138 let default_collation = cursor.read_packed_number()?;
139 let mut charset_collations = Vec::new();
140 while cursor.available() > 0 {
141 let key = cursor.read_packed_number()? as u32;
142 let value = cursor.read_packed_number()? as u32;
143 charset_collations.push((key, value));
144 }
145 Ok(DefaultCharset::new(
146 default_collation as u32,
147 charset_collations,
148 ))
149}
150
151pub(crate) fn read_bitmap_reverted(
152 cursor: &mut Cursor<&Vec<u8>>,
153 bits_number: usize,
154) -> Result<Vec<bool>, BinlogError> {
155 let mut result = vec![false; bits_number];
156 let bytes_number = (bits_number + 7) / 8;
157 for i in 0..bytes_number {
158 let value = cursor.read_u8()?;
159 for y in 0..8 {
160 let index = (i << 3) + y;
161 if index == bits_number {
162 break;
163 }
164
165 result[index] = (value & (1 << (7 - y))) > 0;
167 }
168 }
169 Ok(result)
170}
171
172fn read_signedness_bitmap(
175 cursor: &mut Cursor<&Vec<u8>>,
176 column_types: &[u8],
177) -> Result<Vec<bool>, BinlogError> {
178 let count = get_numeric_column_count(column_types)?;
179 read_bitmap_reverted(cursor, count)
180}
181
182fn apply_signedness_to_columns(
183 columns: &mut [ColumnMetadata],
184 column_types: &[u8],
185 signedness_values: &[bool],
186) {
187 let mut signedness_index = 0;
188 for (i, &column_type_code) in column_types.iter().enumerate() {
189 let column_type = ColumnType::from_code(column_type_code);
190 if is_numeric_type(column_type) && signedness_index < signedness_values.len() {
191 columns[i].is_signed = Some(signedness_values[signedness_index]);
192 signedness_index += 1;
193 }
194 }
195}
196
197fn parse_column_charsets(
198 columns: &mut [ColumnMetadata],
199 cursor: &mut Cursor<&Vec<u8>>,
200) -> Result<(), BinlogError> {
201 let mut index = 0;
202 while cursor.available() > 0 && index < columns.len() {
203 let charset = cursor.read_packed_number()? as u32;
204 columns[index].charset_collation = Some(charset);
205 index += 1;
206 }
207 Ok(())
208}
209
210fn parse_column_names(
211 columns: &mut [ColumnMetadata],
212 cursor: &mut Cursor<&Vec<u8>>,
213) -> Result<(), BinlogError> {
214 let mut index = 0;
215 while cursor.available() > 0 && index < columns.len() {
216 let length = cursor.read_packed_number()?;
217 let name = cursor.read_string(length)?;
218 columns[index].column_name = Some(name);
219 index += 1;
220 }
221 Ok(())
222}
223
224fn parse_set_string_values(
225 columns: &mut [ColumnMetadata],
226 cursor: &mut Cursor<&Vec<u8>>,
227 column_types: &[u8],
228 column_metas: &[u16],
229) -> Result<(), BinlogError> {
230 let mut set_column_index = 0;
231
232 while cursor.available() > 0 {
233 let length = cursor.read_packed_number()?;
234 let mut values = Vec::new();
235 for _ in 0..length {
236 let str_length = cursor.read_packed_number()?;
237 let value = cursor.read_string(str_length)?;
238 values.push(value);
239 }
240
241 let mut current_set_index = 0;
244 for i in 0..column_types.len() {
245 if is_set_column(column_types[i], column_metas[i]) {
246 if current_set_index == set_column_index {
247 columns[i].set_string_values = Some(values);
248 break;
249 }
250 current_set_index += 1;
251 }
252 }
253 set_column_index += 1;
254 }
255 Ok(())
256}
257
258fn parse_enum_string_values(
259 columns: &mut [ColumnMetadata],
260 cursor: &mut Cursor<&Vec<u8>>,
261 column_types: &[u8],
262 column_metas: &[u16],
263) -> Result<(), BinlogError> {
264 let mut enum_column_index = 0;
265
266 while cursor.available() > 0 {
267 let length = cursor.read_packed_number()?;
268 let mut values = Vec::new();
269 for _ in 0..length {
270 let str_length = cursor.read_packed_number()?;
271 let value = cursor.read_string(str_length)?;
272 values.push(value);
273 }
274
275 let mut current_enum_index = 0;
278 for i in 0..column_types.len() {
279 if is_enum_column(column_types[i], column_metas[i]) {
280 if current_enum_index == enum_column_index {
281 columns[i].enum_string_values = Some(values);
282 break;
283 }
284 current_enum_index += 1;
285 }
286 }
287 enum_column_index += 1;
288 }
289 Ok(())
290}
291
292fn parse_geometry_types(
293 columns: &mut [ColumnMetadata],
294 cursor: &mut Cursor<&Vec<u8>>,
295) -> Result<(), BinlogError> {
296 let mut index = 0;
297 while cursor.available() > 0 && index < columns.len() {
298 let geometry_type = cursor.read_packed_number()? as u32;
299 columns[index].geometry_type = Some(geometry_type);
300 index += 1;
301 }
302 Ok(())
303}
304
305fn parse_simple_primary_keys(
306 columns: &mut [ColumnMetadata],
307 cursor: &mut Cursor<&Vec<u8>>,
308) -> Result<(), BinlogError> {
309 while cursor.available() > 0 {
310 let pk_column = cursor.read_packed_number()?;
311 if let Some(column) = columns.get_mut(pk_column) {
312 column.is_simple_primary_key = Some(true);
313 }
314 }
315 Ok(())
316}
317
318fn parse_primary_key_prefixes(
319 columns: &mut [ColumnMetadata],
320 cursor: &mut Cursor<&Vec<u8>>,
321) -> Result<(), BinlogError> {
322 while cursor.available() > 0 {
323 let column_index = cursor.read_packed_number()?;
324 let prefix_length = cursor.read_packed_number()? as u32;
325 if let Some(column) = columns.get_mut(column_index) {
326 column.primary_key_prefix = Some(prefix_length);
327 }
328 }
329 Ok(())
330}
331
332fn parse_enum_set_charsets(
333 columns: &mut [ColumnMetadata],
334 cursor: &mut Cursor<&Vec<u8>>,
335) -> Result<(), BinlogError> {
336 let mut index = 0;
337 while cursor.available() > 0 && index < columns.len() {
338 let charset = cursor.read_packed_number()? as u32;
339 columns[index].enum_and_set_charset_collation = Some(charset);
340 index += 1;
341 }
342 Ok(())
343}
344
345fn apply_column_visibility(columns: &mut [ColumnMetadata], visibility: &[bool]) {
346 for (i, &is_visible) in visibility.iter().enumerate().take(columns.len()) {
347 columns[i].is_visible = Some(is_visible);
348 }
349}
350
351fn is_numeric_type(column_type: ColumnType) -> bool {
352 matches!(
353 column_type,
354 ColumnType::Tiny
355 | ColumnType::Short
356 | ColumnType::Int24
357 | ColumnType::Long
358 | ColumnType::LongLong
359 | ColumnType::Float
360 | ColumnType::Double
361 | ColumnType::NewDecimal
362 )
363}
364
365fn is_enum_column(column_type_code: u8, column_meta: u16) -> bool {
366 if column_type_code == ColumnType::String as u8 {
367 if let Ok((real_column_type, _)) =
368 ColumnType::parse_string_column_meta(column_meta, column_type_code)
369 {
370 return real_column_type == ColumnType::Enum as u8;
371 }
372 }
373 false
374}
375
376fn is_set_column(column_type_code: u8, column_meta: u16) -> bool {
377 if column_type_code == ColumnType::String as u8 {
378 if let Ok((real_column_type, _)) =
379 ColumnType::parse_string_column_meta(column_meta, column_type_code)
380 {
381 return real_column_type == ColumnType::Set as u8;
382 }
383 }
384 false
385}
386
387pub(crate) fn get_numeric_column_count(column_types: &[u8]) -> Result<usize, BinlogError> {
388 let mut count = 0;
389 for &column_type_code in column_types {
390 let column_type = ColumnType::from_code(column_type_code);
391 if is_numeric_type(column_type) {
392 count += 1;
393 }
394 }
395 Ok(count)
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Column type codes shared by most tests: five numeric columns followed
    /// by two string columns.
    fn create_test_column_types() -> Vec<u8> {
        vec![1, 3, 4, 5, 246, 253, 254]
    }

    /// Per-column metadata matching `create_test_column_types`; all zero.
    fn create_test_column_metas() -> Vec<u16> {
        vec![0; 7]
    }

    /// Parses `payload` against the shared seven-column fixture.
    fn parse_with_test_columns(payload: Vec<u8>) -> TableMetadata {
        let column_types = create_test_column_types();
        let column_metas = create_test_column_metas();
        let mut cursor = Cursor::new(&payload);
        TableMetadata::parse(&mut cursor, &column_types, &column_metas).unwrap()
    }

    /// Asserts that no optional attribute has been set on `column`.
    fn assert_column_is_blank(column: &ColumnMetadata) {
        assert_eq!(column.column_name, None);
        assert_eq!(column.is_signed, None);
        assert_eq!(column.charset_collation, None);
        assert_eq!(column.enum_string_values, None);
        assert_eq!(column.set_string_values, None);
        assert_eq!(column.geometry_type, None);
        assert_eq!(column.is_simple_primary_key, None);
        assert_eq!(column.primary_key_prefix, None);
        assert_eq!(column.enum_and_set_charset_collation, None);
        assert_eq!(column.is_visible, None);
    }

    #[test]
    fn test_get_numeric_column_count() {
        let numeric_columns = get_numeric_column_count(&create_test_column_types()).unwrap();
        assert_eq!(numeric_columns, 5);
    }

    #[test]
    fn test_parse_signedness_metadata() {
        // Field type 1, one payload byte, bitmap 1101 0...
        let parsed = parse_with_test_columns(vec![1, 1, 0b1101_0000]);

        let signedness: Vec<Option<bool>> =
            parsed.columns.iter().map(|column| column.is_signed).collect();
        assert_eq!(
            signedness,
            vec![
                Some(true),
                Some(true),
                Some(false),
                Some(true),
                Some(false),
                None,
                None,
            ]
        );
    }

    #[test]
    fn test_parse_column_names_metadata() {
        // Field type 4, three payload bytes: name length 2 followed by "id".
        let parsed = parse_with_test_columns(vec![4, 3, 2, b'i', b'd']);

        assert_eq!(parsed.columns.len(), 7);
        assert_eq!(parsed.columns[0].column_name, Some("id".to_string()));
        for column in &parsed.columns[1..] {
            assert_eq!(column.column_name, None);
        }
    }

    #[test]
    fn test_parse_default_charset_metadata() {
        // Field type 2, one payload byte: default collation 33, no pairs.
        let parsed = parse_with_test_columns(vec![2, 1, 33]);

        let charset = parsed
            .default_charset
            .expect("default charset should have been parsed");
        assert_eq!(charset.default_charset_collation, 33);
        assert_eq!(charset.charset_collations.len(), 0);
    }

    #[test]
    fn test_parse_enum_string_values_metadata() {
        // Field type 6, seven payload bytes: one value of length 5, "small".
        let payload = vec![6, 7, 1, 5, b's', b'm', b'a', b'l', b'l'];
        let column_types = vec![1, 254, 3];
        // High byte 247 marks the middle string column as an ENUM.
        let column_metas = vec![0, (247 << 8) | 1, 0];

        let mut cursor = Cursor::new(&payload);
        let parsed = TableMetadata::parse(&mut cursor, &column_types, &column_metas).unwrap();

        assert_eq!(parsed.columns.len(), 3);
        assert!(parsed.columns[0].enum_string_values.is_none());
        assert_eq!(
            parsed.columns[1].enum_string_values,
            Some(vec!["small".to_string()])
        );
        assert!(parsed.columns[2].enum_string_values.is_none());
    }

    #[test]
    fn test_parse_multiple_metadata_types() {
        // A signedness field followed by a column-name field.
        let parsed = parse_with_test_columns(vec![1, 1, 0b1010_0000, 4, 3, 2, b'i', b'd']);

        assert_eq!(parsed.columns.len(), 7);

        let leading_signedness: Vec<Option<bool>> = parsed.columns[..3]
            .iter()
            .map(|column| column.is_signed)
            .collect();
        assert_eq!(
            leading_signedness,
            vec![Some(true), Some(false), Some(true)]
        );

        assert_eq!(parsed.columns[0].column_name, Some("id".to_string()));
        assert_eq!(parsed.columns[1].column_name, None);
    }

    #[test]
    fn test_parse_empty_metadata() {
        let parsed = parse_with_test_columns(Vec::new());

        assert_eq!(parsed.columns.len(), 7);
        assert!(parsed.default_charset.is_none());
        assert!(parsed.enum_and_set_default_charset.is_none());

        for column in &parsed.columns {
            assert_column_is_blank(column);
        }
    }

    #[test]
    fn test_read_bitmap_reverted() {
        let bytes = vec![0b1101_0000];
        let mut cursor = Cursor::new(&bytes);

        let bits = read_bitmap_reverted(&mut cursor, 5).unwrap();

        assert_eq!(bits, vec![true, true, false, true, false]);
    }

    #[test]
    fn test_column_metadata_creation() {
        assert_column_is_blank(&ColumnMetadata::default());
    }
}