vortex_layout/layouts/dict/writer/
mod.rs

1use arcref::ArcRef;
2use bytes::Bytes;
3use vortex_array::{Array, ArrayContext, ArrayRef, ProstMetadata, SerializeMetadata};
4use vortex_btrblocks::BtrBlocksCompressor;
5use vortex_dict::DictEncoding;
6use vortex_dict::builders::{DictConstraints, DictEncoder, dict_encoder};
7use vortex_dtype::{DType, PType};
8use vortex_error::{VortexResult, vortex_bail};
9
10mod repeating;
11
12use crate::layouts::dict::DictLayout;
13use crate::{Layout, LayoutStrategy, LayoutVTableRef, LayoutWriter, LayoutWriterExt};
14
15#[derive(Clone)]
16pub struct DictLayoutOptions {
17    pub constraints: DictConstraints,
18}
19
20impl Default for DictLayoutOptions {
21    fn default() -> Self {
22        Self {
23            constraints: DictConstraints {
24                max_bytes: 1024 * 1024,
25                max_len: u16::MAX as usize,
26            },
27        }
28    }
29}
30
31/// A layout strategy that encodes chunk into values and codes, if found
32/// appropriate by the btrblocks compressor. Current implementation only
33/// checks the first chunk to decide whether to apply dict layout and
34/// encodes chunks into dictionaries. When the dict constraints are hit, a
35/// new dictionary is created.
36#[derive(Clone)]
37pub struct DictStrategy {
38    pub options: DictLayoutOptions,
39    pub codes: ArcRef<dyn LayoutStrategy>,
40    pub values: ArcRef<dyn LayoutStrategy>,
41    pub fallback: ArcRef<dyn LayoutStrategy>,
42}
43
44impl LayoutStrategy for DictStrategy {
45    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
46        if !dict_layout_supported(dtype) {
47            return self.fallback.new_writer(ctx, dtype);
48        }
49        Ok(DelegatingDictLayoutWriter {
50            ctx: ctx.clone(),
51            strategy: self.clone(),
52            dtype: dtype.clone(),
53            writer: None,
54        }
55        .boxed())
56    }
57}
58
59pub fn dict_layout_supported(dtype: &DType) -> bool {
60    matches!(
61        dtype,
62        DType::Primitive(..) | DType::Utf8(_) | DType::Binary(_)
63    )
64}
65
66struct DelegatingDictLayoutWriter {
67    ctx: ArrayContext,
68    strategy: DictStrategy,
69    dtype: DType,
70    writer: Option<Box<dyn LayoutWriter>>,
71}
72
73impl LayoutWriter for DelegatingDictLayoutWriter {
74    fn push_chunk(
75        &mut self,
76        segment_writer: &mut dyn crate::segments::SegmentWriter,
77        chunk: ArrayRef,
78    ) -> VortexResult<()> {
79        assert_eq!(
80            chunk.dtype(),
81            &self.dtype,
82            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
83            chunk.dtype(),
84            self.dtype
85        );
86        match self.writer.as_mut() {
87            Some(writer) => writer.push_chunk(segment_writer, chunk),
88            None => {
89                let compressed = BtrBlocksCompressor.compress(&chunk)?;
90                let mut writer = if !compressed.is_encoding(DictEncoding.id()) {
91                    self.strategy.fallback.new_writer(&self.ctx, &self.dtype)?
92                } else {
93                    repeating::DictLayoutWriter::new(
94                        self.ctx.clone(),
95                        &self.dtype,
96                        self.strategy.clone(),
97                    )
98                    .boxed()
99                };
100                writer.push_chunk(segment_writer, chunk)?;
101                self.writer = Some(writer);
102                Ok(())
103            }
104        }
105    }
106
107    fn flush(
108        &mut self,
109        segment_writer: &mut dyn crate::segments::SegmentWriter,
110    ) -> VortexResult<()> {
111        match self.writer.as_mut() {
112            None => vortex_bail!("flush called before push_chunk"),
113            Some(writer) => writer.flush(segment_writer),
114        }
115    }
116
117    fn finish(
118        &mut self,
119        segment_writer: &mut dyn crate::segments::SegmentWriter,
120    ) -> VortexResult<Layout> {
121        match self.writer.as_mut() {
122            None => vortex_bail!("finish called before push_chunk"),
123            Some(writer) => writer.finish(segment_writer),
124        }
125    }
126}
127
128#[derive(prost::Message)]
129pub struct DictLayoutMetadata {
130    #[prost(enumeration = "PType", tag = "1")]
131    // i32 is required for proto, use the generated getter to read this field.
132    codes_ptype: i32,
133}
134
135impl DictLayoutMetadata {
136    pub fn new(codes_ptype: PType) -> Self {
137        let mut metadata = Self::default();
138        metadata.set_codes_ptype(codes_ptype);
139        metadata
140    }
141}
142
143fn dict_layout(values: Layout, codes: Layout) -> VortexResult<Layout> {
144    let metadata =
145        Bytes::from(ProstMetadata(DictLayoutMetadata::new(codes.dtype().try_into()?)).serialize());
146    Ok(Layout::new_owned(
147        "dict".into(),
148        LayoutVTableRef::new_ref(&DictLayout),
149        values.dtype().clone(),
150        codes.row_count(),
151        vec![],
152        vec![values, codes],
153        Some(metadata),
154    ))
155}
156
/// Result of feeding one chunk to a dictionary encoder (see `encode_chunk`).
enum EncodingState {
    /// The encoder's output covered the whole chunk. Carries the encoder,
    /// which can keep accepting chunks, and the encoded output for this chunk.
    Continue((Box<dyn DictEncoder>, ArrayRef)),
    // (values, encoded, unencoded)
    /// The encoder's output was shorter than the chunk. Carries the encoder's
    /// values, the encoded output, and the trailing slice of the chunk that
    /// was not encoded.
    Done((ArrayRef, ArrayRef, ArrayRef)),
}
162
163fn start_encoding(constraints: &DictConstraints, chunk: &dyn Array) -> VortexResult<EncodingState> {
164    let encoder = dict_encoder(chunk, constraints)?;
165    encode_chunk(encoder, chunk)
166}
167
168fn encode_chunk(
169    mut encoder: Box<dyn DictEncoder>,
170    chunk: &dyn Array,
171) -> VortexResult<EncodingState> {
172    let encoded = encoder.encode(chunk)?;
173    Ok(match remainder(chunk, encoded.len())? {
174        None => EncodingState::Continue((encoder, encoded)),
175        Some(unencoded) => EncodingState::Done((encoder.values()?, encoded, unencoded)),
176    })
177}
178
179fn remainder(array: &dyn Array, encoded_len: usize) -> VortexResult<Option<ArrayRef>> {
180    (encoded_len < array.len())
181        .then(|| array.slice(encoded_len, array.len()))
182        .transpose()
183}