// mysql_binlog_connector_rust/column/column_value.rs

1use super::column_type::ColumnType;
2use crate::{binlog_error::BinlogError, ext::cursor_ext::CursorExt};
3use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5use std::io::{Cursor, Read};
6
/// A single decoded column value from a binlog rows event.
///
/// Values are produced by `ColumnValue::parse`, which picks the variant from the
/// column's `ColumnType`.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub enum ColumnValue {
    /// No value. `ColumnValue::parse` in this file never returns it; presumably it
    /// represents SQL NULL columns (TODO: confirm against the rows-event decoder).
    None,
    /// An 8-bit signed integer (`Tiny` columns).
    Tiny(i8),
    /// A 16-bit signed integer (`Short` columns).
    Short(i16),
    /// A 32-bit signed integer (`Long` columns; `Int24` columns are also widened into it).
    Long(i32),
    /// A 64-bit signed integer (`LongLong` columns).
    LongLong(i64),
    /// A 32-bit floating point number.
    Float(f32),
    /// A 64-bit floating point number.
    Double(f64),
    /// A `NewDecimal` value rendered as a decimal string, e.g. `-12.3400`; the
    /// fractional part always has exactly `scale` digits.
    Decimal(String),
    /// A time value rendered as a string: `HH:MM:SS` for legacy `Time` columns and
    /// `HH:MM:SS.ffffff` (with a leading `-` when negative) for `Time2` columns.
    Time(String),
    /// A date rendered as `Y-MM-DD` (the year is not zero-padded).
    Date(String),
    /// A datetime rendered as a string, for values ranging from
    /// '1000-01-01 00:00:00' to '9999-12-31 23:59:59'.
    /// `DateTime2` values carry a six-digit fractional part (`.ffffff`).
    DateTime(String),
    /// A TIMESTAMP value as microseconds since the Unix epoch, for values ranging from
    /// '1970-01-01 00:00:01' UTC to '2038-01-19 03:14:07' UTC.
    /// MySQL converts TIMESTAMP values from the current time zone to UTC for storage,
    /// and back from UTC to the current time zone for retrieval.
    /// (This does not occur for other types such as DATETIME.)
    /// refer: <https://dev.mysql.com/doc/refman/8.0/en/datetime.html>
    Timestamp(i64),
    /// A YEAR value with a range of 1901 to 2155; decoded as the stored byte plus 1900.
    /// refer: <https://dev.mysql.com/doc/refman/8.0/en/year.html>
    Year(u16),
    /// Raw bytes of a `VarChar`, `VarString` or `String` column. The charset is not
    /// present in the binary log, so the bytes are not decoded into text.
    String(Vec<u8>),
    /// Raw bytes of a binary large object (`Blob`, `TinyBlob`, `MediumBlob`, `LongBlob`)
    /// or of a `Geometry` column.
    Blob(Vec<u8>),
    /// A BIT value with its bits packed into a `u64`.
    Bit(u64),
    /// A user-defined SET value as its stored integer (a bitmask of selected members).
    /// A SET column can have a maximum of 64 distinct members.
    /// refer: <https://dev.mysql.com/doc/refman/8.0/en/set.html>
    Set(u64),
    /// A user-defined ENUM value as its stored index into the member list.
    /// An ENUM column can have a maximum of 65,535 distinct elements.
    /// refer: <https://dev.mysql.com/doc/refman/8.0/en/enum.html>
    Enum(u32),
    /// Raw bytes of a JSON column as stored in the binlog; not decoded into text here.
    Json(Vec<u8>),
}
57
/// Number of decimal digits packed into each full 4-byte group of a binary DECIMAL.
const DIG_PER_DEC: usize = 9;
/// Bytes needed to store a leftover group of `n` (0..=9) decimal digits, indexed by `n`.
const COMPRESSED_BYTES: [usize; 10] = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
60
61impl ColumnValue {
62    // refer: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
63    pub fn parse(
64        cursor: &mut Cursor<&Vec<u8>>,
65        column_type: ColumnType,
66        column_meta: u16,
67        column_length: u16,
68    ) -> Result<Self, BinlogError> {
69        let value = match column_type {
70            ColumnType::Bit => ColumnValue::Bit(Self::parse_bit(cursor, column_meta)?),
71
72            ColumnType::Tiny => ColumnValue::Tiny(cursor.read_i8()?),
73
74            ColumnType::Short => ColumnValue::Short(cursor.read_i16::<LittleEndian>()?),
75
76            ColumnType::Int24 => ColumnValue::Long(cursor.read_i24::<LittleEndian>()?),
77
78            ColumnType::Long => ColumnValue::Long(cursor.read_i32::<LittleEndian>()?),
79
80            ColumnType::LongLong => ColumnValue::LongLong(cursor.read_i64::<LittleEndian>()?),
81
82            ColumnType::Float => ColumnValue::Float(cursor.read_f32::<LittleEndian>()?),
83
84            ColumnType::Double => ColumnValue::Double(cursor.read_f64::<LittleEndian>()?),
85
86            ColumnType::NewDecimal => {
87                let precision = (column_meta & 0xFF) as usize;
88                let scale = (column_meta >> 8) as usize;
89                ColumnValue::Decimal(Self::parse_decimal(cursor, precision, scale)?)
90            }
91
92            ColumnType::Date => ColumnValue::Date(Self::parse_date(cursor)?),
93
94            ColumnType::Time => ColumnValue::Time(Self::parse_time(cursor)?),
95
96            ColumnType::Time2 => ColumnValue::Time(Self::parse_time2(cursor, column_meta)?),
97
98            ColumnType::TimeStamp => ColumnValue::Timestamp(Self::parse_timestamp(cursor)?),
99
100            ColumnType::TimeStamp2 => {
101                ColumnValue::Timestamp(Self::parse_timestamp2(cursor, column_meta)?)
102            }
103
104            ColumnType::DateTime => ColumnValue::DateTime(Self::parse_datetime(cursor)?),
105
106            ColumnType::DateTime2 => {
107                ColumnValue::DateTime(Self::parse_datetime2(cursor, column_meta)?)
108            }
109
110            ColumnType::Year => ColumnValue::Year(cursor.read_u8()? as u16 + 1900),
111
112            ColumnType::VarChar | ColumnType::VarString => {
113                ColumnValue::String(Self::parse_string(cursor, column_meta)?)
114            }
115
116            ColumnType::String => ColumnValue::String(Self::parse_string(cursor, column_length)?),
117
118            ColumnType::Blob
119            | ColumnType::Geometry
120            | ColumnType::TinyBlob
121            | ColumnType::MediumBlob
122            | ColumnType::LongBlob => ColumnValue::Blob(Self::parse_blob(cursor, column_meta)?),
123
124            ColumnType::Enum => {
125                ColumnValue::Enum(cursor.read_int::<LittleEndian>(column_length as usize)? as u32)
126            }
127
128            ColumnType::Set => {
129                ColumnValue::Set(cursor.read_int::<LittleEndian>(column_length as usize)? as u64)
130            }
131
132            ColumnType::Json => ColumnValue::Json(Self::parse_blob(cursor, column_meta)?),
133
134            _ => {
135                return Err(BinlogError::UnsupportedColumnType(format!(
136                    "{:?}",
137                    column_type
138                )))
139            }
140        };
141
142        Ok(value)
143    }
144
145    #[allow(clippy::needless_range_loop)]
146    fn parse_bit(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<u64, BinlogError> {
147        let bit_count = (column_meta >> 8) * 8 + (column_meta & 0xFF);
148        let bytes = cursor.read_bits_as_bytes(bit_count as usize, true)?;
149        let mut result = 0u64;
150        for i in 0..bytes.len() {
151            result |= (bytes[i] as u64) << (i * 8);
152        }
153        Ok(result)
154    }
155
156    fn parse_date(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
157        // Stored as a 3 byte value where bits 1 to 5 store the day,
158        // bits 6 to 9 store the month and the remaining bits store the year.
159        let date_val = cursor.read_u24::<LittleEndian>()?;
160        let day = date_val % 32;
161        let month = (date_val >> 5) % 16;
162        let year = date_val >> 9;
163        Ok(format!("{}-{:02}-{:02}", year, month, day))
164    }
165
166    fn parse_time(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
167        // refer: https://dev.mysql.com/doc/refman/8.0/en/time.html
168        let time_val = cursor.read_u24::<LittleEndian>()?;
169        let hour = (time_val / 100) / 100;
170        let minute = (time_val / 100) % 100;
171        let second = time_val % 100;
172        Ok(format!("{:02}:{:02}:{:02}", hour, minute, second))
173    }
174
175    fn parse_time2(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<String, BinlogError> {
176        // (in big endian)
177
178        // 1 bit sign (1= non-negative, 0= negative)
179        // 1 bit unused (reserved for future extensions)
180        // 10 bits hour (0-838)
181        // 6 bits minute (0-59)
182        // 6 bits second (0-59)
183
184        // (3 bytes in total)
185
186        // + fractional-seconds storage (size depends on meta)
187
188        // refer to: https://github.com/debezium/debezium/blob/main/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/event/RowDeserializers.java#L341
189
190        let fraction_bytes = ((column_meta + 1) / 2) as usize;
191        let payload_bytes = 3 + fraction_bytes;
192        let payload_bits = payload_bytes * 8;
193
194        let mut time = cursor.read_uint::<BigEndian>(payload_bytes)?;
195        let negative = Self::bit_slice(time, 0, 1, payload_bits) == 0;
196
197        if negative {
198            time = !time + 1;
199        }
200
201        let hour = Self::bit_slice(time, 2, 10, payload_bits);
202        let minute = Self::bit_slice(time, 12, 6, payload_bits);
203        let second = Self::bit_slice(time, 18, 6, payload_bits);
204
205        let mut micro_second = 0;
206        if fraction_bytes > 0 {
207            let fraction: u64 = Self::bit_slice(time, 24, fraction_bytes * 8, payload_bits);
208            micro_second = fraction * 10_000 / u64::pow(100, fraction_bytes as u32 - 1);
209        }
210
211        if negative {
212            Ok(format!(
213                "-{:02}:{:02}:{:02}.{:06}",
214                hour, minute, second, micro_second
215            ))
216        } else {
217            Ok(format!(
218                "{:02}:{:02}:{:02}.{:06}",
219                hour, minute, second, micro_second
220            ))
221        }
222    }
223
224    fn parse_fraction(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<u32, BinlogError> {
225        let mut fraction = 0;
226        let length = ((column_meta + 1) / 2) as u32;
227        if length > 0 {
228            fraction = cursor.read_uint::<BigEndian>(length as usize)?;
229            fraction *= u64::pow(100, 3 - length);
230        }
231        Ok(fraction as u32)
232    }
233
234    fn parse_timestamp(cursor: &mut Cursor<&Vec<u8>>) -> Result<i64, BinlogError> {
235        // Stored as a 4 byte UNIX timestamp (number of seconds since 00:00, Jan 1 1970 UTC).
236        Ok((cursor.read_u32::<LittleEndian>()?) as i64 * 1000000)
237    }
238
239    fn parse_timestamp2(
240        cursor: &mut Cursor<&Vec<u8>>,
241        column_meta: u16,
242    ) -> Result<i64, BinlogError> {
243        let second = cursor.read_u32::<BigEndian>()?;
244        let micros = Self::parse_fraction(cursor, column_meta)?;
245        Ok(1000000 * second as i64 + micros as i64)
246    }
247
248    fn parse_datetime(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
249        let datetime_val = cursor.read_u64::<LittleEndian>()? * 1000;
250        let date_val = datetime_val / 1000000;
251        let time_val = datetime_val % 1000000;
252        let year = ((date_val / 100) / 100) as u32;
253        let month = ((date_val / 100) % 100) as u32;
254        let day = (date_val % 100) as u32;
255        let hour = ((time_val / 100) / 100) as u32;
256        let minute = ((time_val / 100) % 100) as u32;
257        let second = (time_val % 100) as u32;
258        Ok(format!(
259            "{}-{:02}-{:02} {:02}:{:02}:{:02}",
260            year, month, day, hour, minute, second,
261        ))
262    }
263
264    fn parse_datetime2(
265        cursor: &mut Cursor<&Vec<u8>>,
266        column_meta: u16,
267    ) -> Result<String, BinlogError> {
268        // Stored as 4-byte value,
269        // The number of decimals for the fractional part is stored in the table metadata as a one byte value.
270        // The number of bytes that follow the 5 byte datetime value can be calculated
271        // with the following formula: (decimals + 1) / 2
272        let val = cursor.read_uint::<BigEndian>(5)? - 0x8000000000;
273        let micros = Self::parse_fraction(cursor, column_meta)?;
274        let d_val = val >> 17;
275        let t_val = val % (1 << 17);
276        let year = ((d_val >> 5) / 13) as u32;
277        let month = ((d_val >> 5) % 13) as u32;
278        let day = (d_val % (1 << 5)) as u32;
279        let hour = ((val >> 12) % (1 << 5)) as u32;
280        let minute = ((t_val >> 6) % (1 << 6)) as u32;
281        let second = (t_val % (1 << 6)) as u32;
282        Ok(format!(
283            "{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
284            year, month, day, hour, minute, second, micros,
285        ))
286    }
287
288    fn parse_string(
289        cursor: &mut Cursor<&Vec<u8>>,
290        column_meta: u16,
291    ) -> Result<Vec<u8>, BinlogError> {
292        let size = if column_meta < 256 {
293            cursor.read_u8()? as usize
294        } else {
295            cursor.read_u16::<LittleEndian>()? as usize
296        };
297        // charset is not present in the binary log, return byte[] instead of an actual String
298        cursor.read_bytes(size)
299    }
300
301    fn parse_blob(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<Vec<u8>, BinlogError> {
302        let size = cursor.read_uint::<LittleEndian>(column_meta as usize)? as usize;
303        cursor.read_bytes(size)
304    }
305
306    #[allow(clippy::needless_range_loop)]
307    pub fn parse_decimal(
308        cursor: &mut Cursor<&Vec<u8>>,
309        precision: usize,
310        scale: usize,
311    ) -> Result<String, BinlogError> {
312        // Given a column to be DECIMAL(13,4), the numbers mean:
313        // 13: precision, the maximum number of digits, the maximum precesion for DECIMAL is 65.
314        // 4: scale, the number of digits to the right of the decimal point.
315        // 13 - 4: integral, the maximum number of digits to the left of the decimal point.
316        let integral = precision - scale;
317
318        // A decimal is stored in binlog like following:
319        // ([compressed bytes, 1-4]) ([fixed bytes: 4] * n) . ([fixed bytes: 4] * n) ([compressed bytes, 1-4])
320        // Both integral and scale are stored in BigEndian.
321        // refer: https://github.com/mysql/mysql-server/blob/8.0/strings/decimal.cc#L1488
322        // Examples:
323        // DECIMAL(10,4): [3 bytes] . [2 bytes]
324        // DECIMAL(18,9): [4 bytes] . [4 bytes]
325        // DECIMAL(27,13): [3 bytes][4 bytes] . [4 bytes][2 bytes]
326        // DECIMAL(47,25): [2 bytes][4 bytes][4 bytes] . [4 bytes][4 bytes][4 bytes]
327        // DIG_PER_DEC = 9: each 4 bytes represent 9 digits in a decimal number.
328        // COMPRESSED_BYTES = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]: bytes needed to compress n digits.
329        let uncomp_intg = integral / DIG_PER_DEC;
330        let uncomp_frac = scale / DIG_PER_DEC;
331        let comp_intg = integral - (uncomp_intg * DIG_PER_DEC);
332        let comp_frac = scale - (uncomp_frac * DIG_PER_DEC);
333
334        let comp_frac_bytes = COMPRESSED_BYTES[comp_frac];
335        let comp_intg_bytes = COMPRESSED_BYTES[comp_intg];
336
337        let total_bytes = 4 * uncomp_intg + 4 * uncomp_frac + comp_frac_bytes + comp_intg_bytes;
338        let mut buf = vec![0u8; total_bytes];
339        cursor.read_exact(&mut buf)?;
340
341        // handle negative
342        let is_negative = (buf[0] & 0x80) == 0;
343        buf[0] ^= 0x80;
344        if is_negative {
345            for i in 0..buf.len() {
346                buf[i] ^= 0xFF;
347            }
348        }
349
350        // negative sign
351        let mut intg_str = String::new();
352        if is_negative {
353            intg_str = "-".to_string();
354        }
355
356        let mut decimal_cursor = Cursor::new(buf);
357        let mut is_intg_empty = true;
358        // compressed integral
359        if comp_intg_bytes > 0 {
360            let value = decimal_cursor.read_uint::<BigEndian>(comp_intg_bytes)?;
361            if value > 0 {
362                intg_str += value.to_string().as_str();
363                is_intg_empty = false;
364            }
365        }
366
367        // uncompressed integral
368        for _ in 0..uncomp_intg {
369            let value = decimal_cursor.read_u32::<BigEndian>()?;
370            if is_intg_empty {
371                if value > 0 {
372                    intg_str += value.to_string().as_str();
373                    is_intg_empty = false;
374                }
375            } else {
376                intg_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
377            }
378        }
379
380        if is_intg_empty {
381            intg_str += "0";
382        }
383
384        let mut frac_str = String::new();
385        // uncompressed fractional
386        for _ in 0..uncomp_frac {
387            let value = decimal_cursor.read_u32::<BigEndian>()?;
388            frac_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
389        }
390
391        // compressed fractional
392        if comp_frac_bytes > 0 {
393            let value = decimal_cursor.read_uint::<BigEndian>(comp_frac_bytes)?;
394            frac_str += format!("{value:0size$}", value = value, size = comp_frac).as_str();
395        }
396
397        if frac_str.is_empty() {
398            Ok(intg_str)
399        } else {
400            Ok(intg_str + "." + frac_str.as_str())
401        }
402    }
403
404    fn bit_slice(value: u64, bit_offset: usize, num_bits: usize, payload_size: usize) -> u64 {
405        (value >> (payload_size - (bit_offset + num_bits))) & ((1 << num_bits) - 1)
406    }
407}