1use std::io::Cursor;
5use std::iter::FromIterator;
6
7use base64;
8use byteorder::{LittleEndian, ReadBytesExt};
9use serde_json::map::Map as JsonMap;
10use serde_json::Value as JsonValue;
11
12use crate::mysql_binlog::column_types::ColumnType;
13use crate::mysql_binlog::errors::JsonbParseError;
14use crate::mysql_binlog::packet_helpers;
15
/// Type indicator that prefixes every value in the binary JSON encoding.
///
/// The byte-to-variant mapping lives in [`FieldType::from_byte`]. The derives match the other
/// helper enums in this module so the indicator can be copied, compared and printed freely.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FieldType {
    /// Object whose header fields are 2 bytes wide.
    SmallObject,
    /// Object whose header fields are 4 bytes wide.
    LargeObject,
    /// Array whose header fields are 2 bytes wide.
    SmallArray,
    /// Array whose header fields are 4 bytes wide.
    LargeArray,
    /// `null`, `true` or `false`.
    Literal,
    /// Signed 16-bit integer.
    Int16,
    /// Unsigned 16-bit integer.
    Uint16,
    /// Signed 32-bit integer.
    Int32,
    /// Unsigned 32-bit integer.
    Uint32,
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 64-bit integer.
    Uint64,
    /// 64-bit floating point number.
    Double,
    /// Length-prefixed string.
    JsonString,
    /// Opaque value tagged with a MySQL column type byte.
    Custom,
}
32
33impl FieldType {
34 fn from_byte(u: u8) -> Result<Self, JsonbParseError> {
35 Ok(match u {
36 0x00 => FieldType::SmallObject,
37 0x01 => FieldType::LargeObject,
38 0x02 => FieldType::SmallArray,
39 0x03 => FieldType::LargeArray,
40 0x04 => FieldType::Literal,
41 0x05 => FieldType::Int16,
42 0x06 => FieldType::Uint16,
43 0x07 => FieldType::Int32,
44 0x08 => FieldType::Uint32,
45 0x09 => FieldType::Int64,
46 0x0a => FieldType::Uint64,
47 0x0b => FieldType::Double,
48 0x0c => FieldType::JsonString,
49 0x0f => FieldType::Custom,
50 i => return Err(JsonbParseError::InvalidTypeByte(i)),
51 })
52 }
53}
54
/// Width class of an object/array header.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CompoundSize {
    /// Counts, sizes and offsets are stored as 2-byte integers.
    Small,
    /// Counts, sizes and offsets are stored as 4-byte integers.
    Large,
}
60
/// Which kind of container a compound value is.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CompoundType {
    /// Key/value container; its header carries a key table in front of the value table.
    Object,
    /// Ordered list; its header carries a value table only.
    Array,
}
66
67pub fn parse(blob: Vec<u8>) -> Result<JsonValue, JsonbParseError> {
68 let mut cursor = Cursor::new(blob);
69 parse_any(&mut cursor)
70}
71
72#[derive(Debug)]
73enum OffsetOrInline {
74 Inline(JsonValue),
75 Offset(u32),
76}
77
78fn parse_maybe_inlined_value(
79 cursor: &mut Cursor<Vec<u8>>,
80 compound_size: CompoundSize,
81) -> Result<(u8, OffsetOrInline), JsonbParseError> {
82 let t = cursor.read_u8()?;
83 let inlined_value = match FieldType::from_byte(t) {
84 Ok(FieldType::Literal) => match cursor.read_u16::<LittleEndian>()? {
85 0x00 => JsonValue::Null,
86 0x01 => JsonValue::Bool(true),
87 0x02 => JsonValue::Bool(false),
88 i => return Err(JsonbParseError::InvalidLiteral(i).into()),
89 },
90 Ok(FieldType::Uint16) => JsonValue::from(cursor.read_u16::<LittleEndian>()?),
91 Ok(FieldType::Int16) => JsonValue::from(cursor.read_i16::<LittleEndian>()?),
92 Ok(FieldType::Uint32) => JsonValue::from(cursor.read_u32::<LittleEndian>()?),
93 Ok(FieldType::Int32) => JsonValue::from(cursor.read_i32::<LittleEndian>()?),
94 Ok(_) | Err(_) => {
95 return Ok((
96 t,
97 OffsetOrInline::Offset(match compound_size {
98 CompoundSize::Small => u32::from(cursor.read_u16::<LittleEndian>()?),
99 CompoundSize::Large => cursor.read_u32::<LittleEndian>()?,
100 }),
101 ));
102 }
103 };
104 Ok((t, OffsetOrInline::Inline(inlined_value)))
105}
106
107fn parse_compound(
108 mut cursor: &mut Cursor<Vec<u8>>,
109 compound_size: CompoundSize,
110 compound_type: CompoundType,
111) -> Result<JsonValue, JsonbParseError> {
112 let data_length = cursor.get_ref().len();
113 let offset_size = match compound_size {
114 CompoundSize::Small => 2,
115 CompoundSize::Large => 4,
116 };
117 if data_length < offset_size {
118 return Ok(JsonValue::Null);
119 }
120 let (count, size) = match compound_size {
121 CompoundSize::Small => (
122 u32::from(cursor.read_u16::<LittleEndian>()?) as usize,
123 u32::from(cursor.read_u16::<LittleEndian>()?) as usize,
124 ),
125 CompoundSize::Large => (
126 cursor.read_u32::<LittleEndian>()? as usize,
127 cursor.read_u32::<LittleEndian>()? as usize,
128 ),
129 };
130 if data_length < size as usize {
131 return Ok(JsonValue::Null);
132 }
133 let (key_entry_size, value_entry_size) = match compound_size {
134 CompoundSize::Small => (4, 3),
135 CompoundSize::Large => (6, 5),
136 };
137 let mut header_size = 2 * offset_size + count * value_entry_size;
138 header_size += match compound_type {
139 CompoundType::Array => 0,
140 CompoundType::Object => count * key_entry_size,
141 };
142 if header_size > size as usize {
143 return Ok(JsonValue::Null);
144 }
145 let keys = match compound_type {
146 CompoundType::Array => None,
147 CompoundType::Object => {
148 let mut rsl = vec![];
149 for i in 0..count {
150 let entry_offset = 2 * offset_size + key_entry_size * i;
151 cursor.set_position(entry_offset as u64 + 1);
152 let key_offset = match compound_size {
153 CompoundSize::Small => cursor.read_u16::<LittleEndian>()? as usize,
154 CompoundSize::Large => cursor.read_u32::<LittleEndian>()? as usize,
155 };
156 let key_length = cursor.read_u16::<LittleEndian>()? as usize;
157 if data_length < (key_offset) as usize + key_length {
158 return Ok(JsonValue::Null);
159 }
160 cursor.set_position(key_offset as u64 + 1);
161 let key = packet_helpers::read_nbytes(&mut cursor, key_length)?;
162 let key = String::from_utf8_lossy(&key).into_owned();
163 rsl.push(key);
164 }
165 Some(rsl)
166 }
167 };
168 let values = {
169 let mut rsl = vec![];
170 for i in 0..count {
171 let mut entry_offset = 2 * offset_size + value_entry_size * i;
172 entry_offset += match compound_type {
176 CompoundType::Array => 0,
177 CompoundType::Object => key_entry_size * count,
178 };
179 let tp_data = cursor.get_ref().get(entry_offset + 1).unwrap();
180 let tp_data_c = (*tp_data).clone();
181 let tp = FieldType::from_byte(*tp_data)?;
182 let is_inline = match tp {
183 FieldType::Uint16 | FieldType::Int16 | FieldType::Literal => true,
184 FieldType::Int32 | FieldType::Uint32 => !match compound_size {
185 CompoundSize::Small => true,
186 CompoundSize::Large => false,
187 },
188 _ => false,
189 };
190 if is_inline {
191 let data = &cursor.get_ref()[entry_offset + 1..entry_offset + value_entry_size + 1];
192 let mut cur = Cursor::new(data.to_vec());
193 let value = parse_any(&mut cur)?;
194 rsl.push(value);
195 continue;
196 }
197 cursor.set_position(entry_offset as u64 + 2);
198 let value_offset = match compound_size {
199 CompoundSize::Small => u32::from(cursor.read_u16::<LittleEndian>()?) as usize,
200 CompoundSize::Large => u32::from(cursor.read_u32::<LittleEndian>()?) as usize,
201 };
202 if data_length < value_offset {
203 return Ok(JsonValue::Null);
204 }
205 let mut data = vec![tp_data_c];
206 data.extend_from_slice(&cursor.get_ref()[value_offset + 1..data_length]);
207 let mut cur = Cursor::new(data);
208 let value = parse_any(&mut cur)?;
209 rsl.push(value);
210 }
211 rsl
212 };
213 Ok(if let Some(keys) = keys {
214 let map = JsonMap::from_iter(keys.into_iter().zip(values.into_iter()));
215 JsonValue::Object(map)
216 } else {
217 JsonValue::Array(values)
218 })
219}
220
221fn parse_any(cursor: &mut Cursor<Vec<u8>>) -> Result<JsonValue, JsonbParseError> {
222 let type_indicator = FieldType::from_byte(cursor.read_u8()?)?;
223 parse_any_with_type_indicator(cursor, type_indicator)
224}
225
226fn parse_any_with_type_indicator(
227 mut cursor: &mut Cursor<Vec<u8>>,
228 type_indicator: FieldType,
229) -> Result<JsonValue, JsonbParseError> {
230 match type_indicator {
231 FieldType::Literal => Ok(match cursor.read_u8()? {
232 0x00 => JsonValue::Null,
233 0x01 => JsonValue::Bool(true),
234 0x02 => JsonValue::Bool(false),
235 i => return Err(JsonbParseError::InvalidLiteral(u16::from(i)).into()),
236 }),
237 FieldType::Int16 => {
238 let val = cursor.read_i16::<LittleEndian>()?;
239 Ok(JsonValue::from(val))
240 }
241 FieldType::Uint16 => {
242 let val = cursor.read_u16::<LittleEndian>()?;
243 Ok(JsonValue::from(val))
244 }
245 FieldType::Int32 => {
246 let val = cursor.read_i32::<LittleEndian>()?;
247 Ok(JsonValue::from(val))
248 }
249 FieldType::Uint32 => {
250 let val = cursor.read_u32::<LittleEndian>()?;
251 Ok(JsonValue::from(val))
252 }
253 FieldType::Int64 => {
254 let val = cursor.read_i64::<LittleEndian>()?;
255 Ok(JsonValue::from(val))
256 }
257 FieldType::Uint64 => {
258 let val = cursor.read_u64::<LittleEndian>()?;
259 Ok(JsonValue::from(val))
260 }
261 FieldType::Double => {
262 let val = cursor.read_f64::<LittleEndian>()?;
263 Ok(JsonValue::from(val))
264 }
265 FieldType::JsonString => {
266 let val = packet_helpers::read_variable_length_string(&mut cursor)?;
267 Ok(JsonValue::from(val))
268 }
269 FieldType::SmallObject => {
270 parse_compound(&mut cursor, CompoundSize::Small, CompoundType::Object)
271 }
272 FieldType::LargeObject => {
273 parse_compound(&mut cursor, CompoundSize::Large, CompoundType::Object)
274 }
275 FieldType::SmallArray => {
276 parse_compound(&mut cursor, CompoundSize::Small, CompoundType::Array)
277 }
278 FieldType::LargeArray => {
279 parse_compound(&mut cursor, CompoundSize::Large, CompoundType::Array)
280 }
281 FieldType::Custom => {
282 let raw_mysql_column_type = cursor.read_u8()?;
291 let column_type = ColumnType::from_byte(raw_mysql_column_type);
292 let payload = packet_helpers::read_variable_length_bytes(&mut cursor)?;
293 match column_type {
294 ColumnType::NewDecimal(..)
295 | ColumnType::Date
296 | ColumnType::Time
297 | ColumnType::Timestamp
298 | ColumnType::DateTime
299 | ColumnType::DateTime2(..)
300 | ColumnType::Time2(..)
301 | ColumnType::Timestamp2(..) => {
302 let mut cursor = Cursor::new(payload);
303 let column_type = column_type.read_metadata(&mut cursor)?;
304 let value = column_type.read_value(&mut cursor)?;
305 Ok(value.as_value()?.into_owned())
306 }
307 _ => {
308 let serialized_payload = base64::encode(&payload);
309 let mut m = JsonMap::with_capacity(2);
310 m.insert(
311 "column_type".to_owned(),
312 JsonValue::from(raw_mysql_column_type),
313 );
314 m.insert(
315 "base64_payload".to_owned(),
316 JsonValue::from(serialized_payload),
317 );
318 Ok(JsonValue::from(m))
319 }
320 }
321 }
322 }
323}
324
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::parse;

    /// Parses `blob` and asserts that the decoded document equals `expected`.
    fn assert_parses_to(blob: Vec<u8>, expected: serde_json::Value) {
        let parsed = parse(blob).expect("should parse");
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_i16() {
        assert_parses_to(vec![5u8, 1, 0], json!(1));
    }

    #[test]
    fn test_string() {
        assert_parses_to(vec![12u8, 3, 102, 111, 111], json!("foo"));
    }

    #[test]
    fn test_nested() {
        let blob = vec![
            0u8, 1, 0, 46, 0, 11, 0, 1, 0, 2, 12, 0, 97, 4, 0, 34, 0, 5, 1, 0, 5, 2, 0, 12, 16, 0,
            0, 22, 0, 5, 116, 104, 114, 101, 101, 1, 0, 12, 0, 11, 0, 1, 0, 5, 4, 0, 52,
        ];
        assert_parses_to(blob, json!({"a":[1,2,"three",{"4":4}]}));
    }

    #[test]
    fn test_inline_null() {
        let blob = vec![0u8, 1, 0, 12, 0, 11, 0, 1, 0, 4, 0, 0, 97];
        assert_parses_to(blob, json!({ "a": null }));
    }

    #[test]
    fn test_inline_false() {
        let blob = vec![0, 1, 0, 12, 0, 11, 0, 1, 0, 4, 2, 0, 97];
        assert_parses_to(blob, json!({"a": false}));
    }

    #[test]
    fn test_array() {
        let blob = vec![
            2, 5, 0, 21, 0, 4, 1, 0, 4, 2, 0, 4, 0, 0, 5, 0, 0, 12, 19, 0, 1, 48,
        ];
        assert_parses_to(blob, json!([true, false, null, 0, "0"]));
    }

    #[test]
    fn test_opaque_decimal() {
        assert_parses_to(vec![15, 246, 3, 2, 2, 138], json!({"Decimal":"0.10"}));
    }

    #[test]
    fn test_opaque_times() {
        let blob = vec![
            0, 4, 0, 97, 0, 32, 0, 4, 0, 36, 0, 4, 0, 40, 0, 8, 0, 48, 0, 9, 0, 15, 57, 0, 15, 67,
            0, 15, 77, 0, 15, 87, 0, 100, 97, 116, 101, 116, 105, 109, 101, 100, 97, 116, 101, 116,
            105, 109, 101, 116, 105, 109, 101, 115, 116, 97, 109, 112, 10, 8, 0, 0, 0, 0, 0, 188,
            159, 25, 11, 8, 0, 0, 0, 64, 218, 0, 0, 0, 12, 8, 0, 0, 0, 64, 218, 188, 159, 25, 7, 8,
            0, 0, 0, 77, 218, 188, 159, 25,
        ];
        let expected = json!({
            "date": null,
            "datetime": {
                "DateTime": {
                    "day": 7,
                    "hour": 82,
                    "minute": 69,
                    "month": 78,
                    "second": 44,
                    "subsecond": 0,
                    "year": 184640201
                }
            },
            "time": {
                "Time": {
                    "hours": 0,
                    "minutes": 0,
                    "seconds": 0,
                    "subseconds": 0
                }
            },
            "timestamp": {
                "Timestamp": {
                    "subsecond": 0,
                    "unix_time": 1291845632
                }
            }
        });
        assert_parses_to(blob, expected);
    }
}