vortex_layout/layouts/dict/writer/
mod.rs1use arcref::ArcRef;
2use bytes::Bytes;
3use vortex_array::{Array, ArrayContext, ArrayRef, ProstMetadata, SerializeMetadata};
4use vortex_btrblocks::BtrBlocksCompressor;
5use vortex_dict::DictEncoding;
6use vortex_dict::builders::{DictConstraints, DictEncoder, dict_encoder};
7use vortex_dtype::{DType, PType};
8use vortex_error::{VortexResult, vortex_bail};
9
10mod repeating;
11
12use crate::layouts::dict::DictLayout;
13use crate::{Layout, LayoutStrategy, LayoutVTableRef, LayoutWriter, LayoutWriterExt};
14
15#[derive(Clone)]
16pub struct DictLayoutOptions {
17 pub constraints: DictConstraints,
18}
19
20impl Default for DictLayoutOptions {
21 fn default() -> Self {
22 Self {
23 constraints: DictConstraints {
24 max_bytes: 1024 * 1024,
25 max_len: u16::MAX as usize,
26 },
27 }
28 }
29}
30
31#[derive(Clone)]
37pub struct DictStrategy {
38 pub options: DictLayoutOptions,
39 pub codes: ArcRef<dyn LayoutStrategy>,
40 pub values: ArcRef<dyn LayoutStrategy>,
41 pub fallback: ArcRef<dyn LayoutStrategy>,
42}
43
44impl LayoutStrategy for DictStrategy {
45 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
46 if !dict_layout_supported(dtype) {
47 return self.fallback.new_writer(ctx, dtype);
48 }
49 Ok(DelegatingDictLayoutWriter {
50 ctx: ctx.clone(),
51 strategy: self.clone(),
52 dtype: dtype.clone(),
53 writer: None,
54 }
55 .boxed())
56 }
57}
58
59pub fn dict_layout_supported(dtype: &DType) -> bool {
60 matches!(
61 dtype,
62 DType::Primitive(..) | DType::Utf8(_) | DType::Binary(_)
63 )
64}
65
66struct DelegatingDictLayoutWriter {
67 ctx: ArrayContext,
68 strategy: DictStrategy,
69 dtype: DType,
70 writer: Option<Box<dyn LayoutWriter>>,
71}
72
73impl LayoutWriter for DelegatingDictLayoutWriter {
74 fn push_chunk(
75 &mut self,
76 segment_writer: &mut dyn crate::segments::SegmentWriter,
77 chunk: ArrayRef,
78 ) -> VortexResult<()> {
79 assert_eq!(
80 chunk.dtype(),
81 &self.dtype,
82 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
83 chunk.dtype(),
84 self.dtype
85 );
86 match self.writer.as_mut() {
87 Some(writer) => writer.push_chunk(segment_writer, chunk),
88 None => {
89 let compressed = BtrBlocksCompressor.compress(&chunk)?;
90 let mut writer = if !compressed.is_encoding(DictEncoding.id()) {
91 self.strategy.fallback.new_writer(&self.ctx, &self.dtype)?
92 } else {
93 repeating::DictLayoutWriter::new(
94 self.ctx.clone(),
95 &self.dtype,
96 self.strategy.clone(),
97 )
98 .boxed()
99 };
100 writer.push_chunk(segment_writer, chunk)?;
101 self.writer = Some(writer);
102 Ok(())
103 }
104 }
105 }
106
107 fn flush(
108 &mut self,
109 segment_writer: &mut dyn crate::segments::SegmentWriter,
110 ) -> VortexResult<()> {
111 match self.writer.as_mut() {
112 None => vortex_bail!("flush called before push_chunk"),
113 Some(writer) => writer.flush(segment_writer),
114 }
115 }
116
117 fn finish(
118 &mut self,
119 segment_writer: &mut dyn crate::segments::SegmentWriter,
120 ) -> VortexResult<Layout> {
121 match self.writer.as_mut() {
122 None => vortex_bail!("finish called before push_chunk"),
123 Some(writer) => writer.finish(segment_writer),
124 }
125 }
126}
127
128#[derive(prost::Message)]
129pub struct DictLayoutMetadata {
130 #[prost(enumeration = "PType", tag = "1")]
131 codes_ptype: i32,
133}
134
135impl DictLayoutMetadata {
136 pub fn new(codes_ptype: PType) -> Self {
137 let mut metadata = Self::default();
138 metadata.set_codes_ptype(codes_ptype);
139 metadata
140 }
141}
142
143fn dict_layout(values: Layout, codes: Layout) -> VortexResult<Layout> {
144 let metadata =
145 Bytes::from(ProstMetadata(DictLayoutMetadata::new(codes.dtype().try_into()?)).serialize());
146 Ok(Layout::new_owned(
147 "dict".into(),
148 LayoutVTableRef::new_ref(&DictLayout),
149 values.dtype().clone(),
150 codes.row_count(),
151 vec![],
152 vec![values, codes],
153 Some(metadata),
154 ))
155}
156
157enum EncodingState {
158 Continue((Box<dyn DictEncoder>, ArrayRef)),
159 Done((ArrayRef, ArrayRef, ArrayRef)),
161}
162
163fn start_encoding(constraints: &DictConstraints, chunk: &dyn Array) -> VortexResult<EncodingState> {
164 let encoder = dict_encoder(chunk, constraints)?;
165 encode_chunk(encoder, chunk)
166}
167
168fn encode_chunk(
169 mut encoder: Box<dyn DictEncoder>,
170 chunk: &dyn Array,
171) -> VortexResult<EncodingState> {
172 let encoded = encoder.encode(chunk)?;
173 Ok(match remainder(chunk, encoded.len())? {
174 None => EncodingState::Continue((encoder, encoded)),
175 Some(unencoded) => EncodingState::Done((encoder.values()?, encoded, unencoded)),
176 })
177}
178
179fn remainder(array: &dyn Array, encoded_len: usize) -> VortexResult<Option<ArrayRef>> {
180 (encoded_len < array.len())
181 .then(|| array.slice(encoded_len, array.len()))
182 .transpose()
183}