1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use std::marker::PhantomData;

use pancake_db_idl::dml::FieldValue;
use pancake_db_idl::dml::field_value::Value;

use crate::errors::{CoreResult, CoreError};
use crate::primitives::{Atom, Primitive};
use super::{NULL_BYTE, ESCAPE_BYTE, COUNT_BYTE};

/// Converts field values and counts into their serialized byte form.
///
/// Implementors must be `Send + Sync` so an encoder can be shared across threads.
pub trait Encoder: Send + Sync {
  /// Encodes `values` in order and returns the resulting bytes.
  ///
  /// Returns an error if any value cannot be encoded.
  fn encode(&self, values: &[FieldValue]) -> CoreResult<Vec<u8>>;
  /// Encodes `count` into bytes; this conversion cannot fail.
  fn encode_count(&self, count: u32) -> Vec<u8>;
}

/// [`Encoder`] implementation for values whose innermost elements are
/// primitives of type `P`, possibly wrapped in nested lists.
#[derive(Clone, Debug)]
pub struct EncoderImpl<P: Primitive> {
  // Number of list levels to descend through before a value is converted
  // to a primitive `P`; 0 means the top-level value is itself the primitive.
  nested_list_depth: u8,
  // Zero-sized marker: `P` is only used at the type level, never stored.
  _phantom: PhantomData<P>,
}

/// Returns a copy of `bytes` in which every byte at or above `NULL_BYTE` is
/// replaced by a two-byte escape sequence.
///
/// Bytes below `NULL_BYTE` are copied through unchanged. Any other byte is
/// written as `ESCAPE_BYTE` followed by the bitwise complement of the byte.
fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
  bytes.iter().fold(Vec::new(), |mut escaped, &byte| {
    if byte < NULL_BYTE {
      escaped.push(byte);
    } else {
      // The count byte must never appear in escaped output, so that the end
      // of a file can be read without decoding everything before it. Hence
      // the escaped byte is stored as its complement, not as itself.
      escaped.push(ESCAPE_BYTE);
      escaped.push(!byte);
    }
    escaped
  })
}

impl<P: Primitive> Encoder for EncoderImpl<P> {
  /// Encodes each field value in order and concatenates the results.
  ///
  /// A field value with no inner value is written as a single `NULL_BYTE`;
  /// any other value is encoded via `value_bytes` starting at depth 0.
  /// The first encoding error aborts the whole call.
  fn encode(&self, fvs: &[FieldValue]) -> CoreResult<Vec<u8>> {
    let mut encoded = Vec::new();

    for field_value in fvs {
      if let Some(inner) = &field_value.value {
        encoded.extend(self.value_bytes(inner, 0)?);
      } else {
        encoded.push(NULL_BYTE);
      }
    }

    Ok(encoded)
  }

  /// Encodes `count` as `COUNT_BYTE` followed by the escaped big-endian
  /// bytes of the count.
  fn encode_count(&self, count: u32) -> Vec<u8> {
    let escaped_count = escape_bytes(&count.to_be_bytes());
    let mut encoded = Vec::with_capacity(1 + escaped_count.len());
    encoded.push(COUNT_BYTE);
    encoded.extend(escaped_count);
    encoded
  }
}

impl<P: Primitive> EncoderImpl<P> {
  /// Creates an encoder for values nested inside `escape_depth` levels of
  /// lists (0 means the values are bare primitives).
  pub fn new(escape_depth: u8) -> Self {
    Self {
      nested_list_depth: escape_depth,
      _phantom: PhantomData,
    }
  }

  /// Returns `len` as a 2-byte big-endian length prefix.
  ///
  /// Fails instead of truncating when `len` does not fit in a `u16`, since a
  /// wrapped-around prefix would make the encoded data undecodable.
  fn length_prefix(len: usize) -> CoreResult<[u8; 2]> {
    if len > u16::MAX as usize {
      return Err(CoreError::invalid(
        "length exceeds the maximum of 65535 supported by the 2-byte length prefix"
      ));
    }
    // Lossless: the range was checked above.
    Ok((len as u16).to_be_bytes())
  }

  /// Recursively encodes `v`, which sits `traverse_depth` list levels below
  /// the top-level field value.
  ///
  /// At `nested_list_depth` the value is converted to a primitive `P`:
  /// atomic primitives are written as their single atom's escaped bytes,
  /// others as an escaped 2-byte atom count followed by every atom's bytes.
  /// Above that depth the value must be a list, written as an escaped 2-byte
  /// element count followed by each element's encoding.
  ///
  /// # Errors
  ///
  /// Returns an error if the value cannot be converted to `P`, if a non-list
  /// value appears where a list is required, if a list contains a null
  /// element, or if a length does not fit in the 2-byte prefix.
  fn value_bytes(&self, v: &Value, traverse_depth: u8) -> CoreResult<Vec<u8>> {
    if traverse_depth == self.nested_list_depth {
      let atoms = P::try_from_value(v)?.to_atoms();
      if P::IS_ATOMIC {
        // NOTE(review): assumes an atomic primitive always yields exactly
        // one atom; confirm against the `Primitive` implementations.
        Ok(escape_bytes(&atoms[0].to_bytes()))
      } else {
        let mut res = Vec::with_capacity(2 + P::A::BYTE_SIZE * atoms.len());
        res.extend(Self::length_prefix(atoms.len())?);
        for atom in &atoms {
          res.extend(atom.to_bytes());
        }
        Ok(escape_bytes(&res))
      }
    } else {
      match v {
        Value::list_val(l) => {
          let mut res = escape_bytes(&Self::length_prefix(l.vals.len())?);
          for val in &l.vals {
            // A null element has no encoding at this level; report it
            // rather than panicking on caller-supplied data.
            let inner = val.value.as_ref().ok_or_else(|| {
              CoreError::invalid("null values are not supported inside lists")
            })?;
            res.extend(self.value_bytes(inner, traverse_depth + 1)?);
          }
          Ok(res)
        },
        _ => Err(CoreError::invalid("expected a list to traverse but found atomic type"))
      }
    }
  }
}