1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use snafu::{OptionExt, ResultExt};

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error::{self, Result};
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::variable_length::Values;
use crate::reader::decompress::Decompressor;

pub struct DirectStringIterator {
    values: Box<Values<Decompressor>>,
    lengths: Box<dyn Iterator<Item = Result<i64>>>,
}

impl Iterator for DirectStringIterator {
    type Item = Result<String>;
    fn next(&mut self) -> Option<Self::Item> {
        match self.lengths.next() {
            Some(Ok(length)) => match self.values.next(length as usize) {
                Ok(value) => Some(
                    std::str::from_utf8(value)
                        .map(|x| x.to_string())
                        .context(error::InvalidUft8Snafu),
                ),
                Err(err) => Some(Err(err)),
            },
            Some(Err(err)) => Some(Err(err)),
            None => None,
        }
    }
}

/// Builds a nullable iterator of `String`s for a directly-encoded string column.
///
/// The present stream is decoded eagerly into a `Vec`. The string values are decoded
/// lazily by a [`DirectStringIterator`] reading the column's `Data` stream (string data)
/// and `Length` stream (per-string lengths).
///
/// # Errors
///
/// Propagates any error from decoding the present stream or from opening a stream.
/// Returns an `InvalidColumn` error carrying the column name if the `Data` or `Length`
/// stream is absent.
pub fn new_direct_string_iter(column: &Column) -> Result<NullableIterator<String>> {
    // Collect through `std::result::Result` instead of the nightly-only
    // `Iterator::try_collect`: the result is identical and this builds on stable Rust.
    let present = new_present_iter(column)?.collect::<std::result::Result<Vec<_>, _>>()?;

    // Concatenated string data.
    let values = column
        .stream(Kind::Data)
        .transpose()?
        .map(|reader| Box::new(Values::new(reader, vec![])))
        .context(error::InvalidColumnSnafu { name: &column.name })?;

    // Length of each string, decoded with RLE v2.
    let lengths = column
        .stream(Kind::Length)
        .transpose()?
        .map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
        .context(error::InvalidColumnSnafu { name: &column.name })?;

    Ok(NullableIterator {
        present: Box::new(present.into_iter()),
        iter: Box::new(DirectStringIterator { values, lengths }),
    })
}

/// Builds the decoder pieces for a dictionary-encoded string column.
///
/// The dictionary is read eagerly:
/// - entry data comes from the `DictionaryData` stream;
/// - entry lengths come from the `Length` stream.
///
/// The entries are then materialized into an Arrow [`StringArray`]. The `Data` stream is
/// returned as a lazy iterator of dictionary indexes, paired with the column's present
/// values in a [`NullableIterator`].
///
/// # Errors
///
/// Propagates any error from decoding the present stream, from opening a stream, or from
/// decoding a dictionary entry (including invalid UTF-8). Returns an `InvalidColumn`
/// error carrying the column name if the `DictionaryData`, `Length` or `Data` stream is
/// absent.
pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterator<i64>, ArrayRef)> {
    // Collect through `std::result::Result` instead of the nightly-only
    // `Iterator::try_collect`: the result is identical and this builds on stable Rust.
    let present = new_present_iter(column)?.collect::<std::result::Result<Vec<_>, _>>()?;

    // Concatenated dictionary entry data.
    let dictionary_bytes = column
        .stream(Kind::DictionaryData)
        .transpose()?
        .map(|reader| Box::new(Values::new(reader, vec![])))
        .context(error::InvalidColumnSnafu { name: &column.name })?;

    // Length of each dictionary entry, decoded with RLE v2.
    let dictionary_lengths = column
        .stream(Kind::Length)
        .transpose()?
        .map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
        .context(error::InvalidColumnSnafu { name: &column.name })?;

    // Decode every dictionary entry now; the first failure aborts construction.
    let dictionary_values = DirectStringIterator {
        values: dictionary_bytes,
        lengths: dictionary_lengths,
    }
    .collect::<std::result::Result<Vec<_>, _>>()?;

    // Dictionary indexes, decoded lazily with RLE v2.
    let indexes = column
        .stream(Kind::Data)
        .transpose()?
        .map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
        .context(error::InvalidColumnSnafu { name: &column.name })?;

    let dictionary = StringArray::from_iter(dictionary_values.into_iter().map(Some));

    Ok((
        NullableIterator {
            present: Box::new(present.into_iter()),
            iter: Box::new(indexes),
        },
        Arc::new(dictionary),
    ))
}

/// Decoder for a string column, chosen according to the column's encoding.
pub enum StringDecoder {
    /// Directly encoded: yields the string values themselves.
    Direct(NullableIterator<String>),
    /// Dictionary encoded: yields dictionary indexes, paired with the decoded dictionary.
    Dictionary((NullableIterator<i64>, ArrayRef)),
}

impl StringDecoder {
    /// Creates the decoder matching `column`'s encoding kind.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the underlying direct or dictionary decoder.
    ///
    /// # Panics
    ///
    /// Panics (via `unimplemented!`) for any encoding kind other than `DirectV2` or
    /// `DictionaryV2`.
    pub fn new(column: &Column) -> Result<Self> {
        let decoder = match column.encoding().kind() {
            ColumnEncodingKind::DirectV2 => Self::Direct(new_direct_string_iter(column)?),
            ColumnEncodingKind::DictionaryV2 => {
                Self::Dictionary(new_arrow_dict_string_decoder(column)?)
            }
            other => unimplemented!("{other:?}"),
        };
        Ok(decoder)
    }
}