pancake_db_core/encoding/
decoder.rs1use std::marker::PhantomData;
2
3use pancake_db_idl::dml::{FieldValue, RepeatedFieldValue};
4use pancake_db_idl::dml::field_value::Value;
5
6use crate::encoding::byte_reader::ByteReader;
7use crate::errors::{CoreError, CoreResult};
8use crate::primitives::{Atom, Primitive};
9use crate::utils;
10use super::{NULL_BYTE, COUNT_BYTE};
11use crate::rep_levels::RepLevelsAndAtoms;
12
13pub trait Decodable<P: Primitive>: Send + Sync {
14 fn handle_atoms(
15 atoms: Vec<P::A>,
16 depth: u8,
17 byte_idx: usize,
18 ) -> CoreResult<Self> where Self: Sized;
19 fn handle_null(byte_idx: usize) -> Self where Self: Sized;
20 fn combine(
21 outputs: Vec<Self>,
22 depth: u8,
23 byte_idx: usize
24 ) -> Self where Self: Sized;
25}
26
27impl<P: Primitive> Decodable<P> for FieldValue {
28 fn handle_atoms(atoms: Vec<P::A>, _: u8, _: usize) -> CoreResult<FieldValue> {
29 let value = P::try_from_atoms(&atoms)?.to_value();
30 Ok(FieldValue {
31 value: Some(value),
32 })
33 }
34
35 fn handle_null(_: usize) -> FieldValue {
36 FieldValue::default()
37 }
38
39 fn combine(outputs: Vec<FieldValue>, _: u8, _: usize) -> FieldValue {
40 let repeated = RepeatedFieldValue {
41 vals: outputs,
42 };
43 FieldValue {
44 value: Some(Value::ListVal(repeated)),
45 }
46 }
47}
48
49impl<P: Primitive> Decodable<P> for RepLevelsAndAtoms<P::A> {
50 fn handle_atoms(
51 atoms: Vec<P::A>,
52 depth: u8,
53 _: usize
54 ) -> CoreResult<RepLevelsAndAtoms<P::A>> {
55 let levels = if P::IS_ATOMIC {
56 vec![depth + 1]
57 } else {
58 let mut res = vec![depth + 2; atoms.len()];
59 res.push(depth + 1);
60 res
61 };
62 Ok(RepLevelsAndAtoms {
63 levels,
64 atoms,
65 })
66 }
67
68 fn handle_null(_: usize) -> RepLevelsAndAtoms<P::A> {
69 RepLevelsAndAtoms {
70 levels: vec![0],
71 atoms: vec![],
72 }
73 }
74
75 fn combine(
76 outputs: Vec<RepLevelsAndAtoms<P::A>>,
77 depth: u8,
78 _: usize,
79 ) -> RepLevelsAndAtoms<P::A> {
80 let mut res = RepLevelsAndAtoms::<P::A>::default();
81 for output in &outputs {
82 res.extend(output);
83 }
84 res.levels.push(depth + 1);
85 res
86 }
87}
88
89pub type ByteIdx = usize;
91
92impl<P: Primitive> Decodable<P> for ByteIdx {
93 fn handle_atoms(_: Vec<P::A>, _: u8, byte_idx: usize) -> CoreResult<Self> where Self: Sized {
94 Ok(byte_idx)
95 }
96
97 fn handle_null(byte_idx: usize) -> Self where Self: Sized {
98 byte_idx
99 }
100
101 fn combine(_: Vec<Self>, _: u8, byte_idx: usize) -> Self where Self: Sized {
102 byte_idx
103 }
104}
105
106pub trait Decoder<Output> {
107 fn decode_limited(&self, bytes: &[u8], limit: usize) -> CoreResult<Vec<Output>>;
108 fn decode(&self, bytes: &[u8]) -> CoreResult<Vec<Output>> {
109 self.decode_limited(bytes, usize::MAX)
110 }
111}
112
113#[derive(Clone, Debug)]
114pub struct DecoderImpl<P: Primitive, H> where H: Decodable<P> {
115 nested_list_depth: u8,
116 _phantom_p: PhantomData<P>,
117 _phantom_h: PhantomData<H>,
118}
119
120impl<P: Primitive, H> Decoder<H> for DecoderImpl<P, H> where H: Decodable<P> {
121 fn decode_limited(
122 &self,
123 bytes: &[u8],
124 limit: usize
125 ) -> CoreResult<Vec<H>> {
126 let mut res = Vec::new();
127 let mut reader = ByteReader::new(bytes);
128 while !reader.complete() && res.len() < limit {
129 let b0 = reader.read_one()?;
130 if b0 == NULL_BYTE {
131 res.push(H::handle_null(reader.get_byte_idx()));
132 } else if b0 == COUNT_BYTE {
133 let count_bytes = utils::try_byte_array::<4>(&reader.unescaped_read_n(4)?)?;
134 let count = u32::from_be_bytes(count_bytes) as usize;
135 if res.is_empty() {
136 for _ in 0..count {
137 res.push(H::handle_null(reader.get_byte_idx()));
138 }
139 } else if res.len() != count {
140 return Err(CoreError::corrupt("in-file count did not match number of decoded entries"));
141 }
142 } else {
143 reader.back_one();
144 let v = self.decode_value(&mut reader, 0)?;
145 res.push(v);
146 }
147 }
148 Ok(res)
149 }
150}
151
152impl<P: Primitive, H> DecoderImpl<P, H> where H: Decodable<P> {
153 pub fn new(escape_depth: u8) -> Self {
154 Self {
155 nested_list_depth: escape_depth,
156 _phantom_p: PhantomData,
157 _phantom_h: PhantomData,
158 }
159 }
160
161 fn decode_value(&self, reader: &mut ByteReader, current_depth: u8) -> CoreResult<H> {
162 if current_depth == self.nested_list_depth {
163 let atoms = if P::IS_ATOMIC {
164 let bytes = reader.unescaped_read_n(P::A::BYTE_SIZE)?;
165 vec![P::A::try_from_bytes(&bytes)?]
166 } else {
167 let len = reader.unescaped_read_u16()? as usize;
168 let mut atoms = Vec::with_capacity(len);
169 for _ in 0..len {
170 let bytes = reader.unescaped_read_n(P::A::BYTE_SIZE)?;
171 atoms.push(P::A::try_from_bytes(&bytes)?);
172 }
173 atoms
174 };
175 H::handle_atoms(atoms, current_depth, reader.get_byte_idx())
176 } else {
177 let len = reader.unescaped_read_u16()? as usize;
178 let mut outputs = Vec::with_capacity(len);
179 for _ in 0..len {
180 let child = self.decode_value(reader, current_depth + 1)?;
181 outputs.push(child);
182 }
183 Ok(H::combine(outputs, current_depth, reader.get_byte_idx()))
184 }
185 }
186}