pancake_db_core/encoding/
decoder.rs

1use std::marker::PhantomData;
2
3use pancake_db_idl::dml::{FieldValue, RepeatedFieldValue};
4use pancake_db_idl::dml::field_value::Value;
5
6use crate::encoding::byte_reader::ByteReader;
7use crate::errors::{CoreError, CoreResult};
8use crate::primitives::{Atom, Primitive};
9use crate::utils;
10use super::{NULL_BYTE, COUNT_BYTE};
11use crate::rep_levels::RepLevelsAndAtoms;
12
13pub trait Decodable<P: Primitive>: Send + Sync {
14  fn handle_atoms(
15    atoms: Vec<P::A>,
16    depth: u8,
17    byte_idx: usize,
18  ) -> CoreResult<Self> where Self: Sized;
19  fn handle_null(byte_idx: usize) -> Self where Self: Sized;
20  fn combine(
21    outputs: Vec<Self>,
22    depth: u8,
23    byte_idx: usize
24  ) -> Self where Self: Sized;
25}
26
27impl<P: Primitive> Decodable<P> for FieldValue {
28  fn handle_atoms(atoms: Vec<P::A>, _: u8, _: usize) -> CoreResult<FieldValue> {
29    let value = P::try_from_atoms(&atoms)?.to_value();
30    Ok(FieldValue {
31      value: Some(value),
32    })
33  }
34
35  fn handle_null(_: usize) -> FieldValue {
36    FieldValue::default()
37  }
38
39  fn combine(outputs: Vec<FieldValue>, _: u8, _: usize) -> FieldValue {
40    let repeated = RepeatedFieldValue {
41      vals: outputs,
42    };
43    FieldValue {
44      value: Some(Value::ListVal(repeated)),
45    }
46  }
47}
48
49impl<P: Primitive> Decodable<P> for RepLevelsAndAtoms<P::A> {
50  fn handle_atoms(
51    atoms: Vec<P::A>,
52    depth: u8,
53    _: usize
54  ) -> CoreResult<RepLevelsAndAtoms<P::A>> {
55    let levels = if P::IS_ATOMIC {
56      vec![depth + 1]
57    } else {
58      let mut res = vec![depth + 2; atoms.len()];
59      res.push(depth + 1);
60      res
61    };
62    Ok(RepLevelsAndAtoms {
63      levels,
64      atoms,
65    })
66  }
67
68  fn handle_null(_: usize) -> RepLevelsAndAtoms<P::A> {
69    RepLevelsAndAtoms {
70      levels: vec![0],
71      atoms: vec![],
72    }
73  }
74
75  fn combine(
76    outputs: Vec<RepLevelsAndAtoms<P::A>>,
77    depth: u8,
78    _: usize,
79  ) -> RepLevelsAndAtoms<P::A> {
80    let mut res = RepLevelsAndAtoms::<P::A>::default();
81    for output in &outputs {
82      res.extend(output);
83    }
84    res.levels.push(depth + 1);
85    res
86  }
87}
88
89// used in server for "seeking" to a specific element
90pub type ByteIdx = usize;
91
92impl<P: Primitive> Decodable<P> for ByteIdx {
93  fn handle_atoms(_: Vec<P::A>, _: u8, byte_idx: usize) -> CoreResult<Self> where Self: Sized {
94    Ok(byte_idx)
95  }
96
97  fn handle_null(byte_idx: usize) -> Self where Self: Sized {
98    byte_idx
99  }
100
101  fn combine(_: Vec<Self>, _: u8, byte_idx: usize) -> Self where Self: Sized {
102    byte_idx
103  }
104}
105
106pub trait Decoder<Output> {
107  fn decode_limited(&self, bytes: &[u8], limit: usize) -> CoreResult<Vec<Output>>;
108  fn decode(&self, bytes: &[u8]) -> CoreResult<Vec<Output>> {
109    self.decode_limited(bytes, usize::MAX)
110  }
111}
112
113#[derive(Clone, Debug)]
114pub struct DecoderImpl<P: Primitive, H> where H: Decodable<P> {
115  nested_list_depth: u8,
116  _phantom_p: PhantomData<P>,
117  _phantom_h: PhantomData<H>,
118}
119
120impl<P: Primitive, H> Decoder<H> for DecoderImpl<P, H> where H: Decodable<P> {
121  fn decode_limited(
122    &self,
123    bytes: &[u8],
124    limit: usize
125  ) -> CoreResult<Vec<H>> {
126    let mut res = Vec::new();
127    let mut reader = ByteReader::new(bytes);
128    while !reader.complete() && res.len() < limit {
129      let b0 = reader.read_one()?;
130      if b0 == NULL_BYTE {
131        res.push(H::handle_null(reader.get_byte_idx()));
132      } else if b0 == COUNT_BYTE {
133        let count_bytes = utils::try_byte_array::<4>(&reader.unescaped_read_n(4)?)?;
134        let count = u32::from_be_bytes(count_bytes) as usize;
135        if res.is_empty() {
136          for _ in 0..count {
137            res.push(H::handle_null(reader.get_byte_idx()));
138          }
139        } else if res.len() != count {
140          return Err(CoreError::corrupt("in-file count did not match number of decoded entries"));
141        }
142      } else {
143        reader.back_one();
144        let v = self.decode_value(&mut reader, 0)?;
145        res.push(v);
146      }
147    }
148    Ok(res)
149  }
150}
151
152impl<P: Primitive, H> DecoderImpl<P, H> where H: Decodable<P> {
153  pub fn new(escape_depth: u8) -> Self {
154    Self {
155      nested_list_depth: escape_depth,
156      _phantom_p: PhantomData,
157      _phantom_h: PhantomData,
158    }
159  }
160
161  fn decode_value(&self, reader: &mut ByteReader, current_depth: u8) -> CoreResult<H> {
162    if current_depth == self.nested_list_depth {
163      let atoms = if P::IS_ATOMIC {
164        let bytes = reader.unescaped_read_n(P::A::BYTE_SIZE)?;
165        vec![P::A::try_from_bytes(&bytes)?]
166      } else {
167        let len = reader.unescaped_read_u16()? as usize;
168        let mut atoms = Vec::with_capacity(len);
169        for _ in 0..len {
170          let bytes = reader.unescaped_read_n(P::A::BYTE_SIZE)?;
171          atoms.push(P::A::try_from_bytes(&bytes)?);
172        }
173        atoms
174      };
175      H::handle_atoms(atoms, current_depth, reader.get_byte_idx())
176    } else {
177      let len = reader.unescaped_read_u16()? as usize;
178      let mut outputs = Vec::with_capacity(len);
179      for _ in 0..len {
180        let child = self.decode_value(reader, current_depth + 1)?;
181        outputs.push(child);
182      }
183      Ok(H::combine(outputs, current_depth, reader.get_byte_idx()))
184    }
185  }
186}