pancake_db_core/
rep_levels.rs

1use pancake_db_idl::dml::{FieldValue, RepeatedFieldValue};
2use pancake_db_idl::dml::field_value::Value;
3use q_compress::Compressor;
4
5use crate::errors::{CoreError, CoreResult};
6use crate::primitives::{Atom, Primitive};
7
8#[derive(Clone, Debug, Default)]
9pub struct RepLevelsAndAtoms<A: Atom> {
10  pub levels: Vec<u8>,
11  pub atoms: Vec<A>,
12}
13
14impl<A: Atom> RepLevelsAndAtoms<A> {
15  pub fn extend(&mut self, other: &RepLevelsAndAtoms<A>) {
16    self.levels.extend(&other.levels);
17    self.atoms.extend(&other.atoms);
18  }
19}
20
21#[derive(Clone, Debug)]
22pub struct RepLevelsAndBytes {
23  pub levels: Vec<u8>,
24  pub remaining_bytes: Vec<u8>,
25}
26
27pub fn extract_levels_and_atoms<P: Primitive>(
28  fvs: &[FieldValue],
29  schema_depth: u8,
30) -> CoreResult<RepLevelsAndAtoms<P::A>> {
31  let mut res = RepLevelsAndAtoms::<P::A>::default();
32  for fv in fvs {
33    let sub_levels_and_atoms = extract_single_levels_and_atoms::<P>(fv, schema_depth, 0)?;
34    res.extend(&sub_levels_and_atoms);
35  }
36  Ok(res)
37}
38
39pub fn extract_single_levels_and_atoms<P: Primitive>(
40  fv: &FieldValue,
41  schema_depth: u8,
42  traverse_depth: u8
43) -> CoreResult<RepLevelsAndAtoms<P::A>> {
44  match &fv.value {
45    None => {
46      if traverse_depth != 0 {
47        return Err(CoreError::invalid("null value found in nested position"));
48      }
49
50      Ok(RepLevelsAndAtoms {
51        levels: vec![0],
52        atoms: vec![],
53      })
54    },
55    Some(Value::ListVal(repeated)) => {
56      if traverse_depth >= schema_depth {
57        return Err(CoreError::invalid("traversed to deeper than schema depth"));
58      }
59
60      let mut res = RepLevelsAndAtoms::default();
61      for fv in &repeated.vals {
62        let sub_levels_and_atoms = extract_single_levels_and_atoms::<P>(
63          fv,
64          traverse_depth + 1,
65          schema_depth
66        )?;
67        res.extend(&sub_levels_and_atoms);
68      }
69      res.levels.push(traverse_depth + 1);
70      Ok(res)
71    },
72    Some(v) => {
73      if traverse_depth != schema_depth {
74        return Err(CoreError::invalid(
75          &format!(
76            "traverse depth of {} does not match schema depth of {}",
77            traverse_depth,
78            schema_depth
79          )
80        ))
81      }
82
83      let atoms = P::try_from_value(v)?.to_atoms();
84      let res = if P::IS_ATOMIC {
85        RepLevelsAndAtoms {
86          levels: vec![schema_depth + 1],
87          atoms: vec![atoms[0]],
88        }
89      } else {
90        let mut levels = atoms.iter()
91          .map(|_| schema_depth + 2)
92          .collect::<Vec<u8>>();
93        levels.push(schema_depth + 1);
94        RepLevelsAndAtoms {
95          levels,
96          atoms
97        }
98      };
99
100      Ok(res)
101    },
102  }
103}
104
105pub fn compress_rep_levels(rep_levels: Vec<u8>) -> CoreResult<Vec<u8>> {
106  let rep_levels = rep_levels.iter().map(|&l| l as u32).collect::<Vec<u32>>();
107  let compressor = Compressor::<u32>::default();
108  Ok(compressor.simple_compress(&rep_levels))
109}
110
111pub struct AtomNester<P: Primitive> {
112  rep_levels: Vec<u8>,
113  atoms: Vec<P::A>,
114  schema_depth: u8,
115  i: usize,
116  j: usize,
117}
118
119impl<P: Primitive> AtomNester<P> {
120  pub fn from_levels_and_values(rep_levels: Vec<u8>, atoms: Vec<P::A>, schema_depth: u8) -> Self {
121    AtomNester {
122      rep_levels,
123      atoms,
124      schema_depth,
125      i: 0,
126      j: 0,
127    }
128  }
129
130  fn nested_field_value(&mut self, traverse_depth: u8) -> CoreResult<FieldValue> {
131    let mut level = self.rep_levels[self.i];
132    if traverse_depth == 0 && level == 0 {
133      //null
134      self.i += 1;
135      Ok(FieldValue::default())
136    } else if traverse_depth < self.schema_depth {
137      //list
138      let mut res = Vec::new();
139      while level > traverse_depth + 1 {
140        res.push(self.nested_field_value(traverse_depth + 1)?);
141        level = self.rep_levels[self.i];
142      }
143
144      if level == traverse_depth + 1 {
145        self.i += 1;
146      }
147      Ok(FieldValue {
148        value: Some(Value::ListVal(RepeatedFieldValue {
149          vals: res,
150        })),
151      })
152    } else if traverse_depth == self.schema_depth {
153      let start = self.j;
154      if P::IS_ATOMIC {
155        self.i += 1;
156        self.j += 1;
157      } else {
158        while level == self.schema_depth + 2 {
159          self.i += 1;
160          self.j += 1;
161          level = self.rep_levels[self.i];
162        }
163        self.i += 1;
164      };
165      let atoms = &self.atoms[start..self.j];
166      let value = P::try_from_atoms(atoms)?.to_value();
167      Ok(FieldValue {
168        value: Some(value),
169      })
170    } else {
171      Err(CoreError::corrupt("invalid repetition level found"))
172    }
173  }
174
175  pub fn nested_field_values(&mut self) -> CoreResult<Vec<FieldValue>> {
176    let mut res = Vec::new();
177    while self.i < self.rep_levels.len() {
178      res.push(self.nested_field_value(0)?);
179    }
180    Ok(res)
181  }
182}