pancake_db_core/
rep_levels.rs1use pancake_db_idl::dml::{FieldValue, RepeatedFieldValue};
2use pancake_db_idl::dml::field_value::Value;
3use q_compress::Compressor;
4
5use crate::errors::{CoreError, CoreResult};
6use crate::primitives::{Atom, Primitive};
7
8#[derive(Clone, Debug, Default)]
9pub struct RepLevelsAndAtoms<A: Atom> {
10 pub levels: Vec<u8>,
11 pub atoms: Vec<A>,
12}
13
14impl<A: Atom> RepLevelsAndAtoms<A> {
15 pub fn extend(&mut self, other: &RepLevelsAndAtoms<A>) {
16 self.levels.extend(&other.levels);
17 self.atoms.extend(&other.atoms);
18 }
19}
20
21#[derive(Clone, Debug)]
22pub struct RepLevelsAndBytes {
23 pub levels: Vec<u8>,
24 pub remaining_bytes: Vec<u8>,
25}
26
27pub fn extract_levels_and_atoms<P: Primitive>(
28 fvs: &[FieldValue],
29 schema_depth: u8,
30) -> CoreResult<RepLevelsAndAtoms<P::A>> {
31 let mut res = RepLevelsAndAtoms::<P::A>::default();
32 for fv in fvs {
33 let sub_levels_and_atoms = extract_single_levels_and_atoms::<P>(fv, schema_depth, 0)?;
34 res.extend(&sub_levels_and_atoms);
35 }
36 Ok(res)
37}
38
39pub fn extract_single_levels_and_atoms<P: Primitive>(
40 fv: &FieldValue,
41 schema_depth: u8,
42 traverse_depth: u8
43) -> CoreResult<RepLevelsAndAtoms<P::A>> {
44 match &fv.value {
45 None => {
46 if traverse_depth != 0 {
47 return Err(CoreError::invalid("null value found in nested position"));
48 }
49
50 Ok(RepLevelsAndAtoms {
51 levels: vec![0],
52 atoms: vec![],
53 })
54 },
55 Some(Value::ListVal(repeated)) => {
56 if traverse_depth >= schema_depth {
57 return Err(CoreError::invalid("traversed to deeper than schema depth"));
58 }
59
60 let mut res = RepLevelsAndAtoms::default();
61 for fv in &repeated.vals {
62 let sub_levels_and_atoms = extract_single_levels_and_atoms::<P>(
63 fv,
64 traverse_depth + 1,
65 schema_depth
66 )?;
67 res.extend(&sub_levels_and_atoms);
68 }
69 res.levels.push(traverse_depth + 1);
70 Ok(res)
71 },
72 Some(v) => {
73 if traverse_depth != schema_depth {
74 return Err(CoreError::invalid(
75 &format!(
76 "traverse depth of {} does not match schema depth of {}",
77 traverse_depth,
78 schema_depth
79 )
80 ))
81 }
82
83 let atoms = P::try_from_value(v)?.to_atoms();
84 let res = if P::IS_ATOMIC {
85 RepLevelsAndAtoms {
86 levels: vec![schema_depth + 1],
87 atoms: vec![atoms[0]],
88 }
89 } else {
90 let mut levels = atoms.iter()
91 .map(|_| schema_depth + 2)
92 .collect::<Vec<u8>>();
93 levels.push(schema_depth + 1);
94 RepLevelsAndAtoms {
95 levels,
96 atoms
97 }
98 };
99
100 Ok(res)
101 },
102 }
103}
104
105pub fn compress_rep_levels(rep_levels: Vec<u8>) -> CoreResult<Vec<u8>> {
106 let rep_levels = rep_levels.iter().map(|&l| l as u32).collect::<Vec<u32>>();
107 let compressor = Compressor::<u32>::default();
108 Ok(compressor.simple_compress(&rep_levels))
109}
110
111pub struct AtomNester<P: Primitive> {
112 rep_levels: Vec<u8>,
113 atoms: Vec<P::A>,
114 schema_depth: u8,
115 i: usize,
116 j: usize,
117}
118
119impl<P: Primitive> AtomNester<P> {
120 pub fn from_levels_and_values(rep_levels: Vec<u8>, atoms: Vec<P::A>, schema_depth: u8) -> Self {
121 AtomNester {
122 rep_levels,
123 atoms,
124 schema_depth,
125 i: 0,
126 j: 0,
127 }
128 }
129
130 fn nested_field_value(&mut self, traverse_depth: u8) -> CoreResult<FieldValue> {
131 let mut level = self.rep_levels[self.i];
132 if traverse_depth == 0 && level == 0 {
133 self.i += 1;
135 Ok(FieldValue::default())
136 } else if traverse_depth < self.schema_depth {
137 let mut res = Vec::new();
139 while level > traverse_depth + 1 {
140 res.push(self.nested_field_value(traverse_depth + 1)?);
141 level = self.rep_levels[self.i];
142 }
143
144 if level == traverse_depth + 1 {
145 self.i += 1;
146 }
147 Ok(FieldValue {
148 value: Some(Value::ListVal(RepeatedFieldValue {
149 vals: res,
150 })),
151 })
152 } else if traverse_depth == self.schema_depth {
153 let start = self.j;
154 if P::IS_ATOMIC {
155 self.i += 1;
156 self.j += 1;
157 } else {
158 while level == self.schema_depth + 2 {
159 self.i += 1;
160 self.j += 1;
161 level = self.rep_levels[self.i];
162 }
163 self.i += 1;
164 };
165 let atoms = &self.atoms[start..self.j];
166 let value = P::try_from_atoms(atoms)?.to_value();
167 Ok(FieldValue {
168 value: Some(value),
169 })
170 } else {
171 Err(CoreError::corrupt("invalid repetition level found"))
172 }
173 }
174
175 pub fn nested_field_values(&mut self) -> CoreResult<Vec<FieldValue>> {
176 let mut res = Vec::new();
177 while self.i < self.rep_levels.len() {
178 res.push(self.nested_field_value(0)?);
179 }
180 Ok(res)
181 }
182}