1use std::error;
16use std::fmt::Display;
17use std::io;
18use std::mem::size_of;
19
20use byteorder::BigEndian;
21use byteorder::ReadBytesExt;
22use byteorder::WriteBytesExt;
23
24use crate::records::Header;
25use crate::records::Record;
26use crate::IoResult;
27
/// Builds an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] carrying `message`
/// as its payload.
pub(crate) fn err_codec_message<E>(message: E) -> io::Error
where
    E: Into<Box<dyn error::Error + Send + Sync>>,
{
    // Convert up front so the payload type handed to `io::Error::new` is explicit.
    let payload: Box<dyn error::Error + Send + Sync> = message.into();
    io::Error::new(io::ErrorKind::InvalidData, payload)
}
34
35pub(crate) fn err_decode_message_unsupported(version: i16, schemata: &str) -> io::Error {
36 err_codec_message(format!("failed to read version {version} of {schemata}"))
37}
38
39pub(crate) fn err_encode_message_unsupported(version: i16, schemata: &str) -> io::Error {
40 err_codec_message(format!("failed to write version {version} of {schemata}"))
41}
42
43pub(crate) fn err_decode_message_null(field: impl Display) -> io::Error {
44 err_codec_message(format!("non-nullable field {field} was serialized as null"))
45}
46
47pub(crate) fn err_encode_message_null(field: impl Display) -> io::Error {
48 err_codec_message(format!(
49 "non-nullable field {field} to be serialized as null"
50 ))
51}
52
/// A codec that can read a value of type `T` from a byte source.
pub(crate) trait Decoder<T: Sized> {
    /// Reads one `T` from `buf`, returning an I/O error if reading or decoding fails.
    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<T>;
}
56
/// A codec that can write a value of type `T` to a byte sink and report its encoded size.
pub(crate) trait Encoder<T> {
    /// Writes `value` to `buf`.
    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: T) -> IoResult<()>;
    /// Returns the size, in bytes, of the encoded form of `value`.
    fn calculate_size(&self, value: T) -> usize;
}
61
/// Marker for codecs whose encoded size does not depend on the value.
pub(crate) trait FixedSizeEncoder {
    /// Encoded size in bytes.
    const SIZE: usize;
}
65
/// A type that can be read from a byte source for a given schema version.
pub trait Decodable: Sized {
    /// Reads a value from `buf`, interpreting the bytes according to `version`.
    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self>;
}
69
/// A type that can be written to a byte sink for a given schema version.
pub trait Encodable: Sized {
    /// Writes `self` to `buf` using the layout of `version`.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()>;
    /// Returns the size, in bytes, of `self` when encoded for `version`.
    fn calculate_size(&self, version: i16) -> usize;
}
74
// Generates a unit-struct codec `$name` for the fixed-width primitive `$ty`.
//
// `$write` / `$read` name the `byteorder` methods used to write and read the value.
// The optional trailing `$endian` identifier is passed to those methods as a type
// argument; when it is omitted the methods are called without one.
//
// The generated type implements `Decoder<$ty>`, `Encoder<$ty>`, `Encoder<&$ty>` and
// `FixedSizeEncoder`, with `SIZE` set to `size_of::<$ty>()`.
macro_rules! define_ints_codec {
    ($name:ident, $ty:ty, $write:ident, $read:ident $(,)? $($endian:ident)?) => {
        #[derive(Debug, Copy, Clone)]
        pub(crate) struct $name;

        impl Decoder<$ty> for $name {
            fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<$ty> {
                buf.$read$(::<$endian>)?()
            }
        }

        impl Encoder<$ty> for $name {
            fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: $ty) -> IoResult<()> {
                buf.$write$(::<$endian>)?(value)
            }

            #[inline]
            fn calculate_size(&self, _: $ty) -> usize {
                Self::SIZE
            }
        }

        // Convenience impl so callers holding a reference do not have to dereference.
        impl Encoder<&$ty> for $name {
            fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &$ty) -> IoResult<()> {
                self.encode(buf, *value)
            }

            #[inline]
            fn calculate_size(&self, _: &$ty) -> usize {
                Self::SIZE
            }
        }

        impl FixedSizeEncoder for $name {
            const SIZE: usize = size_of::<$ty>();
        }
    };
}
113
// Fixed-width numeric codecs. Single-byte types take no byte order; every multi-byte
// type (including the two float codecs) is read and written as big-endian.
define_ints_codec!(Int8, i8, write_i8, read_i8);
define_ints_codec!(Int16, i16, write_i16, read_i16, BigEndian);
define_ints_codec!(Int32, i32, write_i32, read_i32, BigEndian);
define_ints_codec!(Int64, i64, write_i64, read_i64, BigEndian);
define_ints_codec!(UInt8, u8, write_u8, read_u8);
define_ints_codec!(UInt16, u16, write_u16, read_u16, BigEndian);
define_ints_codec!(UInt32, u32, write_u32, read_u32, BigEndian);
define_ints_codec!(UInt64, u64, write_u64, read_u64, BigEndian);
define_ints_codec!(Float32, f32, write_f32, read_f32, BigEndian);
define_ints_codec!(Float64, f64, write_f64, read_f64, BigEndian);
124
125#[derive(Debug, Copy, Clone)]
126pub(crate) struct Bool;
127
128impl Decoder<bool> for Bool {
129 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<bool> {
130 Ok(buf.read_u8()? != 0)
131 }
132}
133
134impl Encoder<bool> for Bool {
135 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: bool) -> IoResult<()> {
136 buf.write_u8(if value { 1 } else { 0 })
137 }
138
139 #[inline]
140 fn calculate_size(&self, _: bool) -> usize {
141 Self::SIZE
142 }
143}
144
145impl FixedSizeEncoder for Bool {
146 const SIZE: usize = size_of::<bool>();
147}
148
149#[derive(Debug, Copy, Clone)]
150pub(crate) struct Uuid;
151
152impl Decoder<uuid::Uuid> for Uuid {
153 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<uuid::Uuid> {
154 read_uuid(buf)
155 }
156}
157
158impl Encoder<uuid::Uuid> for Uuid {
159 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: uuid::Uuid) -> IoResult<()> {
160 write_uuid(buf, value)
161 }
162
163 fn calculate_size(&self, _: uuid::Uuid) -> usize {
164 Self::SIZE
165 }
166}
167
168impl FixedSizeEncoder for Uuid {
169 const SIZE: usize = 16;
170}
171
172#[derive(Debug, Copy, Clone)]
173pub(crate) struct VarInt;
174
175impl Decoder<i32> for VarInt {
176 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<i32> {
177 read_unsigned_varint(buf)
178 }
179}
180
181impl Encoder<i32> for VarInt {
182 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: i32) -> IoResult<()> {
183 write_unsigned_varint(buf, value)
184 }
185
186 fn calculate_size(&self, value: i32) -> usize {
187 let mut res = 1;
188 let mut v = value;
189 while v >= 0x80 {
190 res += 1;
191 v >>= 7;
192 }
193 debug_assert!(res <= 5);
194 res
195 }
196}
197
198#[derive(Debug, Copy, Clone)]
199pub(crate) struct VarLong;
200
201impl Decoder<i64> for VarLong {
202 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<i64> {
203 read_unsigned_varlong(buf)
204 }
205}
206
207impl Encoder<i64> for VarLong {
208 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: i64) -> IoResult<()> {
209 write_unsigned_varlong(buf, value)
210 }
211
212 fn calculate_size(&self, value: i64) -> usize {
213 let mut res = 1;
214 let mut v = value;
215 while v >= 0x80 {
216 res += 1;
217 v >>= 7;
218 }
219 debug_assert!(res <= 10);
220 res
221 }
222}
223
224#[derive(Debug, Copy, Clone)]
225pub(crate) struct NullableString(pub bool );
226
227impl Decoder<Option<String>> for NullableString {
228 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Option<String>> {
229 let len = if self.0 {
230 VarInt.decode(buf)? - 1
231 } else {
232 Int16.decode(buf)? as i32
233 };
234 Ok(read_bytes(buf, len)?.map(|bs| String::from_utf8_lossy(&bs).into_owned()))
235 }
236}
237
238impl Encoder<Option<&str>> for NullableString {
239 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: Option<&str>) -> IoResult<()> {
240 write_str(buf, value, self.0)
241 }
242
243 fn calculate_size(&self, value: Option<&str>) -> usize {
244 let bytes = value.map(|s| s.as_bytes());
245 let len = bytes.map(|bs| bs.len()).unwrap_or(0);
246 if self.0 {
247 VarInt.calculate_size(len as i32 + 1) + len
248 } else {
249 Int16::SIZE + len
250 }
251 }
252}
253
254impl Encoder<&str> for NullableString {
255 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &str) -> IoResult<()> {
256 self.encode(buf, Some(value))
257 }
258
259 fn calculate_size(&self, value: &str) -> usize {
260 self.calculate_size(Some(value))
261 }
262}
263
264#[derive(Debug, Copy, Clone)]
265pub(crate) struct NullableBytes(pub bool );
266
267impl Decoder<Option<Vec<u8>>> for NullableBytes {
268 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Option<Vec<u8>>> {
269 let len = if self.0 {
270 VarInt.decode(buf)? - 1
271 } else {
272 Int32.decode(buf)?
273 };
274 read_bytes(buf, len)
275 }
276}
277
278impl<T: AsRef<[u8]>> Encoder<Option<&T>> for NullableBytes {
279 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: Option<&T>) -> IoResult<()> {
280 write_bytes(buf, value.map(|s| s.as_ref()), self.0)
281 }
282
283 fn calculate_size(&self, value: Option<&T>) -> usize {
284 let bytes = value.map(|s| s.as_ref());
285 let len = bytes.map(|bs| bs.len()).unwrap_or(0);
286 if self.0 {
287 VarInt.calculate_size(len as i32 + 1) + len
288 } else {
289 Int32::SIZE + len
290 }
291 }
292}
293
294impl<T: AsRef<[u8]>> Encoder<&T> for NullableBytes {
295 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &T) -> IoResult<()> {
296 self.encode(buf, Some(value))
297 }
298
299 fn calculate_size(&self, value: &T) -> usize {
300 self.calculate_size(Some(value))
301 }
302}
303
304#[derive(Debug, Copy, Clone)]
305pub(crate) struct Struct(pub i16 );
306
307impl<T: Decodable> Decoder<T> for Struct {
308 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<T> {
309 T::read(buf, self.0)
310 }
311}
312
313impl<T: Encodable> Encoder<&T> for Struct {
314 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &T) -> IoResult<()> {
315 value.write(buf, self.0)
316 }
317
318 fn calculate_size(&self, value: &T) -> usize {
319 value.calculate_size(self.0)
320 }
321}
322
323#[derive(Debug, Copy, Clone)]
324pub(crate) struct NullableArray<E>(pub E, pub bool );
325
326impl<T, E: Decoder<T>> Decoder<Option<Vec<T>>> for NullableArray<E> {
327 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Option<Vec<T>>> {
328 let len = if self.1 {
329 VarInt.decode(buf)? - 1
330 } else {
331 Int32.decode(buf)?
332 };
333 match len {
334 -1 => Ok(None),
335 n if n >= 0 => {
336 let n = n as usize;
337 let mut result = Vec::with_capacity(n);
338 for _ in 0..n {
339 result.push(self.0.decode(buf)?);
340 }
341 Ok(Some(result))
342 }
343 n => Err(err_codec_message(format!("invalid length: {n}"))),
344 }
345 }
346}
347
348impl<T, E: for<'a> Encoder<&'a T>> Encoder<Option<&[T]>> for NullableArray<E> {
349 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: Option<&[T]>) -> IoResult<()> {
350 match value {
351 None => {
352 if self.1 {
353 VarInt.encode(buf, 0)
354 } else {
355 Int32.encode(buf, -1)
356 }
357 }
358 Some(s) => self.encode(buf, s),
359 }
360 }
361
362 fn calculate_size(&self, value: Option<&[T]>) -> usize {
363 match value {
364 None => {
365 if self.1 {
366 VarInt.calculate_size(0)
367 } else {
368 Int32::SIZE
369 }
370 }
371 Some(ns) => {
372 let mut res = 0;
373 res += if self.1 {
374 VarInt.calculate_size(ns.len() as i32 + 1)
375 } else {
376 Int32::SIZE
377 };
378 for n in ns {
379 res += self.0.calculate_size(n);
380 }
381 res
382 }
383 }
384 }
385}
386
387impl<T, E: for<'a> Encoder<&'a T>> Encoder<&[T]> for NullableArray<E> {
388 fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &[T]) -> IoResult<()> {
389 if self.1 {
390 VarInt.encode(buf, value.len() as i32 + 1)?;
391 } else {
392 Int32.encode(buf, value.len() as i32)?;
393 }
394 for v in value {
395 self.0.encode(buf, v)?;
396 }
397 Ok(())
398 }
399
400 fn calculate_size(&self, value: &[T]) -> usize {
401 self.calculate_size(Some(value))
402 }
403}
404
405#[derive(Debug, Copy, Clone)]
406pub(crate) struct RawTaggedFieldList;
407
408impl Decoder<Vec<RawTaggedField>> for RawTaggedFieldList {
409 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Vec<RawTaggedField>> {
410 RawTaggedFieldList.decode_with(buf, |_, _, _| Ok(false))
411 }
412}
413
414impl Encoder<&[RawTaggedField]> for RawTaggedFieldList {
415 fn encode<B: WriteBytesExt>(&self, buf: &mut B, fields: &[RawTaggedField]) -> IoResult<()> {
416 self.encode_with(buf, 0, fields, |_| Ok(()))
417 }
418
419 fn calculate_size(&self, fields: &[RawTaggedField]) -> usize {
420 self.calculate_size_with(0, 0, fields)
421 }
422}
423
424impl RawTaggedFieldList {
425 pub(crate) fn decode_with<B: ReadBytesExt, F>(
426 &self,
427 buf: &mut B,
428 mut f: F,
429 ) -> IoResult<Vec<RawTaggedField>>
430 where
431 F: FnMut(&mut B, i32, usize) -> IoResult<bool>,
432 {
433 let n = VarInt.decode(buf)?;
434 let mut res = vec![];
435 for _ in 0..n {
436 let tag = VarInt.decode(buf)?;
437 let size = VarInt.decode(buf)? as usize;
438 let consumed = f(buf, tag, size)?;
439 if !consumed {
440 match read_bytes(buf, size as i32)? {
441 None => return Err(err_codec_message("unexpected null data")),
442 Some(data) => res.push(RawTaggedField { tag, data }),
443 }
444 }
445 }
446 Ok(res)
447 }
448
449 pub(crate) fn encode_with<B: WriteBytesExt, F>(
450 &self,
451 buf: &mut B,
452 n: usize, fields: &[RawTaggedField],
454 mut f: F,
455 ) -> IoResult<()>
456 where
457 F: FnMut(&mut B) -> IoResult<()>,
458 {
459 VarInt.encode(buf, (fields.len() + n) as i32)?;
460 f(buf)?;
461 for field in fields {
462 RawTaggedFieldWriter.write_byte_buffer(buf, field.tag, &field.data)?;
463 }
464 Ok(())
465 }
466
467 pub(crate) fn calculate_size_with(
468 &self,
469 n: usize, bs: usize, fields: &[RawTaggedField],
472 ) -> usize {
473 let mut res = 0;
474 res += VarInt.calculate_size((fields.len() + n) as i32);
475 for field in fields {
476 res += VarInt.calculate_size(field.tag);
477 res += VarInt.calculate_size(field.data.len() as i32);
478 res += field.data.len();
479 }
480 res + bs
481 }
482}
483
/// A tagged field kept as its tag plus undecoded payload bytes.
#[derive(Debug, Default, Clone)]
pub struct RawTaggedField {
    /// Tag number identifying the field.
    pub tag: i32,
    /// Raw payload bytes of the field.
    pub data: Vec<u8>,
}
489
490#[derive(Debug, Copy, Clone)]
491pub(crate) struct RawTaggedFieldWriter;
492
493impl RawTaggedFieldWriter {
494 pub(crate) fn write_field<
495 B: WriteBytesExt,
496 T: Copy, E: Encoder<T>,
498 >(
499 &self,
500 buf: &mut B,
501 tag: i32,
502 encoder: E,
503 value: T,
504 ) -> IoResult<()> {
505 VarInt.encode(buf, tag)?;
506 VarInt.encode(buf, encoder.calculate_size(value) as i32)?;
507 encoder.encode(buf, value)?;
508 Ok(())
509 }
510
511 pub(crate) fn calculate_field_size<T, E: Encoder<T>>(
512 &self,
513 tag: i32,
514 encoder: E,
515 value: T,
516 ) -> usize {
517 let size = encoder.calculate_size(value);
518 let mut res = 0;
519 res += VarInt.calculate_size(tag);
520 res += VarInt.calculate_size(size as i32);
521 res + size
522 }
523
524 fn write_byte_buffer<B: WriteBytesExt>(
525 &self,
526 buf: &mut B,
527 tag: i32,
528 bs: &[u8],
529 ) -> IoResult<()> {
530 VarInt.encode(buf, tag)?;
531 VarInt.encode(buf, bs.len() as i32)?;
532 buf.write_all(bs)?;
533 Ok(())
534 }
535}
536
537fn read_uuid<B: ReadBytesExt>(buf: &mut B) -> IoResult<uuid::Uuid> {
538 let msb = buf.read_u64::<BigEndian>()?;
539 let lsb = buf.read_u64::<BigEndian>()?;
540 Ok(uuid::Uuid::from_u64_pair(msb, lsb))
541}
542
543fn write_uuid<B: WriteBytesExt>(buf: &mut B, n: uuid::Uuid) -> IoResult<()> {
544 buf.write_all(n.as_ref())
545}
546
547fn read_unsigned_varint<B: ReadBytesExt>(buf: &mut B) -> IoResult<i32> {
548 let mut res = 0;
549 for i in 0.. {
550 debug_assert!(i < 5); let next = buf.read_u8()? as i32;
552 res |= (next & 0x7F) << (i * 7);
553 if next < 0x80 {
554 break;
555 }
556 }
557 Ok(res)
558}
559
560fn read_unsigned_varlong<B: ReadBytesExt>(buf: &mut B) -> IoResult<i64> {
561 let mut res = 0;
562 for i in 0.. {
563 debug_assert!(i < 10); let next = buf.read_u8()? as i64;
565 res |= (next & 0x7F) << (i * 7);
566 if next < 0x80 {
567 break;
568 }
569 }
570 Ok(res)
571}
572
/// Undoes zigzag encoding on a 32-bit value: `0, 1, 2, 3, …` map to `0, -1, 1, -2, …`.
fn varint_zigzag(i: i32) -> i32 {
    // Logical shift drops the sign-carrying low bit without smearing the top bit.
    let magnitude = ((i as u32) >> 1) as i32;
    // All ones when the low bit is set (negative result), all zeros otherwise.
    let sign_mask = -(i & 1);
    magnitude ^ sign_mask
}
576
/// Undoes zigzag encoding on a 64-bit value: `0, 1, 2, 3, …` map to `0, -1, 1, -2, …`.
fn varlong_zigzag(i: i64) -> i64 {
    // Logical shift drops the sign-carrying low bit without smearing the top bit.
    let magnitude = ((i as u64) >> 1) as i64;
    // All ones when the low bit is set (negative result), all zeros otherwise.
    let sign_mask = -(i & 1);
    magnitude ^ sign_mask
}
580
581fn read_varint<B: ReadBytesExt>(buf: &mut B) -> IoResult<i32> {
582 read_unsigned_varint(buf).map(varint_zigzag)
583}
584
585fn read_varlong<B: ReadBytesExt>(buf: &mut B) -> IoResult<i64> {
586 read_unsigned_varlong(buf).map(varlong_zigzag)
587}
588
589fn write_unsigned_varint<B: WriteBytesExt>(buf: &mut B, n: i32) -> IoResult<()> {
590 let mut v = n;
591 while v >= 0x80 {
592 buf.write_u8((v as u8) | 0x80)?;
593 v >>= 7;
594 }
595 buf.write_u8(v as u8)
596}
597
598fn write_unsigned_varlong<B: WriteBytesExt>(buf: &mut B, n: i64) -> IoResult<()> {
599 let mut v = n;
600 while v >= 0x80 {
601 buf.write_u8((v as u8) | 0x80)?;
602 v >>= 7;
603 }
604 buf.write_u8(v as u8)
605}
606
607fn read_bytes<B: ReadBytesExt>(buf: &mut B, len: i32) -> IoResult<Option<Vec<u8>>> {
608 match len {
609 -1 => Ok(None),
610 n if n >= 0 => {
611 let n = n as usize;
612 let mut v = vec![0; n];
613 buf.read_exact(&mut v).map_err(|e| {
614 io::Error::new(
615 io::ErrorKind::InvalidData,
616 format!("failed to read {n} bytes: {e}"),
617 )
618 })?;
619 Ok(Some(v))
620 }
621 n => Err(io::Error::new(
622 io::ErrorKind::InvalidData,
623 format!("invalid length: {n}"),
624 )),
625 }
626}
627
628fn write_str<B: WriteBytesExt>(buf: &mut B, str: Option<&str>, flexible: bool) -> IoResult<()> {
629 match str {
630 None => {
631 if flexible {
632 VarInt.encode(buf, 0)?
633 } else {
634 Int16.encode(buf, -1)?
635 }
636 }
637 Some(bs) => {
638 let bs = bs.as_bytes();
639 let len = bs.len();
640 if flexible {
641 VarInt.encode(buf, len as i32 + 1)?;
642 } else {
643 Int16.encode(buf, len as i16)?;
644 }
645 buf.write_all(bs)?;
646 }
647 }
648 Ok(())
649}
650
651fn write_bytes<B: WriteBytesExt>(
652 buf: &mut B,
653 bytes: Option<&[u8]>,
654 flexible: bool,
655) -> IoResult<()> {
656 match bytes {
657 None => {
658 if flexible {
659 VarInt.encode(buf, 0)?
660 } else {
661 Int32.encode(buf, -1)?
662 }
663 }
664 Some(bs) => {
665 let len = bs.len() as i32;
666 if flexible {
667 VarInt.encode(buf, len + 1)?;
668 } else {
669 Int32.encode(buf, len)?;
670 }
671 buf.write_all(bs)?;
672 }
673 }
674 Ok(())
675}
676
677#[derive(Debug, Copy, Clone)]
678pub(crate) struct RecordList;
679
680impl Decoder<Vec<Record>> for RecordList {
681 fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Vec<Record>> {
682 struct Entry(i32, Option<Vec<u8>>);
683 fn read_key_value<B: ReadBytesExt>(buf: &mut B) -> IoResult<(Entry, Entry)> {
684 let key = {
685 let len = read_varint(buf)?;
686 let payload = read_bytes(buf, len)?;
687 Entry(len, payload)
688 };
689
690 let value = {
691 let len = read_varint(buf)?;
692 let payload = read_bytes(buf, len)?;
693 Entry(len, payload)
694 };
695
696 Ok((key, value))
697 }
698
699 let cnt = Int32.decode(buf)?;
700 let mut records = vec![];
701 for _ in 0..cnt {
702 let mut record = Record {
703 len: read_varint(buf)?,
704 attributes: Int8.decode(buf)?,
705 timestamp_delta: read_varlong(buf)?,
706 offset_delta: read_varint(buf)?,
707 ..Default::default()
708 };
709
710 let (key, value) = read_key_value(buf)?;
711 record.key_len = key.0;
712 record.key = key.1;
713 record.value_len = value.0;
714 record.value = value.1;
715
716 let headers_cnt = read_varint(buf)?;
717 for _ in 0..headers_cnt {
718 let (key, value) = read_key_value(buf)?;
719 record.headers.push(Header {
720 key_len: key.0,
721 key: key.1,
722 value_len: value.0,
723 value: value.1,
724 });
725 }
726 records.push(record);
727 }
728 Ok(records)
729 }
730}