1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
use super::super::delta_bitpacked;
use super::super::delta_length_byte_array;

/// Decodes according to [Delta strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-strings-delta_byte_array--7),
/// prefixes, lengths and values
/// # Implementation
/// This struct does not allocate on the heap.
#[derive(Debug)]
pub struct Decoder<'a> {
    values: &'a [u8],
    prefix_lengths: delta_bitpacked::Decoder<'a>,
}

impl<'a> Decoder<'a> {
    pub fn new(values: &'a [u8]) -> Self {
        let prefix_lengths = delta_bitpacked::Decoder::new(values);
        Self {
            values,
            prefix_lengths,
        }
    }

    pub fn into_lengths(self) -> delta_length_byte_array::Decoder<'a> {
        assert_eq!(self.prefix_lengths.size_hint().0, 0);
        delta_length_byte_array::Decoder::new(&self.values[self.prefix_lengths.consumed_bytes()..])
    }
}

impl<'a> Iterator for Decoder<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<Self::Item> {
        self.prefix_lengths.next().map(|x| x as u32)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bla() {
        // VALIDATED from spark==3.1.1
        let data = &[
            128, 1, 4, 2, 0, 0, 0, 0, 0, 0, 128, 1, 4, 2, 10, 0, 0, 0, 0, 0, 72, 101, 108, 108,
            111, 87, 111, 114, 108, 100,
            // extra bytes are not from spark, but they should be ignored by the decoder
            // because they are beyond the sum of all lengths.
            1, 2, 3,
        ];
        // result of encoding
        let expected = &["Hello", "World"];
        let expected_lengths = expected.iter().map(|x| x.len() as i32).collect::<Vec<_>>();
        let expected_prefixes = vec![0, 0];
        let expected_values = expected.join("");
        let expected_values = expected_values.as_bytes();

        let mut decoder = Decoder::new(data);
        let prefixes = decoder.by_ref().collect::<Vec<_>>();
        assert_eq!(prefixes, expected_prefixes);

        // move to the lengths
        let mut decoder = decoder.into_lengths();

        let lengths = decoder.by_ref().collect::<Vec<_>>();
        assert_eq!(lengths, expected_lengths);

        // move to the values
        let values = decoder.values();
        assert_eq!(values, expected_values);
    }
}