mysql_binlog_connector_rust/column/
column_value.rs1use super::column_type::ColumnType;
2use crate::{binlog_error::BinlogError, ext::cursor_ext::CursorExt};
3use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5use std::io::{Cursor, Read};
6
/// A single column value decoded from a binlog row image by [`ColumnValue::parse`].
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub enum ColumnValue {
    /// No value (presumably SQL NULL; this variant is never produced by `parse` itself).
    None,
    /// 8-bit signed integer.
    Tiny(i8),
    /// 16-bit signed integer.
    Short(i16),
    /// 32-bit signed integer; also used for 24-bit (`Int24`) columns.
    Long(i32),
    /// 64-bit signed integer.
    LongLong(i64),
    /// 32-bit floating point number.
    Float(f32),
    /// 64-bit floating point number.
    Double(f64),
    /// Decimal number rendered as text by `parse_decimal`, e.g. `-12.340`.
    Decimal(String),
    /// Time rendered as text: `HH:MM:SS`, or `[-]HH:MM:SS.ffffff` for `Time2` columns.
    Time(String),
    /// Date rendered as text: `Y-MM-DD` (the year is not zero-padded).
    Date(String),
    /// Date and time rendered as text: `Y-MM-DD HH:MM:SS`, with a `.ffffff`
    /// suffix for `DateTime2` columns.
    DateTime(String),
    /// Timestamp in microseconds (stored seconds * 1_000_000 plus the fractional part);
    /// presumably counted from the UNIX epoch.
    Timestamp(i64),
    /// Year, computed as the stored byte plus 1900.
    Year(u16),
    /// Raw bytes of a `VarChar`, `VarString` or `String` column; no charset decoding is done.
    String(Vec<u8>),
    /// Raw bytes of a blob-like column (`Blob`, `TinyBlob`, `MediumBlob`, `LongBlob`, `Geometry`).
    Blob(Vec<u8>),
    /// Bit field packed into an integer.
    Bit(u64),
    /// Integer read from a `Set` column (presumably a bitmask of the selected members).
    Set(u64),
    /// Integer read from an `Enum` column (presumably the index of the selected member).
    Enum(u32),
    /// Raw, undecoded bytes of a `Json` column.
    Json(Vec<u8>),
}
57
// Number of decimal digits carried by each full 4-byte group of a packed decimal.
const DIG_PER_DEC: usize = 9;
// Number of bytes used to store a partial group, indexed by how many digits it holds.
const COMPRESSED_BYTES: [usize; 10] = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
60
61impl ColumnValue {
62 pub fn parse(
64 cursor: &mut Cursor<&Vec<u8>>,
65 column_type: ColumnType,
66 column_meta: u16,
67 column_length: u16,
68 ) -> Result<Self, BinlogError> {
69 let value = match column_type {
70 ColumnType::Bit => ColumnValue::Bit(Self::parse_bit(cursor, column_meta)?),
71
72 ColumnType::Tiny => ColumnValue::Tiny(cursor.read_i8()?),
73
74 ColumnType::Short => ColumnValue::Short(cursor.read_i16::<LittleEndian>()?),
75
76 ColumnType::Int24 => ColumnValue::Long(cursor.read_i24::<LittleEndian>()?),
77
78 ColumnType::Long => ColumnValue::Long(cursor.read_i32::<LittleEndian>()?),
79
80 ColumnType::LongLong => ColumnValue::LongLong(cursor.read_i64::<LittleEndian>()?),
81
82 ColumnType::Float => ColumnValue::Float(cursor.read_f32::<LittleEndian>()?),
83
84 ColumnType::Double => ColumnValue::Double(cursor.read_f64::<LittleEndian>()?),
85
86 ColumnType::NewDecimal => {
87 let precision = (column_meta & 0xFF) as usize;
88 let scale = (column_meta >> 8) as usize;
89 ColumnValue::Decimal(Self::parse_decimal(cursor, precision, scale)?)
90 }
91
92 ColumnType::Date => ColumnValue::Date(Self::parse_date(cursor)?),
93
94 ColumnType::Time => ColumnValue::Time(Self::parse_time(cursor)?),
95
96 ColumnType::Time2 => ColumnValue::Time(Self::parse_time2(cursor, column_meta)?),
97
98 ColumnType::TimeStamp => ColumnValue::Timestamp(Self::parse_timestamp(cursor)?),
99
100 ColumnType::TimeStamp2 => {
101 ColumnValue::Timestamp(Self::parse_timestamp2(cursor, column_meta)?)
102 }
103
104 ColumnType::DateTime => ColumnValue::DateTime(Self::parse_datetime(cursor)?),
105
106 ColumnType::DateTime2 => {
107 ColumnValue::DateTime(Self::parse_datetime2(cursor, column_meta)?)
108 }
109
110 ColumnType::Year => ColumnValue::Year(cursor.read_u8()? as u16 + 1900),
111
112 ColumnType::VarChar | ColumnType::VarString => {
113 ColumnValue::String(Self::parse_string(cursor, column_meta)?)
114 }
115
116 ColumnType::String => ColumnValue::String(Self::parse_string(cursor, column_length)?),
117
118 ColumnType::Blob
119 | ColumnType::Geometry
120 | ColumnType::TinyBlob
121 | ColumnType::MediumBlob
122 | ColumnType::LongBlob => ColumnValue::Blob(Self::parse_blob(cursor, column_meta)?),
123
124 ColumnType::Enum => {
125 ColumnValue::Enum(cursor.read_int::<LittleEndian>(column_length as usize)? as u32)
126 }
127
128 ColumnType::Set => {
129 ColumnValue::Set(cursor.read_int::<LittleEndian>(column_length as usize)? as u64)
130 }
131
132 ColumnType::Json => ColumnValue::Json(Self::parse_blob(cursor, column_meta)?),
133
134 _ => {
135 return Err(BinlogError::UnsupportedColumnType(format!(
136 "{:?}",
137 column_type
138 )))
139 }
140 };
141
142 Ok(value)
143 }
144
145 #[allow(clippy::needless_range_loop)]
146 fn parse_bit(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<u64, BinlogError> {
147 let bit_count = (column_meta >> 8) * 8 + (column_meta & 0xFF);
148 let bytes = cursor.read_bits_as_bytes(bit_count as usize, true)?;
149 let mut result = 0u64;
150 for i in 0..bytes.len() {
151 result |= (bytes[i] as u64) << (i * 8);
152 }
153 Ok(result)
154 }
155
156 fn parse_date(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
157 let date_val = cursor.read_u24::<LittleEndian>()?;
160 let day = date_val % 32;
161 let month = (date_val >> 5) % 16;
162 let year = date_val >> 9;
163 Ok(format!("{}-{:02}-{:02}", year, month, day))
164 }
165
166 fn parse_time(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
167 let time_val = cursor.read_u24::<LittleEndian>()?;
169 let hour = (time_val / 100) / 100;
170 let minute = (time_val / 100) % 100;
171 let second = time_val % 100;
172 Ok(format!("{:02}:{:02}:{:02}", hour, minute, second))
173 }
174
175 fn parse_time2(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<String, BinlogError> {
176 let fraction_bytes = ((column_meta + 1) / 2) as usize;
191 let payload_bytes = 3 + fraction_bytes;
192 let payload_bits = payload_bytes * 8;
193
194 let mut time = cursor.read_uint::<BigEndian>(payload_bytes)?;
195 let negative = Self::bit_slice(time, 0, 1, payload_bits) == 0;
196
197 if negative {
198 time = !time + 1;
199 }
200
201 let hour = Self::bit_slice(time, 2, 10, payload_bits);
202 let minute = Self::bit_slice(time, 12, 6, payload_bits);
203 let second = Self::bit_slice(time, 18, 6, payload_bits);
204
205 let mut micro_second = 0;
206 if fraction_bytes > 0 {
207 let fraction: u64 = Self::bit_slice(time, 24, fraction_bytes * 8, payload_bits);
208 micro_second = fraction * 10_000 / u64::pow(100, fraction_bytes as u32 - 1);
209 }
210
211 if negative {
212 Ok(format!(
213 "-{:02}:{:02}:{:02}.{:06}",
214 hour, minute, second, micro_second
215 ))
216 } else {
217 Ok(format!(
218 "{:02}:{:02}:{:02}.{:06}",
219 hour, minute, second, micro_second
220 ))
221 }
222 }
223
224 fn parse_fraction(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<u32, BinlogError> {
225 let mut fraction = 0;
226 let length = ((column_meta + 1) / 2) as u32;
227 if length > 0 {
228 fraction = cursor.read_uint::<BigEndian>(length as usize)?;
229 fraction *= u64::pow(100, 3 - length);
230 }
231 Ok(fraction as u32)
232 }
233
234 fn parse_timestamp(cursor: &mut Cursor<&Vec<u8>>) -> Result<i64, BinlogError> {
235 Ok((cursor.read_u32::<LittleEndian>()?) as i64 * 1000000)
237 }
238
239 fn parse_timestamp2(
240 cursor: &mut Cursor<&Vec<u8>>,
241 column_meta: u16,
242 ) -> Result<i64, BinlogError> {
243 let second = cursor.read_u32::<BigEndian>()?;
244 let micros = Self::parse_fraction(cursor, column_meta)?;
245 Ok(1000000 * second as i64 + micros as i64)
246 }
247
248 fn parse_datetime(cursor: &mut Cursor<&Vec<u8>>) -> Result<String, BinlogError> {
249 let datetime_val = cursor.read_u64::<LittleEndian>()? * 1000;
250 let date_val = datetime_val / 1000000;
251 let time_val = datetime_val % 1000000;
252 let year = ((date_val / 100) / 100) as u32;
253 let month = ((date_val / 100) % 100) as u32;
254 let day = (date_val % 100) as u32;
255 let hour = ((time_val / 100) / 100) as u32;
256 let minute = ((time_val / 100) % 100) as u32;
257 let second = (time_val % 100) as u32;
258 Ok(format!(
259 "{}-{:02}-{:02} {:02}:{:02}:{:02}",
260 year, month, day, hour, minute, second,
261 ))
262 }
263
264 fn parse_datetime2(
265 cursor: &mut Cursor<&Vec<u8>>,
266 column_meta: u16,
267 ) -> Result<String, BinlogError> {
268 let val = cursor.read_uint::<BigEndian>(5)? - 0x8000000000;
273 let micros = Self::parse_fraction(cursor, column_meta)?;
274 let d_val = val >> 17;
275 let t_val = val % (1 << 17);
276 let year = ((d_val >> 5) / 13) as u32;
277 let month = ((d_val >> 5) % 13) as u32;
278 let day = (d_val % (1 << 5)) as u32;
279 let hour = ((val >> 12) % (1 << 5)) as u32;
280 let minute = ((t_val >> 6) % (1 << 6)) as u32;
281 let second = (t_val % (1 << 6)) as u32;
282 Ok(format!(
283 "{}-{:02}-{:02} {:02}:{:02}:{:02}.{:06}",
284 year, month, day, hour, minute, second, micros,
285 ))
286 }
287
288 fn parse_string(
289 cursor: &mut Cursor<&Vec<u8>>,
290 column_meta: u16,
291 ) -> Result<Vec<u8>, BinlogError> {
292 let size = if column_meta < 256 {
293 cursor.read_u8()? as usize
294 } else {
295 cursor.read_u16::<LittleEndian>()? as usize
296 };
297 cursor.read_bytes(size)
299 }
300
301 fn parse_blob(cursor: &mut Cursor<&Vec<u8>>, column_meta: u16) -> Result<Vec<u8>, BinlogError> {
302 let size = cursor.read_uint::<LittleEndian>(column_meta as usize)? as usize;
303 cursor.read_bytes(size)
304 }
305
306 #[allow(clippy::needless_range_loop)]
307 pub fn parse_decimal(
308 cursor: &mut Cursor<&Vec<u8>>,
309 precision: usize,
310 scale: usize,
311 ) -> Result<String, BinlogError> {
312 let integral = precision - scale;
317
318 let uncomp_intg = integral / DIG_PER_DEC;
330 let uncomp_frac = scale / DIG_PER_DEC;
331 let comp_intg = integral - (uncomp_intg * DIG_PER_DEC);
332 let comp_frac = scale - (uncomp_frac * DIG_PER_DEC);
333
334 let comp_frac_bytes = COMPRESSED_BYTES[comp_frac];
335 let comp_intg_bytes = COMPRESSED_BYTES[comp_intg];
336
337 let total_bytes = 4 * uncomp_intg + 4 * uncomp_frac + comp_frac_bytes + comp_intg_bytes;
338 let mut buf = vec![0u8; total_bytes];
339 cursor.read_exact(&mut buf)?;
340
341 let is_negative = (buf[0] & 0x80) == 0;
343 buf[0] ^= 0x80;
344 if is_negative {
345 for i in 0..buf.len() {
346 buf[i] ^= 0xFF;
347 }
348 }
349
350 let mut intg_str = String::new();
352 if is_negative {
353 intg_str = "-".to_string();
354 }
355
356 let mut decimal_cursor = Cursor::new(buf);
357 let mut is_intg_empty = true;
358 if comp_intg_bytes > 0 {
360 let value = decimal_cursor.read_uint::<BigEndian>(comp_intg_bytes)?;
361 if value > 0 {
362 intg_str += value.to_string().as_str();
363 is_intg_empty = false;
364 }
365 }
366
367 for _ in 0..uncomp_intg {
369 let value = decimal_cursor.read_u32::<BigEndian>()?;
370 if is_intg_empty {
371 if value > 0 {
372 intg_str += value.to_string().as_str();
373 is_intg_empty = false;
374 }
375 } else {
376 intg_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
377 }
378 }
379
380 if is_intg_empty {
381 intg_str += "0";
382 }
383
384 let mut frac_str = String::new();
385 for _ in 0..uncomp_frac {
387 let value = decimal_cursor.read_u32::<BigEndian>()?;
388 frac_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
389 }
390
391 if comp_frac_bytes > 0 {
393 let value = decimal_cursor.read_uint::<BigEndian>(comp_frac_bytes)?;
394 frac_str += format!("{value:0size$}", value = value, size = comp_frac).as_str();
395 }
396
397 if frac_str.is_empty() {
398 Ok(intg_str)
399 } else {
400 Ok(intg_str + "." + frac_str.as_str())
401 }
402 }
403
/// Extracts `num_bits` bits from `value`, starting `bit_offset` bits below
/// the most significant bit of a field that is `payload_size` bits wide.
fn bit_slice(value: u64, bit_offset: usize, num_bits: usize, payload_size: usize) -> u64 {
    // Distance from the end of the requested slice to the least significant bit.
    let shift = payload_size - (bit_offset + num_bits);
    let mask: u64 = (1 << num_bits) - 1;
    (value >> shift) & mask
}
407}