1use crate::{
2 binlog_error::BinlogError,
3 column::{column_type::ColumnType, column_value::ColumnValue},
4 ext::buf_ext::BufExt,
5};
6use byteorder::{LittleEndian, ReadBytesExt};
7use std::io::{Cursor, Read, Seek, SeekFrom};
8
9use super::{
10 json_formatter::JsonFormatter, json_string_formatter::JsonStringFormatter,
11 value_type::ValueType,
12};
13
/// Parser over a borrowed byte slice that holds a binary-encoded JSON value.
///
/// Instances are created internally by [`JsonBinary::parse`]; the struct only
/// tracks the read position while the value is walked.
pub struct JsonBinary<'a> {
    /// Cursor over the input bytes; its position is the current read offset.
    reader: Cursor<&'a [u8]>,
}
18
19impl JsonBinary<'_> {
20 pub fn parse_as_string(bytes: &[u8]) -> Result<String, BinlogError> {
21 let is_json_string = bytes[0] > 0x0f;
23 if is_json_string {
24 return Ok(String::from_utf8_lossy(bytes).to_string());
25 }
26
27 let mut formatter = JsonStringFormatter::default();
28 Self::parse(bytes, &mut formatter)?;
29 Ok(formatter.get_string())
30 }
31
32 pub fn parse<F: JsonFormatter>(bytes: &[u8], formatter: &mut F) -> Result<(), BinlogError> {
33 let mut binary = JsonBinary {
34 reader: Cursor::new(bytes),
35 };
36 let value_type = binary.read_value_type()?;
37 binary.parse_internal(&value_type, formatter)
38 }
39
40 fn parse_internal<F: JsonFormatter>(
41 &mut self,
42 type_: &ValueType,
43 formatter: &mut F,
44 ) -> Result<(), BinlogError> {
45 match type_ {
46 ValueType::SmallDocument => self.parse_object(true, false, formatter),
47 ValueType::LargeDocument => self.parse_object(false, false, formatter),
48 ValueType::SmallArray => self.parse_object(true, true, formatter),
49 ValueType::LargeArray => self.parse_object(false, true, formatter),
50 ValueType::Literal => self.parse_literal(formatter),
51 ValueType::Int16 => self.parse_int16(formatter),
52 ValueType::Uint16 => self.parse_uint16(formatter),
53 ValueType::Int32 => self.parse_int32(formatter),
54 ValueType::Uint32 => self.parse_uint32(formatter),
55 ValueType::Int64 => self.parse_int64(formatter),
56 ValueType::Uint64 => self.parse_uint64(formatter),
57 ValueType::Double => self.parse_double(formatter),
58 ValueType::String => self.parse_string(formatter),
59 ValueType::Custom => self.parse_opaque(formatter),
60 }
61 }
62
63 fn parse_object<F: JsonFormatter>(
64 &mut self,
65 is_small: bool,
66 is_array: bool,
67 formatter: &mut F,
68 ) -> Result<(), BinlogError> {
69 let object_offset = self.reader.position();
70
71 let num_elements = self.read_unsigned_index(u32::MAX, is_small, "number of elements in")?;
73 let num_bytes = self.read_unsigned_index(u32::MAX, is_small, "size of")?;
74 let value_size = if is_small { 2 } else { 4 };
75
76 let mut keys = Vec::with_capacity(num_elements as usize);
78
79 if !is_array {
80 for _i in 0..num_elements {
81 keys.push(KeyEntry {
82 index: self.read_unsigned_index(num_bytes, is_small, "key offset in")? as u64,
83 length: self.read_uint16()? as usize,
84 name: String::new(),
85 });
86 }
87 }
88
89 let mut entries = Vec::with_capacity(num_elements as usize);
91 for _i in 0..num_elements as usize {
92 let type_ = self.read_value_type()?;
94 let entry_value = match type_ {
95 ValueType::Literal => {
96 let value = self.read_literal()?;
97 self.reader.seek(SeekFrom::Current(value_size - 1))?;
98 Some(DirectEntryValue::Literal(value))
99 }
100 ValueType::Int16 => {
101 let value = Some(DirectEntryValue::Numeric(self.read_int16()? as i64));
102 self.reader.seek(SeekFrom::Current(value_size - 2))?;
103 value
104 }
105 ValueType::Uint16 => {
106 let value = Some(DirectEntryValue::Numeric(self.read_uint16()? as i64));
107 self.reader.seek(SeekFrom::Current(value_size - 2))?;
108 value
109 }
110 ValueType::Int32 => {
111 if !is_small {
112 Some(DirectEntryValue::Numeric(self.read_int32()? as i64))
113 } else {
114 None
115 }
116 }
117 ValueType::Uint32 => {
118 if !is_small {
119 Some(DirectEntryValue::Numeric(self.read_uint32()? as i64))
120 } else {
121 None
122 }
123 }
124 _ => None,
125 };
126
127 if entry_value.is_some() {
128 entries.push(ValueEntry::new(type_).set_value(entry_value));
129 } else {
130 let index = self.read_unsigned_index(num_bytes, is_small, "value offset in")?;
132 entries.push(ValueEntry::new_with_index(type_, index));
133 }
134 }
135
136 if !is_array {
137 for key in keys.iter_mut() {
139 let skip_bytes = key.index + object_offset - self.reader.position();
140 if skip_bytes != 0 {
143 self.reader.seek(SeekFrom::Current(skip_bytes as i64))?;
144 }
145 key.name = self.read_as_string(key.length)?;
146 }
147 }
148
149 if is_array {
150 formatter.begin_array(num_elements)
151 } else {
152 formatter.begin_object(num_elements);
153 }
154
155 for i in 0..num_elements as usize {
157 if i != 0 {
158 formatter.next_entry();
159 }
160
161 if !is_array {
162 formatter.name(&keys[i].name);
163 }
164
165 let entry = &entries[i];
166 if entry.resolved {
167 if let Some(entry_value) = &entry.value {
168 match entry_value {
169 DirectEntryValue::Literal(value) => {
170 if let Some(bool_value) = value {
171 formatter.value_bool(*bool_value);
172 } else {
173 formatter.value_null();
174 }
175 }
176 DirectEntryValue::Numeric(value) => {
177 formatter.value_long(*value);
178 }
179 }
180 } else {
181 formatter.value_null();
182 }
183 } else {
184 self.reader
186 .seek(SeekFrom::Start(object_offset + entry.index as u64))?;
187 self.parse_internal(&entry.value_type, formatter)?;
188 }
189 }
190
191 if is_array {
192 formatter.end_array();
193 } else {
194 formatter.end_object();
195 }
196
197 Ok(())
198 }
199
200 pub fn parse_literal<F: JsonFormatter>(
201 &mut self,
202 formatter: &mut F,
203 ) -> Result<(), BinlogError> {
204 if let Some(value) = self.read_literal()? {
205 formatter.value_bool(value);
206 } else {
207 formatter.value_null();
208 }
209 Ok(())
210 }
211
212 fn parse_int16<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
213 let value = self.read_int16()?;
214 formatter.value_int(value as i32);
215 Ok(())
216 }
217
218 fn parse_uint16<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
219 let value = self.read_uint16()?;
220 formatter.value_int(value as i32);
221 Ok(())
222 }
223
224 fn parse_int32<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
225 let value = self.read_int32()?;
226 formatter.value_int(value);
227 Ok(())
228 }
229
230 fn parse_uint32<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
231 let value = self.read_uint32()?;
232 formatter.value_long(value as i64);
233 Ok(())
234 }
235
236 fn parse_int64<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
237 let value = self.read_int64()?;
238 formatter.value_long(value);
239 Ok(())
240 }
241
242 pub fn parse_uint64<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
243 let value = self.read_uint64()?;
244 formatter.value_big_int(value as i128);
245 Ok(())
246 }
247
248 pub fn parse_double(&mut self, formatter: &mut dyn JsonFormatter) -> Result<(), BinlogError> {
249 let raw_value = self.read_int64()? as u64;
250 let value = f64::from_bits(raw_value);
251 formatter.value_double(value);
252 Ok(())
253 }
254
255 fn parse_string<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
256 let length = self.read_var_int()?;
257 let mut bytes = vec![0; length as usize];
258 self.reader.read_exact(&mut bytes)?;
259 let value = bytes.to_utf8_string();
260 formatter.value_string(&value);
261 Ok(())
262 }
263
264 fn parse_date<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
265 let raw = self.read_int64()?;
266 let value = raw >> 24;
267 let year_month = (value >> 22) % (1 << 17);
268 let year = year_month / 13;
269 let month = year_month % 13;
270 let day = (value >> 17) % (1 << 5);
271 formatter.value_date(year as i32, month as i32, day as i32);
272 Ok(())
273 }
274
275 pub fn parse_time<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
276 let raw = self.read_int64()?;
277 let value = raw >> 24;
278 let negative = value < 0;
279 let hour = (value >> 12) % (1 << 10); let min = (value >> 6) % (1 << 6); let sec = value % (1 << 6); let hour = if negative { -hour } else { hour };
283 let micro_seconds = (raw % (1 << 24)) as u32;
284 formatter.value_time(hour as i32, min as i32, sec as i32, micro_seconds as i32);
285 Ok(())
286 }
287
288 fn parse_datetime<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
289 let raw = self.read_int64()?;
290 let value = raw >> 24;
291 let year_month = ((value >> 22) % (1 << 17)) as i32; let year = year_month / 13;
293 let month = year_month % 13;
294 let day = ((value >> 17) % (1 << 5)) as i32; let hour = ((value >> 12) % (1 << 5)) as i32; let min = ((value >> 6) % (1 << 6)) as i32; let sec = (value % (1 << 6)) as i32; let micro_seconds = (raw % (1 << 24)) as i32;
299 formatter.value_datetime(year, month, day, hour, min, sec, micro_seconds);
300 Ok(())
301 }
302
303 fn parse_decimal<F: JsonFormatter>(
304 &mut self,
305 length: usize,
306 formatter: &mut F,
307 ) -> Result<(), BinlogError> {
308 let precision = self.reader.read_u8()? as usize;
310 let scale = self.reader.read_u8()? as usize;
311
312 let mut buf = vec![0; length - 2];
314 self.reader.read_exact(&mut buf)?;
315 let mut cursor = Cursor::new(&buf);
316 let decimal = ColumnValue::parse_decimal(&mut cursor, precision, scale)?;
317
318 formatter.value_decimal(&decimal);
319 Ok(())
320 }
321
322 fn parse_opaque_value<F: JsonFormatter>(
323 &mut self,
324 type_: &ColumnType,
325 length: usize,
326 formatter: &mut F,
327 ) -> Result<(), BinlogError> {
328 let mut bytes = vec![0; length];
329 self.reader.read_exact(&mut bytes)?;
330 formatter.value_opaque(type_, &bytes);
331 Ok(())
332 }
333
334 pub fn parse_opaque<F: JsonFormatter>(&mut self, formatter: &mut F) -> Result<(), BinlogError> {
335 let custom_type = self.reader.read_u8()?;
337 let type_ = ColumnType::from_code(custom_type);
338 let length = self.read_var_int()? as usize;
340
341 match type_ {
342 ColumnType::Decimal | ColumnType::NewDecimal => {
343 self.parse_decimal(length, formatter)?
344 }
345 ColumnType::Date => self.parse_date(formatter)?,
346 ColumnType::Time | ColumnType::Time2 => self.parse_time(formatter)?,
347 ColumnType::DateTime
348 | ColumnType::DateTime2
349 | ColumnType::TimeStamp
350 | ColumnType::TimeStamp2 => self.parse_datetime(formatter)?,
351 _ => self.parse_opaque_value(&type_, length, formatter)?,
352 }
353 Ok(())
354 }
355
356 fn read_unsigned_index(
357 &mut self,
358 max_value: u32,
359 is_small: bool,
360 desc: &str,
361 ) -> Result<u32, BinlogError> {
362 let result = if is_small {
363 self.read_uint16()? as u32
364 } else {
365 self.read_uint32()?
366 };
367
368 if result > max_value {
369 return Err(BinlogError::ParseJsonError(format!(
370 "{}, the JSON document is {} and is too big for the binary form of the document ({})",
371 desc,
372 result,
373 max_value
374 )));
375 }
376
377 Ok(result)
378 }
379
380 fn read_int16(&mut self) -> Result<i16, BinlogError> {
381 Ok(self.reader.read_i16::<LittleEndian>()?)
382 }
383
384 fn read_uint16(&mut self) -> Result<u16, BinlogError> {
385 Ok(self.reader.read_u16::<LittleEndian>()?)
386 }
387
388 fn read_int32(&mut self) -> Result<i32, BinlogError> {
389 Ok(self.reader.read_i32::<LittleEndian>()?)
390 }
391
392 fn read_uint32(&mut self) -> Result<u32, BinlogError> {
393 Ok(self.reader.read_u32::<LittleEndian>()?)
394 }
395
396 fn read_int64(&mut self) -> Result<i64, BinlogError> {
397 Ok(self.reader.read_i64::<LittleEndian>()?)
398 }
399
400 fn read_uint64(&mut self) -> Result<u64, BinlogError> {
401 Ok(self.reader.read_u64::<LittleEndian>()?)
402 }
403
404 fn read_as_string(&mut self, length: usize) -> Result<String, BinlogError> {
405 let mut bytes = vec![0; length];
406 self.reader.read_exact(&mut bytes)?;
407 Ok(bytes.to_utf8_string())
408 }
409
410 fn read_var_int(&mut self) -> Result<i32, BinlogError> {
411 let mut length: i32 = 0;
412 for i in 0..5 {
413 let b = self.reader.read_u8()? as i32;
414 length |= (b & 0x7F) << (7 * i);
415 if (b & 0x80) == 0 {
416 return Ok(length);
417 }
418 }
419
420 Err(BinlogError::ParseJsonError(
421 "Unexpected byte sequence".into(),
422 ))
423 }
424
425 fn read_literal(&mut self) -> Result<Option<bool>, BinlogError> {
426 let b = self.reader.read_u8()?;
427 match b {
428 0x00 => Ok(None),
429 0x01 => Ok(Some(true)),
430 0x02 => Ok(Some(false)),
431 _ => Err(BinlogError::ParseJsonError(format!(
432 "Unexpected value: '{}' for literal",
433 self.as_hex(b)
434 ))),
435 }
436 }
437
438 fn read_value_type(&mut self) -> Result<ValueType, BinlogError> {
439 let b = self.reader.read_u8()?;
440 if let Some(result) = ValueType::by_code(b) {
441 Ok(result)
442 } else {
443 Err(BinlogError::ParseJsonError(format!(
444 "Unknown value type code: '{}'",
445 self.as_hex(b)
446 )))
447 }
448 }
449
450 fn as_hex(&mut self, b: u8) -> String {
451 format!("{:02X} ", b)
452 }
453}
454
/// Header entry describing one key of a JSON object.
#[derive(Default)]
struct KeyEntry {
    /// Offset of the key name, relative to the start of the enclosing object.
    pub index: u64,
    /// Length of the key name in bytes.
    pub length: usize,
    /// Decoded key name; empty until the name has been read from the input.
    pub name: String,
}
461
/// Header entry describing one value of a JSON object or array.
struct ValueEntry {
    /// Type of the value, as read from the entry's type byte.
    pub value_type: ValueType,
    /// Offset of the value relative to the start of the enclosing container;
    /// only meaningful when the value is not inlined (`resolved == false`).
    pub index: u32,
    /// The inlined value, when the entry stores one directly.
    pub value: Option<DirectEntryValue>,
    /// `true` once `set_value` has been called, i.e. the value was inlined
    /// and does not need to be fetched through `index`.
    pub resolved: bool,
}
468
/// A value stored inline in a value entry instead of behind an offset.
enum DirectEntryValue {
    /// A literal: `Some(bool)` for true/false, `None` for null.
    Literal(Option<bool>),
    /// An integer value, widened to `i64`.
    Numeric(i64),
}
473
474impl ValueEntry {
475 fn new(value_type: ValueType) -> Self {
476 Self {
477 value_type,
478 index: 0,
479 value: None,
480 resolved: false,
481 }
482 }
483
484 fn new_with_index(value_type: ValueType, index: u32) -> Self {
485 Self {
486 value_type,
487 index,
488 value: None,
489 resolved: false,
490 }
491 }
492
493 fn set_value(mut self, value: Option<DirectEntryValue>) -> Self {
494 self.value = value;
495 self.resolved = true;
496 self
497 }
498}