// vortex_layout/layouts/dict/writer/mod.rs

1use arcref::ArcRef;
2use vortex_array::{Array, ArrayContext, ArrayRef};
3use vortex_btrblocks::BtrBlocksCompressor;
4use vortex_dict::DictEncoding;
5use vortex_dict::builders::{DictConstraints, DictEncoder, dict_encoder};
6use vortex_dtype::DType;
7use vortex_error::{VortexResult, vortex_bail};
8
9mod repeating;
10
11use crate::{LayoutRef, LayoutStrategy, LayoutWriter, LayoutWriterExt};
12
13#[derive(Clone)]
14pub struct DictLayoutOptions {
15    pub constraints: DictConstraints,
16}
17
18impl Default for DictLayoutOptions {
19    fn default() -> Self {
20        Self {
21            constraints: DictConstraints {
22                max_bytes: 1024 * 1024,
23                max_len: u16::MAX as usize,
24            },
25        }
26    }
27}
28
29/// A layout strategy that encodes chunk into values and codes, if found
30/// appropriate by the btrblocks compressor. Current implementation only
31/// checks the first chunk to decide whether to apply dict layout and
32/// encodes chunks into dictionaries. When the dict constraints are hit, a
33/// new dictionary is created.
34#[derive(Clone)]
35pub struct DictStrategy {
36    pub options: DictLayoutOptions,
37    pub codes: ArcRef<dyn LayoutStrategy>,
38    pub values: ArcRef<dyn LayoutStrategy>,
39    pub fallback: ArcRef<dyn LayoutStrategy>,
40}
41
42impl LayoutStrategy for DictStrategy {
43    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
44        if !dict_layout_supported(dtype) {
45            return self.fallback.new_writer(ctx, dtype);
46        }
47        Ok(DelegatingDictLayoutWriter {
48            ctx: ctx.clone(),
49            strategy: self.clone(),
50            dtype: dtype.clone(),
51            writer: None,
52        }
53        .boxed())
54    }
55}
56
57pub fn dict_layout_supported(dtype: &DType) -> bool {
58    matches!(
59        dtype,
60        DType::Primitive(..) | DType::Utf8(_) | DType::Binary(_)
61    )
62}
63
64struct DelegatingDictLayoutWriter {
65    ctx: ArrayContext,
66    strategy: DictStrategy,
67    dtype: DType,
68    writer: Option<Box<dyn LayoutWriter>>,
69}
70
71impl LayoutWriter for DelegatingDictLayoutWriter {
72    fn push_chunk(
73        &mut self,
74        segment_writer: &mut dyn crate::segments::SegmentWriter,
75        chunk: ArrayRef,
76    ) -> VortexResult<()> {
77        assert_eq!(
78            chunk.dtype(),
79            &self.dtype,
80            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
81            chunk.dtype(),
82            self.dtype
83        );
84        match self.writer.as_mut() {
85            Some(writer) => writer.push_chunk(segment_writer, chunk),
86            None => {
87                let compressed = BtrBlocksCompressor.compress(&chunk)?;
88                let mut writer = if !compressed.is_encoding(DictEncoding.id()) {
89                    self.strategy.fallback.new_writer(&self.ctx, &self.dtype)?
90                } else {
91                    repeating::DictLayoutWriter::new(
92                        self.ctx.clone(),
93                        &self.dtype,
94                        self.strategy.clone(),
95                    )
96                    .boxed()
97                };
98                writer.push_chunk(segment_writer, chunk)?;
99                self.writer = Some(writer);
100                Ok(())
101            }
102        }
103    }
104
105    fn flush(
106        &mut self,
107        segment_writer: &mut dyn crate::segments::SegmentWriter,
108    ) -> VortexResult<()> {
109        match self.writer.as_mut() {
110            None => vortex_bail!("flush called before push_chunk"),
111            Some(writer) => writer.flush(segment_writer),
112        }
113    }
114
115    fn finish(
116        &mut self,
117        segment_writer: &mut dyn crate::segments::SegmentWriter,
118    ) -> VortexResult<LayoutRef> {
119        match self.writer.as_mut() {
120            None => vortex_bail!("finish called before push_chunk"),
121            Some(writer) => writer.finish(segment_writer),
122        }
123    }
124}
125
126enum EncodingState {
127    Continue((Box<dyn DictEncoder>, ArrayRef)),
128    // (values, encoded, unencoded)
129    Done((ArrayRef, ArrayRef, ArrayRef)),
130}
131
132fn start_encoding(constraints: &DictConstraints, chunk: &dyn Array) -> VortexResult<EncodingState> {
133    let encoder = dict_encoder(chunk, constraints)?;
134    encode_chunk(encoder, chunk)
135}
136
137fn encode_chunk(
138    mut encoder: Box<dyn DictEncoder>,
139    chunk: &dyn Array,
140) -> VortexResult<EncodingState> {
141    let encoded = encoder.encode(chunk)?;
142    Ok(match remainder(chunk, encoded.len())? {
143        None => EncodingState::Continue((encoder, encoded)),
144        Some(unencoded) => EncodingState::Done((encoder.values()?, encoded, unencoded)),
145    })
146}
147
148fn remainder(array: &dyn Array, encoded_len: usize) -> VortexResult<Option<ArrayRef>> {
149    (encoded_len < array.len())
150        .then(|| array.slice(encoded_len, array.len()))
151        .transpose()
152}