mysqlbinlog_network/mysql_binlog/
jsonb.rs

/// MySQL uses a bizarro custom encoding that they call JSONB (no relation to the PostgreSQL column
/// type) for JSON values. No, I don't know why they didn't just use BSON or CBOR. I think they
/// might just hate me.
4use std::io::Cursor;
5use std::iter::FromIterator;
6
7use base64;
8use byteorder::{LittleEndian, ReadBytesExt};
9use serde_json::map::Map as JsonMap;
10use serde_json::Value as JsonValue;
11
12use crate::mysql_binlog::column_types::ColumnType;
13use crate::mysql_binlog::errors::JsonbParseError;
14use crate::mysql_binlog::packet_helpers;
15
/// The kind of value announced by a JSONB type byte.
///
/// The byte-to-variant mapping lives in `FieldType::from_byte`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FieldType {
    /// Object in the small storage format (type byte `0x00`).
    SmallObject,
    /// Object in the large storage format (type byte `0x01`).
    LargeObject,
    /// Array in the small storage format (type byte `0x02`).
    SmallArray,
    /// Array in the large storage format (type byte `0x03`).
    LargeArray,
    /// One of the literals `null`, `true` or `false` (type byte `0x04`).
    Literal,
    /// Signed 16-bit integer (type byte `0x05`).
    Int16,
    /// Unsigned 16-bit integer (type byte `0x06`).
    Uint16,
    /// Signed 32-bit integer (type byte `0x07`).
    Int32,
    /// Unsigned 32-bit integer (type byte `0x08`).
    Uint32,
    /// Signed 64-bit integer (type byte `0x09`).
    Int64,
    /// Unsigned 64-bit integer (type byte `0x0a`).
    Uint64,
    /// 64-bit floating point number (type byte `0x0b`).
    Double,
    /// Length-prefixed string (type byte `0x0c`).
    JsonString,
    /// An arbitrary MySQL column value embedded in the document (type byte `0x0f`).
    Custom,
}
32
33impl FieldType {
34    fn from_byte(u: u8) -> Result<Self, JsonbParseError> {
35        Ok(match u {
36            0x00 => FieldType::SmallObject,
37            0x01 => FieldType::LargeObject,
38            0x02 => FieldType::SmallArray,
39            0x03 => FieldType::LargeArray,
40            0x04 => FieldType::Literal,
41            0x05 => FieldType::Int16,
42            0x06 => FieldType::Uint16,
43            0x07 => FieldType::Int32,
44            0x08 => FieldType::Uint32,
45            0x09 => FieldType::Int64,
46            0x0a => FieldType::Uint64,
47            0x0b => FieldType::Double,
48            0x0c => FieldType::JsonString,
49            0x0f => FieldType::Custom,
50            i => return Err(JsonbParseError::InvalidTypeByte(i)),
51        })
52    }
53}
54
/// Storage format of an object or array; it decides how wide counts, sizes and offsets are.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CompoundSize {
    /// Counts, sizes and offsets are stored in two bytes.
    Small,
    /// Counts, sizes and offsets are stored in four bytes.
    Large,
}
60
/// Which kind of container a compound value is.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CompoundType {
    /// Keyed container; its header holds key entries in addition to value entries.
    Object,
    /// Ordered container; its header holds value entries only.
    Array,
}
66
67pub fn parse(blob: Vec<u8>) -> Result<JsonValue, JsonbParseError> {
68    let mut cursor = Cursor::new(blob);
69    parse_any(&mut cursor)
70}
71
72#[derive(Debug)]
73enum OffsetOrInline {
74    Inline(JsonValue),
75    Offset(u32),
76}
77
78fn parse_maybe_inlined_value(
79    cursor: &mut Cursor<Vec<u8>>,
80    compound_size: CompoundSize,
81) -> Result<(u8, OffsetOrInline), JsonbParseError> {
82    let t = cursor.read_u8()?;
83    let inlined_value = match FieldType::from_byte(t) {
84        Ok(FieldType::Literal) => match cursor.read_u16::<LittleEndian>()? {
85            0x00 => JsonValue::Null,
86            0x01 => JsonValue::Bool(true),
87            0x02 => JsonValue::Bool(false),
88            i => return Err(JsonbParseError::InvalidLiteral(i).into()),
89        },
90        Ok(FieldType::Uint16) => JsonValue::from(cursor.read_u16::<LittleEndian>()?),
91        Ok(FieldType::Int16) => JsonValue::from(cursor.read_i16::<LittleEndian>()?),
92        Ok(FieldType::Uint32) => JsonValue::from(cursor.read_u32::<LittleEndian>()?),
93        Ok(FieldType::Int32) => JsonValue::from(cursor.read_i32::<LittleEndian>()?),
94        Ok(_) | Err(_) => {
95            return Ok((
96                t,
97                OffsetOrInline::Offset(match compound_size {
98                    CompoundSize::Small => u32::from(cursor.read_u16::<LittleEndian>()?),
99                    CompoundSize::Large => cursor.read_u32::<LittleEndian>()?,
100                }),
101            ));
102        }
103    };
104    Ok((t, OffsetOrInline::Inline(inlined_value)))
105}
106
107fn parse_compound(
108    mut cursor: &mut Cursor<Vec<u8>>,
109    compound_size: CompoundSize,
110    compound_type: CompoundType,
111) -> Result<JsonValue, JsonbParseError> {
112    let data_length = cursor.get_ref().len();
113    let offset_size = match compound_size {
114        CompoundSize::Small => 2,
115        CompoundSize::Large => 4,
116    };
117    if data_length < offset_size {
118        return Ok(JsonValue::Null);
119    }
120    let (count, size) = match compound_size {
121        CompoundSize::Small => (
122            u32::from(cursor.read_u16::<LittleEndian>()?) as usize,
123            u32::from(cursor.read_u16::<LittleEndian>()?) as usize,
124        ),
125        CompoundSize::Large => (
126            cursor.read_u32::<LittleEndian>()? as usize,
127            cursor.read_u32::<LittleEndian>()? as usize,
128        ),
129    };
130    if data_length < size as usize {
131        return Ok(JsonValue::Null);
132    }
133    let (key_entry_size, value_entry_size) = match compound_size {
134        CompoundSize::Small => (4, 3),
135        CompoundSize::Large => (6, 5),
136    };
137    let mut header_size = 2 * offset_size + count * value_entry_size;
138    header_size += match compound_type {
139        CompoundType::Array => 0,
140        CompoundType::Object => count * key_entry_size,
141    };
142    if header_size > size as usize {
143        return Ok(JsonValue::Null);
144    }
145    let keys = match compound_type {
146        CompoundType::Array => None,
147        CompoundType::Object => {
148            let mut rsl = vec![];
149            for i in 0..count {
150                let entry_offset = 2 * offset_size + key_entry_size * i;
151                cursor.set_position(entry_offset as u64 + 1);
152                let key_offset = match compound_size {
153                    CompoundSize::Small => cursor.read_u16::<LittleEndian>()? as usize,
154                    CompoundSize::Large => cursor.read_u32::<LittleEndian>()? as usize,
155                };
156                let key_length = cursor.read_u16::<LittleEndian>()? as usize;
157                if data_length < (key_offset) as usize + key_length {
158                    return Ok(JsonValue::Null);
159                }
160                cursor.set_position(key_offset as u64 + 1);
161                let key = packet_helpers::read_nbytes(&mut cursor, key_length)?;
162                let key = String::from_utf8_lossy(&key).into_owned();
163                rsl.push(key);
164            }
165            Some(rsl)
166        }
167    };
168    let values = {
169        let mut rsl = vec![];
170        for i in 0..count {
171            let mut entry_offset = 2 * offset_size + value_entry_size * i;
172            // if isObject {
173            //     entryOffset += keyEntrySize * count
174            // }
175            entry_offset += match compound_type {
176                CompoundType::Array => 0,
177                CompoundType::Object => key_entry_size * count,
178            };
179            let tp_data = cursor.get_ref().get(entry_offset + 1).unwrap();
180            let tp_data_c = (*tp_data).clone();
181            let tp = FieldType::from_byte(*tp_data)?;
182            let is_inline = match tp {
183                FieldType::Uint16 | FieldType::Int16 | FieldType::Literal => true,
184                FieldType::Int32 | FieldType::Uint32 => !match compound_size {
185                    CompoundSize::Small => true,
186                    CompoundSize::Large => false,
187                },
188                _ => false,
189            };
190            if is_inline {
191                let data = &cursor.get_ref()[entry_offset + 1..entry_offset + value_entry_size + 1];
192                let mut cur = Cursor::new(data.to_vec());
193                let value = parse_any(&mut cur)?;
194                rsl.push(value);
195                continue;
196            }
197            cursor.set_position(entry_offset as u64 + 2);
198            let value_offset = match compound_size {
199                CompoundSize::Small => u32::from(cursor.read_u16::<LittleEndian>()?) as usize,
200                CompoundSize::Large => u32::from(cursor.read_u32::<LittleEndian>()?) as usize,
201            };
202            if data_length < value_offset {
203                return Ok(JsonValue::Null);
204            }
205            let mut data = vec![tp_data_c];
206            data.extend_from_slice(&cursor.get_ref()[value_offset + 1..data_length]);
207            let mut cur = Cursor::new(data);
208            let value = parse_any(&mut cur)?;
209            rsl.push(value);
210        }
211        rsl
212    };
213    Ok(if let Some(keys) = keys {
214        let map = JsonMap::from_iter(keys.into_iter().zip(values.into_iter()));
215        JsonValue::Object(map)
216    } else {
217        JsonValue::Array(values)
218    })
219}
220
221fn parse_any(cursor: &mut Cursor<Vec<u8>>) -> Result<JsonValue, JsonbParseError> {
222    let type_indicator = FieldType::from_byte(cursor.read_u8()?)?;
223    parse_any_with_type_indicator(cursor, type_indicator)
224}
225
226fn parse_any_with_type_indicator(
227    mut cursor: &mut Cursor<Vec<u8>>,
228    type_indicator: FieldType,
229) -> Result<JsonValue, JsonbParseError> {
230    match type_indicator {
231        FieldType::Literal => Ok(match cursor.read_u8()? {
232            0x00 => JsonValue::Null,
233            0x01 => JsonValue::Bool(true),
234            0x02 => JsonValue::Bool(false),
235            i => return Err(JsonbParseError::InvalidLiteral(u16::from(i)).into()),
236        }),
237        FieldType::Int16 => {
238            let val = cursor.read_i16::<LittleEndian>()?;
239            Ok(JsonValue::from(val))
240        }
241        FieldType::Uint16 => {
242            let val = cursor.read_u16::<LittleEndian>()?;
243            Ok(JsonValue::from(val))
244        }
245        FieldType::Int32 => {
246            let val = cursor.read_i32::<LittleEndian>()?;
247            Ok(JsonValue::from(val))
248        }
249        FieldType::Uint32 => {
250            let val = cursor.read_u32::<LittleEndian>()?;
251            Ok(JsonValue::from(val))
252        }
253        FieldType::Int64 => {
254            let val = cursor.read_i64::<LittleEndian>()?;
255            Ok(JsonValue::from(val))
256        }
257        FieldType::Uint64 => {
258            let val = cursor.read_u64::<LittleEndian>()?;
259            Ok(JsonValue::from(val))
260        }
261        FieldType::Double => {
262            let val = cursor.read_f64::<LittleEndian>()?;
263            Ok(JsonValue::from(val))
264        }
265        FieldType::JsonString => {
266            let val = packet_helpers::read_variable_length_string(&mut cursor)?;
267            Ok(JsonValue::from(val))
268        }
269        FieldType::SmallObject => {
270            parse_compound(&mut cursor, CompoundSize::Small, CompoundType::Object)
271        }
272        FieldType::LargeObject => {
273            parse_compound(&mut cursor, CompoundSize::Large, CompoundType::Object)
274        }
275        FieldType::SmallArray => {
276            parse_compound(&mut cursor, CompoundSize::Small, CompoundType::Array)
277        }
278        FieldType::LargeArray => {
279            parse_compound(&mut cursor, CompoundSize::Large, CompoundType::Array)
280        }
281        FieldType::Custom => {
282            /* augh apparently MySQL has this "neat" feature where it can encode any MySQL type
283             * inside JSON.
284             *
285             * easiest way to trigger it is with INSERT...SELECT
286             *
287             * it looks like it's only implemented for DECIMAL and the various time types as of
288             * MySQL 8.0
289             */
290            let raw_mysql_column_type = cursor.read_u8()?;
291            let column_type = ColumnType::from_byte(raw_mysql_column_type);
292            let payload = packet_helpers::read_variable_length_bytes(&mut cursor)?;
293            match column_type {
294                ColumnType::NewDecimal(..)
295                | ColumnType::Date
296                | ColumnType::Time
297                | ColumnType::Timestamp
298                | ColumnType::DateTime
299                | ColumnType::DateTime2(..)
300                | ColumnType::Time2(..)
301                | ColumnType::Timestamp2(..) => {
302                    let mut cursor = Cursor::new(payload);
303                    let column_type = column_type.read_metadata(&mut cursor)?;
304                    let value = column_type.read_value(&mut cursor)?;
305                    Ok(value.as_value()?.into_owned())
306                }
307                _ => {
308                    let serialized_payload = base64::encode(&payload);
309                    let mut m = JsonMap::with_capacity(2);
310                    m.insert(
311                        "column_type".to_owned(),
312                        JsonValue::from(raw_mysql_column_type),
313                    );
314                    m.insert(
315                        "base64_payload".to_owned(),
316                        JsonValue::from(serialized_payload),
317                    );
318                    Ok(JsonValue::from(m))
319                }
320            }
321        }
322    }
323}
324
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::parse;

    /// Parses `blob` and checks that it decodes to `expected`.
    fn assert_parses_to(blob: Vec<u8>, expected: serde_json::Value) {
        let parsed = parse(blob).expect("should parse");
        assert_eq!(parsed, expected);
    }

    /// A bare signed 16-bit integer.
    #[test]
    pub fn test_i16() {
        assert_parses_to(vec![5, 1, 0], json!(1));
    }

    /// A bare length-prefixed string.
    #[test]
    pub fn test_string() {
        assert_parses_to(vec![12, 3, 102, 111, 111], json!("foo"));
    }

    /// An object containing an array that in turn contains scalars, a string and an object.
    #[test]
    pub fn test_nested() {
        assert_parses_to(
            vec![
                0, 1, 0, 46, 0, 11, 0, 1, 0, 2, 12, 0, 97, 4, 0, 34, 0, 5, 1, 0, 5, 2, 0, 12, 16,
                0, 0, 22, 0, 5, 116, 104, 114, 101, 101, 1, 0, 12, 0, 11, 0, 1, 0, 5, 4, 0, 52,
            ],
            json!({"a":[1,2,"three",{"4":4}]}),
        );
    }

    /// A `null` literal stored inline in an object's value entry.
    #[test]
    pub fn test_inline_null() {
        assert_parses_to(
            vec![0, 1, 0, 12, 0, 11, 0, 1, 0, 4, 0, 0, 97],
            json!({ "a": null }),
        );
    }

    /// A `false` literal stored inline in an object's value entry.
    #[test]
    pub fn test_inline_false() {
        assert_parses_to(
            vec![0, 1, 0, 12, 0, 11, 0, 1, 0, 4, 2, 0, 97],
            json!({"a": false}),
        );
    }

    /// An array mixing inline literals, an inline integer and an out-of-line string.
    #[test]
    pub fn test_array() {
        assert_parses_to(
            vec![
                2, 5, 0, 21, 0, 4, 1, 0, 4, 2, 0, 4, 0, 0, 5, 0, 0, 12, 19, 0, 1, 48,
            ],
            json!([true, false, null, 0, "0"]),
        );
    }

    /// A custom (opaque) value wrapping a DECIMAL column.
    #[test]
    pub fn test_opaque_decimal() {
        assert_parses_to(vec![15, 246, 3, 2, 2, 138], json!({"Decimal":"0.10"}));
    }

    /// An object whose values are custom (opaque) date and time columns.
    #[test]
    pub fn test_opaque_times() {
        assert_parses_to(
            vec![
                0, 4, 0, 97, 0, 32, 0, 4, 0, 36, 0, 4, 0, 40, 0, 8, 0, 48, 0, 9, 0, 15, 57, 0, 15,
                67, 0, 15, 77, 0, 15, 87, 0, 100, 97, 116, 101, 116, 105, 109, 101, 100, 97, 116,
                101, 116, 105, 109, 101, 116, 105, 109, 101, 115, 116, 97, 109, 112, 10, 8, 0, 0,
                0, 0, 0, 188, 159, 25, 11, 8, 0, 0, 0, 64, 218, 0, 0, 0, 12, 8, 0, 0, 0, 64, 218,
                188, 159, 25, 7, 8, 0, 0, 0, 77, 218, 188, 159, 25,
            ],
            json!({
                "date": null,
                "datetime": {
                    "DateTime": {
                        "day": 7,
                        "hour": 82,
                        "minute": 69,
                        "month": 78,
                        "second": 44,
                        "subsecond": 0,
                        "year": 184640201
                    }
                },
                "time": {
                    "Time": {"hours": 0, "minutes": 0, "seconds": 0, "subseconds": 0}
                },
                "timestamp": {
                    "Timestamp": {"subsecond": 0, "unix_time": 1291845632}
                }
            }),
        );
    }
}