mlt_core/frames/v01/stream/
encode_stream.rs1use crate::MltResult;
2use crate::codecs::bytes::encode_bools_to_bytes;
3use crate::codecs::fsst::compress_fsst;
4use crate::codecs::rle::encode_byte_rle;
5use crate::v01::{
6 DictionaryType, EncodedPlainData, EncodedStream, EncodedStreamData, EncodedStringsEncoding,
7 FsstStrEncoder, IntEncoder, IntEncoding, LengthType, LogicalEncoding, OffsetType,
8 PhysicalEncoding, RleMeta, StreamMeta, StreamType,
9};
10
11impl EncodedStream {
12 #[must_use]
14 pub fn empty_without_encoding() -> Self {
15 Self {
16 meta: StreamMeta::new(
17 StreamType::Data(DictionaryType::None),
18 IntEncoding::none(),
19 0,
20 ),
21 data: EncodedStreamData::Encoded(Vec::new()),
22 }
23 }
24
25 #[must_use]
26 fn plain(data: Vec<u8>, num_values: u32) -> EncodedStream {
27 Self::plain_with_type(data, num_values, DictionaryType::None)
28 }
29
30 #[must_use]
32 fn plain_with_type(data: Vec<u8>, num_values: u32, dict_type: DictionaryType) -> EncodedStream {
33 let meta = StreamMeta::new(StreamType::Data(dict_type), IntEncoding::none(), num_values);
34 let data = EncodedStreamData::Encoded(data);
35 Self { meta, data }
36 }
37
38 pub fn encode_bools(values: &[bool]) -> MltResult<Self> {
42 Self::encode_bools_with_type(values, StreamType::Data(DictionaryType::None))
43 }
44
45 pub fn encode_presence(values: &[bool]) -> MltResult<Self> {
49 Self::encode_bools_with_type(values, StreamType::Present)
50 }
51
52 fn encode_bools_with_type(values: &[bool], stream_type: StreamType) -> MltResult<Self> {
54 let num_values = u32::try_from(values.len())?;
55 let bytes = encode_bools_to_bytes(values);
56 let data = encode_byte_rle(&bytes);
57 let runs = num_values.div_ceil(8);
58 let num_rle_values = u32::try_from(data.len())?;
59 let meta = StreamMeta::new(
60 stream_type,
61 IntEncoding::new(
62 LogicalEncoding::Rle(RleMeta {
63 runs,
64 num_rle_values,
65 }),
66 PhysicalEncoding::None,
67 ),
68 num_values,
69 );
70 Ok(Self {
71 meta,
72 data: EncodedStreamData::Encoded(data),
73 })
74 }
75
76 pub fn encode_f32(values: &[f32]) -> MltResult<Self> {
78 let num_values = u32::try_from(values.len())?;
79 let data = values
80 .iter()
81 .flat_map(|f| f.to_le_bytes())
82 .collect::<Vec<u8>>();
83
84 Ok(Self::plain(data, num_values))
85 }
86
87 pub fn encode_f64(values: &[f64]) -> MltResult<Self> {
89 let num_values = u32::try_from(values.len())?;
90 let data = values
91 .iter()
92 .flat_map(|v| v.to_le_bytes())
93 .collect::<Vec<u8>>();
94
95 Ok(Self::plain(data, num_values))
96 }
97
98 pub fn encode_i8s(values: &[i8], encoding: IntEncoder) -> MltResult<Self> {
99 let as_i32: Vec<i32> = values.iter().map(|&v| i32::from(v)).collect();
100 let (physical_u32s, logical_encoding) = encoding.logical.encode_i32s(&as_i32)?;
101 let num_values = u32::try_from(physical_u32s.len())?;
102 let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
103 Ok(Self {
104 meta: StreamMeta::new(
105 StreamType::Data(DictionaryType::None),
106 IntEncoding::new(logical_encoding, physical_encoding),
107 num_values,
108 ),
109 data,
110 })
111 }
112
113 pub fn encode_u8s(values: &[u8], encoding: IntEncoder) -> MltResult<Self> {
114 let as_u32: Vec<u32> = values.iter().map(|&v| u32::from(v)).collect();
115 let (physical_u32s, logical_encoding) = encoding.logical.encode_u32s(&as_u32)?;
116 let num_values = u32::try_from(physical_u32s.len())?;
117 let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
118 Ok(Self {
119 meta: StreamMeta::new(
120 StreamType::Data(DictionaryType::None),
121 IntEncoding::new(logical_encoding, physical_encoding),
122 num_values,
123 ),
124 data,
125 })
126 }
127
128 pub fn encode_i32s(values: &[i32], encoding: IntEncoder) -> MltResult<Self> {
129 let (physical_u32s, logical_encoding) = encoding.logical.encode_i32s(values)?;
130 let num_values = u32::try_from(physical_u32s.len())?;
131 let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
132 Ok(Self {
133 meta: StreamMeta::new(
134 StreamType::Data(DictionaryType::None),
135 IntEncoding::new(logical_encoding, physical_encoding),
136 num_values,
137 ),
138 data,
139 })
140 }
141
142 pub fn encode_u32s(values: &[u32], encoding: IntEncoder) -> MltResult<Self> {
143 Self::encode_u32s_of_type(values, encoding, StreamType::Data(DictionaryType::None))
144 }
145
146 pub fn encode_u32s_of_type(
147 values: &[u32],
148 encoding: IntEncoder,
149 stream_type: StreamType,
150 ) -> MltResult<Self> {
151 let (physical_u32s, logical_encoding) = encoding.logical.encode_u32s(values)?;
152 let num_values = u32::try_from(physical_u32s.len())?;
153 let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
154 Ok(Self {
155 meta: StreamMeta::new(
156 stream_type,
157 IntEncoding::new(logical_encoding, physical_encoding),
158 num_values,
159 ),
160 data,
161 })
162 }
163
164 pub fn encode_i64s(values: &[i64], encoding: IntEncoder) -> MltResult<Self> {
165 let (physical_u64s, logical_encoding) = encoding.logical.encode_i64s(values)?;
166 let num_values = u32::try_from(physical_u64s.len())?;
167 let (data, physical_encoding) = encoding.physical.encode_u64s(physical_u64s)?;
168 Ok(Self {
169 meta: StreamMeta::new(
170 StreamType::Data(DictionaryType::None),
171 IntEncoding::new(logical_encoding, physical_encoding),
172 num_values,
173 ),
174 data,
175 })
176 }
177
178 pub fn encode_u64s(values: &[u64], encoding: IntEncoder) -> MltResult<Self> {
179 let (physical_u64s, logical_encoding) = encoding.logical.encode_u64s(values)?;
180 let num_values = u32::try_from(physical_u64s.len())?;
181 let (data, physical_encoding) = encoding.physical.encode_u64s(physical_u64s)?;
182 Ok(Self {
183 meta: StreamMeta::new(
184 StreamType::Data(DictionaryType::None),
185 IntEncoding::new(logical_encoding, physical_encoding),
186 num_values,
187 ),
188 data,
189 })
190 }
191
192 pub fn encode_strings_with_type<S: AsRef<str>>(
194 values: &[S],
195 length_encoding: IntEncoder,
196 length_type: LengthType,
197 dict_type: DictionaryType,
198 ) -> MltResult<EncodedStringsEncoding> {
199 let lengths: Vec<u32> = values
200 .iter()
201 .map(|s| u32::try_from(s.as_ref().len()))
202 .collect::<Result<Vec<_>, _>>()?;
203 let data: Vec<u8> = values
204 .iter()
205 .flat_map(|s| s.as_ref().as_bytes().iter().copied())
206 .collect();
207 Ok(EncodedStringsEncoding::Plain(EncodedPlainData::new(
208 Self::encode_u32s_of_type(&lengths, length_encoding, StreamType::Length(length_type))?,
209 Self::plain_with_type(data, u32::try_from(values.len())?, dict_type),
210 )?))
211 }
212
213 pub fn encode_strings_fsst_with_type<S: AsRef<str>>(
229 values: &[S],
230 encoding: FsstStrEncoder,
231 dict_type: DictionaryType,
232 ) -> MltResult<EncodedStringsEncoding> {
233 let fsst_data = compress_fsst(values, encoding, dict_type)?;
234 let value_cnt = u32::try_from(values.len())?;
235 let offsets = (0..value_cnt).collect::<Vec<_>>();
236 let offsets = Self::encode_u32s_of_type(
237 &offsets,
238 encoding.dict_lengths,
239 StreamType::Offset(OffsetType::String),
240 )?;
241 Ok(EncodedStringsEncoding::FsstDictionary { fsst_data, offsets })
242 }
243
244 pub fn encode_strings_fsst_plain_with_type<S: AsRef<str>>(
247 values: &[S],
248 encoding: FsstStrEncoder,
249 dict_type: DictionaryType,
250 ) -> MltResult<EncodedStringsEncoding> {
251 Ok(EncodedStringsEncoding::FsstPlain(compress_fsst(
252 values, encoding, dict_type,
253 )?))
254 }
255}