vortex_layout/layouts/dict/writer/
mod.rs1use arcref::ArcRef;
2use vortex_array::{Array, ArrayContext, ArrayRef};
3use vortex_btrblocks::BtrBlocksCompressor;
4use vortex_dict::DictEncoding;
5use vortex_dict::builders::{DictConstraints, DictEncoder, dict_encoder};
6use vortex_dtype::DType;
7use vortex_error::{VortexResult, vortex_bail};
8
9mod repeating;
10
11use crate::{LayoutRef, LayoutStrategy, LayoutWriter, LayoutWriterExt};
12
13#[derive(Clone)]
14pub struct DictLayoutOptions {
15 pub constraints: DictConstraints,
16}
17
18impl Default for DictLayoutOptions {
19 fn default() -> Self {
20 Self {
21 constraints: DictConstraints {
22 max_bytes: 1024 * 1024,
23 max_len: u16::MAX as usize,
24 },
25 }
26 }
27}
28
29#[derive(Clone)]
35pub struct DictStrategy {
36 pub options: DictLayoutOptions,
37 pub codes: ArcRef<dyn LayoutStrategy>,
38 pub values: ArcRef<dyn LayoutStrategy>,
39 pub fallback: ArcRef<dyn LayoutStrategy>,
40}
41
42impl LayoutStrategy for DictStrategy {
43 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
44 if !dict_layout_supported(dtype) {
45 return self.fallback.new_writer(ctx, dtype);
46 }
47 Ok(DelegatingDictLayoutWriter {
48 ctx: ctx.clone(),
49 strategy: self.clone(),
50 dtype: dtype.clone(),
51 writer: None,
52 }
53 .boxed())
54 }
55}
56
57pub fn dict_layout_supported(dtype: &DType) -> bool {
58 matches!(
59 dtype,
60 DType::Primitive(..) | DType::Utf8(_) | DType::Binary(_)
61 )
62}
63
64struct DelegatingDictLayoutWriter {
65 ctx: ArrayContext,
66 strategy: DictStrategy,
67 dtype: DType,
68 writer: Option<Box<dyn LayoutWriter>>,
69}
70
71impl LayoutWriter for DelegatingDictLayoutWriter {
72 fn push_chunk(
73 &mut self,
74 segment_writer: &mut dyn crate::segments::SegmentWriter,
75 chunk: ArrayRef,
76 ) -> VortexResult<()> {
77 assert_eq!(
78 chunk.dtype(),
79 &self.dtype,
80 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
81 chunk.dtype(),
82 self.dtype
83 );
84 match self.writer.as_mut() {
85 Some(writer) => writer.push_chunk(segment_writer, chunk),
86 None => {
87 let compressed = BtrBlocksCompressor.compress(&chunk)?;
88 let mut writer = if !compressed.is_encoding(DictEncoding.id()) {
89 self.strategy.fallback.new_writer(&self.ctx, &self.dtype)?
90 } else {
91 repeating::DictLayoutWriter::new(
92 self.ctx.clone(),
93 &self.dtype,
94 self.strategy.clone(),
95 )
96 .boxed()
97 };
98 writer.push_chunk(segment_writer, chunk)?;
99 self.writer = Some(writer);
100 Ok(())
101 }
102 }
103 }
104
105 fn flush(
106 &mut self,
107 segment_writer: &mut dyn crate::segments::SegmentWriter,
108 ) -> VortexResult<()> {
109 match self.writer.as_mut() {
110 None => vortex_bail!("flush called before push_chunk"),
111 Some(writer) => writer.flush(segment_writer),
112 }
113 }
114
115 fn finish(
116 &mut self,
117 segment_writer: &mut dyn crate::segments::SegmentWriter,
118 ) -> VortexResult<LayoutRef> {
119 match self.writer.as_mut() {
120 None => vortex_bail!("finish called before push_chunk"),
121 Some(writer) => writer.finish(segment_writer),
122 }
123 }
124}
125
126enum EncodingState {
127 Continue((Box<dyn DictEncoder>, ArrayRef)),
128 Done((ArrayRef, ArrayRef, ArrayRef)),
130}
131
132fn start_encoding(constraints: &DictConstraints, chunk: &dyn Array) -> VortexResult<EncodingState> {
133 let encoder = dict_encoder(chunk, constraints)?;
134 encode_chunk(encoder, chunk)
135}
136
137fn encode_chunk(
138 mut encoder: Box<dyn DictEncoder>,
139 chunk: &dyn Array,
140) -> VortexResult<EncodingState> {
141 let encoded = encoder.encode(chunk)?;
142 Ok(match remainder(chunk, encoded.len())? {
143 None => EncodingState::Continue((encoder, encoded)),
144 Some(unencoded) => EncodingState::Done((encoder.values()?, encoded, unencoded)),
145 })
146}
147
148fn remainder(array: &dyn Array, encoded_len: usize) -> VortexResult<Option<ArrayRef>> {
149 (encoded_len < array.len())
150 .then(|| array.slice(encoded_len, array.len()))
151 .transpose()
152}