mysql_binlog_connector_rust/column/json/
json_binary.rs

1use crate::{
2    binlog_error::BinlogError,
3    column::{column_type::ColumnType, column_value::ColumnValue},
4    ext::buf_ext::BufExt,
5};
6use byteorder::{LittleEndian, ReadBytesExt};
7use std::io::{Cursor, Read, Seek, SeekFrom};
8
9use super::{
10    json_formatter::JsonFormatter, json_string_formatter::JsonStringFormatter,
11    value_type::ValueType,
12};
13
/// Decoder for MySQL's binary JSON encoding that walks the encoded bytes and reports every
/// value it finds to a [`JsonFormatter`].
///
/// Ported from mysql-binlog-connector-java:
/// <https://github.com/osheroff/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java>
pub struct JsonBinary<'a> {
    /// Read position over the encoded document; container offsets are resolved by seeking it.
    reader: Cursor<&'a [u8]>,
}
18
19impl JsonBinary<'_> {
20    pub fn parse_as_string(bytes: &[u8]) -> Result<String, BinlogError> {
21        /* check for mariaDB-format JSON strings inside columns marked JSON */
22        let is_json_string = bytes[0] > 0x0f;
23        if is_json_string {
24            return Ok(String::from_utf8_lossy(bytes).to_string());
25        }
26
27        let mut formatter = JsonStringFormatter::default();
28        Self::parse(bytes, &mut formatter)?;
29        Ok(formatter.get_string())
30    }
31
32    pub fn parse<F: JsonFormatter>(bytes: &[u8], formatter: &mut F) -> Result<(), BinlogError> {
33        let mut binary = JsonBinary {
34            reader: Cursor::new(bytes),
35        };
36        let value_type = binary.read_value_type()?;
37        binary.parse_internal(&value_type, formatter)
38    }
39
40    fn parse_internal<F: JsonFormatter>(
41        &mut self,
42        type_: &ValueType,
43        formatter: &mut F,
44    ) -> Result<(), BinlogError> {
45        match type_ {
46            ValueType::SmallDocument => self.parse_object(true, false, formatter),
47            ValueType::LargeDocument => self.parse_object(false, false, formatter),
48            ValueType::SmallArray => self.parse_object(true, true, formatter),
49            ValueType::LargeArray => self.parse_object(false, true, formatter),
50            ValueType::Literal => self.parse_literal(formatter),
51            ValueType::Int16 => self.parse_int16(formatter),
52            ValueType::Uint16 => self.parse_uint16(formatter),
53            ValueType::Int32 => self.parse_int32(formatter),
54            ValueType::Uint32 => self.parse_uint32(formatter),
55            ValueType::Int64 => self.parse_int64(formatter),
56            ValueType::Uint64 => self.parse_uint64(formatter),
57            ValueType::Double => self.parse_double(formatter),
58            ValueType::String => self.parse_string(formatter),
59            ValueType::Custom => self.parse_opaque(formatter),
60        }
61    }
62
63    fn parse_object<F: JsonFormatter>(
64        &mut self,
65        is_small: bool,
66        is_array: bool,
67        formatter: &mut F,
68    ) -> Result<(), BinlogError> {
69        let object_offset = self.reader.position();
70
71        // Read the header ...
72        let num_elements = self.read_unsigned_index(u32::MAX, is_small, "number of elements in")?;
73        let num_bytes = self.read_unsigned_index(u32::MAX, is_small, "size of")?;
74        let value_size = if is_small { 2 } else { 4 };
75
76        // Read each key-entry, consisting of the offset and length of each key ...
77        let mut keys = Vec::with_capacity(num_elements as usize);
78
79        if !is_array {
80            for _i in 0..num_elements {
81                keys.push(KeyEntry {
82                    index: self.read_unsigned_index(num_bytes, is_small, "key offset in")? as u64,
83                    length: self.read_uint16()? as usize,
84                    name: String::new(),
85                });
86            }
87        }
88
89        // Read each key value value-entry
90        let mut entries = Vec::with_capacity(num_elements as usize);
91        for _i in 0..num_elements as usize {
92            // Parse the value ...
93            let type_ = self.read_value_type()?;
94            let entry_value = match type_ {
95                ValueType::Literal => {
96                    let value = self.read_literal()?;
97                    self.reader.seek(SeekFrom::Current(value_size - 1))?;
98                    Some(DirectEntryValue::Literal(value))
99                }
100                ValueType::Int16 => {
101                    let value = Some(DirectEntryValue::Numeric(self.read_int16()? as i64));
102                    self.reader.seek(SeekFrom::Current(value_size - 2))?;
103                    value
104                }
105                ValueType::Uint16 => {
106                    let value = Some(DirectEntryValue::Numeric(self.read_uint16()? as i64));
107                    self.reader.seek(SeekFrom::Current(value_size - 2))?;
108                    value
109                }
110                ValueType::Int32 => {
111                    if !is_small {
112                        Some(DirectEntryValue::Numeric(self.read_int32()? as i64))
113                    } else {
114                        None
115                    }
116                }
117                ValueType::Uint32 => {
118                    if !is_small {
119                        Some(DirectEntryValue::Numeric(self.read_uint32()? as i64))
120                    } else {
121                        None
122                    }
123                }
124                _ => None,
125            };
126
127            if entry_value.is_some() {
128                entries.push(ValueEntry::new(type_).set_value(entry_value));
129            } else {
130                // It is an offset, not a value ...
131                let index = self.read_unsigned_index(num_bytes, is_small, "value offset in")?;
132                entries.push(ValueEntry::new_with_index(type_, index));
133            }
134        }
135
136        if !is_array {
137            // Read each key ...
138            for key in keys.iter_mut() {
139                let skip_bytes = key.index + object_offset - self.reader.position();
140                // Skip to a start of a field name if the current position does not point to it
141                // This can happen for MySQL 8
142                if skip_bytes != 0 {
143                    self.reader.seek(SeekFrom::Current(skip_bytes as i64))?;
144                }
145                key.name = self.read_as_string(key.length)?;
146            }
147        }
148
149        if is_array {
150            formatter.begin_array(num_elements)
151        } else {
152            formatter.begin_object(num_elements);
153        }
154
155        // Read and parse the values ...
156        for i in 0..num_elements as usize {
157            if i != 0 {
158                formatter.next_entry();
159            }
160
161            if !is_array {
162                formatter.name(&keys[i].name);
163            }
164
165            let entry = &entries[i];
166            if entry.resolved {
167                if let Some(entry_value) = &entry.value {
168                    match entry_value {
169                        DirectEntryValue::Literal(value) => {
170                            if let Some(bool_value) = value {
171                                formatter.value_bool(*bool_value);
172                            } else {
173                                formatter.value_null();
174                            }
175                        }
176                        DirectEntryValue::Numeric(value) => {
177                            formatter.value_long(*value);
178                        }
179                    }
180                } else {
181                    formatter.value_null();
182                }
183            } else {
184                // Parse the value ...
185                self.reader
186                    .seek(SeekFrom::Start(object_offset + entry.index as u64))?;
187                self.parse_internal(&entry.value_type, formatter)?;
188            }
189        }
190
191        if is_array {
192            formatter.end_array();
193        } else {
194            formatter.end_object();
195        }
196
197        Ok(())
198    }
199
200    pub fn parse_literal<F: JsonFormatter>(
201        &mut self,
202        formatter: &mut F,
203    ) -> Result<(), BinlogError> {
204        if let Some(value) = self.read_literal()? {
205            formatter.value_bool(value);
206        } else {
207            formatter.value_null();
208        }
209        Ok(())
210    }
211
212    fn parse_int16<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
213        let value = self.read_int16()?;
214        formatter.value_int(value as i32);
215        Ok(())
216    }
217
218    fn parse_uint16<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
219        let value = self.read_uint16()?;
220        formatter.value_int(value as i32);
221        Ok(())
222    }
223
224    fn parse_int32<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
225        let value = self.read_int32()?;
226        formatter.value_int(value);
227        Ok(())
228    }
229
230    fn parse_uint32<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
231        let value = self.read_uint32()?;
232        formatter.value_long(value as i64);
233        Ok(())
234    }
235
236    fn parse_int64<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
237        let value = self.read_int64()?;
238        formatter.value_long(value);
239        Ok(())
240    }
241
242    pub fn parse_uint64<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
243        let value = self.read_uint64()?;
244        formatter.value_big_int(value as i128);
245        Ok(())
246    }
247
248    pub fn parse_double(&mut self, formatter: &mut dyn JsonFormatter) -> Result<(), BinlogError> {
249        let raw_value = self.read_int64()? as u64;
250        let value = f64::from_bits(raw_value);
251        formatter.value_double(value);
252        Ok(())
253    }
254
255    fn parse_string<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
256        let length = self.read_var_int()?;
257        let mut bytes = vec![0; length as usize];
258        self.reader.read_exact(&mut bytes)?;
259        let value = bytes.to_utf8_string();
260        formatter.value_string(&value);
261        Ok(())
262    }
263
264    fn parse_date<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
265        let raw = self.read_int64()?;
266        let value = raw >> 24;
267        let year_month = (value >> 22) % (1 << 17);
268        let year = year_month / 13;
269        let month = year_month % 13;
270        let day = (value >> 17) % (1 << 5);
271        formatter.value_date(year as i32, month as i32, day as i32);
272        Ok(())
273    }
274
275    pub fn parse_time<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
276        let raw = self.read_int64()?;
277        let value = raw >> 24;
278        let negative = value < 0;
279        let hour = (value >> 12) % (1 << 10); // 10 bits starting at 12th
280        let min = (value >> 6) % (1 << 6); // 6 bits starting at 6th
281        let sec = value % (1 << 6); // 6 bits starting at 0th
282        let hour = if negative { -hour } else { hour };
283        let micro_seconds = (raw % (1 << 24)) as u32;
284        formatter.value_time(hour as i32, min as i32, sec as i32, micro_seconds as i32);
285        Ok(())
286    }
287
288    fn parse_datetime<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
289        let raw = self.read_int64()?;
290        let value = raw >> 24;
291        let year_month = ((value >> 22) % (1 << 17)) as i32; // 17 bits starting at 22nd
292        let year = year_month / 13;
293        let month = year_month % 13;
294        let day = ((value >> 17) % (1 << 5)) as i32; // 5 bits starting at 17th
295        let hour = ((value >> 12) % (1 << 5)) as i32; // 5 bits starting at 12th
296        let min = ((value >> 6) % (1 << 6)) as i32; // 6 bits starting at 6th
297        let sec = (value % (1 << 6)) as i32; // 6 bits starting at 0th
298        let micro_seconds = (raw % (1 << 24)) as i32;
299        formatter.value_datetime(year, month, day, hour, min, sec, micro_seconds);
300        Ok(())
301    }
302
303    fn parse_decimal<F: JsonFormatter>(
304        &mut self,
305        length: usize,
306        formatter: &mut F,
307    ) -> Result<(), BinlogError> {
308        // First two bytes are the precision and scale ...
309        let precision = self.reader.read_u8()? as usize;
310        let scale = self.reader.read_u8()? as usize;
311
312        // Followed by the binary representation
313        let mut buf = vec![0; length - 2];
314        self.reader.read_exact(&mut buf)?;
315        let mut cursor = Cursor::new(&buf);
316        let decimal = ColumnValue::parse_decimal(&mut cursor, precision, scale)?;
317
318        formatter.value_decimal(&decimal);
319        Ok(())
320    }
321
322    fn parse_opaque_value<F: JsonFormatter>(
323        &mut self,
324        type_: &ColumnType,
325        length: usize,
326        formatter: &mut F,
327    ) -> Result<(), BinlogError> {
328        let mut bytes = vec![0; length];
329        self.reader.read_exact(&mut bytes)?;
330        formatter.value_opaque(type_, &bytes);
331        Ok(())
332    }
333
334    pub fn parse_opaque<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
335        // Read the custom type, which should be a standard ColumnType ...
336        let custom_type = self.reader.read_u8()?;
337        let type_ = ColumnType::from_code(custom_type);
338        // Read the data length ...
339        let length = self.read_var_int()? as usize;
340
341        match type_ {
342            ColumnType::Decimal | ColumnType::NewDecimal => {
343                self.parse_decimal(length, formatter)?
344            }
345            ColumnType::Date => self.parse_date(formatter)?,
346            ColumnType::Time | ColumnType::Time2 => self.parse_time(formatter)?,
347            ColumnType::DateTime
348            | ColumnType::DateTime2
349            | ColumnType::TimeStamp
350            | ColumnType::TimeStamp2 => self.parse_datetime(formatter)?,
351            _ => self.parse_opaque_value(&type_, length, formatter)?,
352        }
353        Ok(())
354    }
355
356    fn read_unsigned_index(
357        &mut self,
358        max_value: u32,
359        is_small: bool,
360        desc: &str,
361    ) -> Result<u32, BinlogError> {
362        let result = if is_small {
363            self.read_uint16()? as u32
364        } else {
365            self.read_uint32()?
366        };
367
368        if result > max_value {
369            return Err(BinlogError::ParseJsonError(format!(
370                "{}, the JSON document is {} and is too big for the binary form of the document ({})",
371                desc,
372                result,
373                max_value
374            )));
375        }
376
377        Ok(result)
378    }
379
380    fn read_int16(&mut self) -> Result<i16, BinlogError> {
381        Ok(self.reader.read_i16::<LittleEndian>()?)
382    }
383
384    fn read_uint16(&mut self) -> Result<u16, BinlogError> {
385        Ok(self.reader.read_u16::<LittleEndian>()?)
386    }
387
388    fn read_int32(&mut self) -> Result<i32, BinlogError> {
389        Ok(self.reader.read_i32::<LittleEndian>()?)
390    }
391
392    fn read_uint32(&mut self) -> Result<u32, BinlogError> {
393        Ok(self.reader.read_u32::<LittleEndian>()?)
394    }
395
396    fn read_int64(&mut self) -> Result<i64, BinlogError> {
397        Ok(self.reader.read_i64::<LittleEndian>()?)
398    }
399
400    fn read_uint64(&mut self) -> Result<u64, BinlogError> {
401        Ok(self.reader.read_u64::<LittleEndian>()?)
402    }
403
404    fn read_as_string(&mut self, length: usize) -> Result<String, BinlogError> {
405        let mut bytes = vec![0; length];
406        self.reader.read_exact(&mut bytes)?;
407        Ok(bytes.to_utf8_string())
408    }
409
410    fn read_var_int(&mut self) -> Result<i32, BinlogError> {
411        let mut length: i32 = 0;
412        for i in 0..5 {
413            let b = self.reader.read_u8()? as i32;
414            length |= (b & 0x7F) << (7 * i);
415            if (b & 0x80) == 0 {
416                return Ok(length);
417            }
418        }
419
420        Err(BinlogError::ParseJsonError(
421            "Unexpected byte sequence".into(),
422        ))
423    }
424
425    fn read_literal(&mut self) -> Result<Option<bool>, BinlogError> {
426        let b = self.reader.read_u8()?;
427        match b {
428            0x00 => Ok(None),
429            0x01 => Ok(Some(true)),
430            0x02 => Ok(Some(false)),
431            _ => Err(BinlogError::ParseJsonError(format!(
432                "Unexpected value: '{}' for literal",
433                self.as_hex(b)
434            ))),
435        }
436    }
437
438    fn read_value_type(&mut self) -> Result<ValueType, BinlogError> {
439        let b = self.reader.read_u8()?;
440        if let Some(result) = ValueType::by_code(b) {
441            Ok(result)
442        } else {
443            Err(BinlogError::ParseJsonError(format!(
444                "Unknown value type code: '{}'",
445                self.as_hex(b)
446            )))
447        }
448    }
449
450    fn as_hex(&mut self, b: u8) -> String {
451        format!("{:02X} ", b)
452    }
453}
454
/// One entry of an object's key table: where the key bytes are stored and, once read, the
/// decoded key text.
#[derive(Default)]
struct KeyEntry {
    /// Offset of the key bytes, relative to the start of the enclosing object.
    pub index: u64,
    /// Key length in bytes.
    pub length: usize,
    /// Decoded key; left empty until the key bytes have been read.
    pub name: String,
}
461
462struct ValueEntry {
463    pub value_type: ValueType,
464    pub index: u32,
465    pub value: Option<DirectEntryValue>,
466    pub resolved: bool,
467}
468
/// Value stored directly inside a value entry rather than behind an offset.
enum DirectEntryValue {
    /// JSON literal: `None` is null, `Some(b)` is `true`/`false`.
    Literal(Option<bool>),
    /// Inline integer, widened to `i64`.
    Numeric(i64),
}
473
474impl ValueEntry {
475    fn new(value_type: ValueType) -> Self {
476        Self {
477            value_type,
478            index: 0,
479            value: None,
480            resolved: false,
481        }
482    }
483
484    fn new_with_index(value_type: ValueType, index: u32) -> Self {
485        Self {
486            value_type,
487            index,
488            value: None,
489            resolved: false,
490        }
491    }
492
493    fn set_value(mut self, value: Option<DirectEntryValue>) -> Self {
494        self.value = value;
495        self.resolved = true;
496        self
497    }
498}