Skip to main content

mlt_core/frames/v01/stream/
encode_stream.rs

1use crate::MltResult;
2use crate::codecs::bytes::encode_bools_to_bytes;
3use crate::codecs::fsst::compress_fsst;
4use crate::codecs::rle::encode_byte_rle;
5use crate::v01::{
6    DictionaryType, EncodedPlainData, EncodedStream, EncodedStreamData, EncodedStringsEncoding,
7    FsstStrEncoder, IntEncoder, IntEncoding, LengthType, LogicalEncoding, OffsetType,
8    PhysicalEncoding, RleMeta, StreamMeta, StreamType,
9};
10
11impl EncodedStream {
12    /// Creates an empty stream
13    #[must_use]
14    pub fn empty_without_encoding() -> Self {
15        Self {
16            meta: StreamMeta::new(
17                StreamType::Data(DictionaryType::None),
18                IntEncoding::none(),
19                0,
20            ),
21            data: EncodedStreamData::Encoded(Vec::new()),
22        }
23    }
24
25    #[must_use]
26    fn plain(data: Vec<u8>, num_values: u32) -> EncodedStream {
27        Self::plain_with_type(data, num_values, DictionaryType::None)
28    }
29
30    /// Creates a plain stream with values encoded literally
31    #[must_use]
32    fn plain_with_type(data: Vec<u8>, num_values: u32, dict_type: DictionaryType) -> EncodedStream {
33        let meta = StreamMeta::new(StreamType::Data(dict_type), IntEncoding::none(), num_values);
34        let data = EncodedStreamData::Encoded(data);
35        Self { meta, data }
36    }
37
38    /// Encode a boolean data stream: byte-RLE <- packed bitmap <- `Vec<bool>`
39    /// Boolean streams always use byte-RLE encoding with `LogicalEncoding::Rle` metadata.
40    /// The `RleMeta` values are computed by readers from the stream itself.
41    pub fn encode_bools(values: &[bool]) -> MltResult<Self> {
42        Self::encode_bools_with_type(values, StreamType::Data(DictionaryType::None))
43    }
44
45    /// Encode a presence/nullability stream
46    ///
47    /// Identical to [`Self::encode_bools`] except the stream type is [`StreamType::Present`]
48    pub fn encode_presence(values: &[bool]) -> MltResult<Self> {
49        Self::encode_bools_with_type(values, StreamType::Present)
50    }
51
52    /// Encode a boolean data stream: byte-RLE <- packed bitmap <- `Vec<bool>`
53    fn encode_bools_with_type(values: &[bool], stream_type: StreamType) -> MltResult<Self> {
54        let num_values = u32::try_from(values.len())?;
55        let bytes = encode_bools_to_bytes(values);
56        let data = encode_byte_rle(&bytes);
57        let runs = num_values.div_ceil(8);
58        let num_rle_values = u32::try_from(data.len())?;
59        let meta = StreamMeta::new(
60            stream_type,
61            IntEncoding::new(
62                LogicalEncoding::Rle(RleMeta {
63                    runs,
64                    num_rle_values,
65                }),
66                PhysicalEncoding::None,
67            ),
68            num_values,
69        );
70        Ok(Self {
71            meta,
72            data: EncodedStreamData::Encoded(data),
73        })
74    }
75
76    /// Encodes `f32`s into a stream
77    pub fn encode_f32(values: &[f32]) -> MltResult<Self> {
78        let num_values = u32::try_from(values.len())?;
79        let data = values
80            .iter()
81            .flat_map(|f| f.to_le_bytes())
82            .collect::<Vec<u8>>();
83
84        Ok(Self::plain(data, num_values))
85    }
86
87    /// Encodes `f64`s into a stream
88    pub fn encode_f64(values: &[f64]) -> MltResult<Self> {
89        let num_values = u32::try_from(values.len())?;
90        let data = values
91            .iter()
92            .flat_map(|v| v.to_le_bytes())
93            .collect::<Vec<u8>>();
94
95        Ok(Self::plain(data, num_values))
96    }
97
98    pub fn encode_i8s(values: &[i8], encoding: IntEncoder) -> MltResult<Self> {
99        let as_i32: Vec<i32> = values.iter().map(|&v| i32::from(v)).collect();
100        let (physical_u32s, logical_encoding) = encoding.logical.encode_i32s(&as_i32)?;
101        let num_values = u32::try_from(physical_u32s.len())?;
102        let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
103        Ok(Self {
104            meta: StreamMeta::new(
105                StreamType::Data(DictionaryType::None),
106                IntEncoding::new(logical_encoding, physical_encoding),
107                num_values,
108            ),
109            data,
110        })
111    }
112
113    pub fn encode_u8s(values: &[u8], encoding: IntEncoder) -> MltResult<Self> {
114        let as_u32: Vec<u32> = values.iter().map(|&v| u32::from(v)).collect();
115        let (physical_u32s, logical_encoding) = encoding.logical.encode_u32s(&as_u32)?;
116        let num_values = u32::try_from(physical_u32s.len())?;
117        let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
118        Ok(Self {
119            meta: StreamMeta::new(
120                StreamType::Data(DictionaryType::None),
121                IntEncoding::new(logical_encoding, physical_encoding),
122                num_values,
123            ),
124            data,
125        })
126    }
127
128    pub fn encode_i32s(values: &[i32], encoding: IntEncoder) -> MltResult<Self> {
129        let (physical_u32s, logical_encoding) = encoding.logical.encode_i32s(values)?;
130        let num_values = u32::try_from(physical_u32s.len())?;
131        let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
132        Ok(Self {
133            meta: StreamMeta::new(
134                StreamType::Data(DictionaryType::None),
135                IntEncoding::new(logical_encoding, physical_encoding),
136                num_values,
137            ),
138            data,
139        })
140    }
141
142    pub fn encode_u32s(values: &[u32], encoding: IntEncoder) -> MltResult<Self> {
143        Self::encode_u32s_of_type(values, encoding, StreamType::Data(DictionaryType::None))
144    }
145
146    pub fn encode_u32s_of_type(
147        values: &[u32],
148        encoding: IntEncoder,
149        stream_type: StreamType,
150    ) -> MltResult<Self> {
151        let (physical_u32s, logical_encoding) = encoding.logical.encode_u32s(values)?;
152        let num_values = u32::try_from(physical_u32s.len())?;
153        let (data, physical_encoding) = encoding.physical.encode_u32s(physical_u32s)?;
154        Ok(Self {
155            meta: StreamMeta::new(
156                stream_type,
157                IntEncoding::new(logical_encoding, physical_encoding),
158                num_values,
159            ),
160            data,
161        })
162    }
163
164    pub fn encode_i64s(values: &[i64], encoding: IntEncoder) -> MltResult<Self> {
165        let (physical_u64s, logical_encoding) = encoding.logical.encode_i64s(values)?;
166        let num_values = u32::try_from(physical_u64s.len())?;
167        let (data, physical_encoding) = encoding.physical.encode_u64s(physical_u64s)?;
168        Ok(Self {
169            meta: StreamMeta::new(
170                StreamType::Data(DictionaryType::None),
171                IntEncoding::new(logical_encoding, physical_encoding),
172                num_values,
173            ),
174            data,
175        })
176    }
177
178    pub fn encode_u64s(values: &[u64], encoding: IntEncoder) -> MltResult<Self> {
179        let (physical_u64s, logical_encoding) = encoding.logical.encode_u64s(values)?;
180        let num_values = u32::try_from(physical_u64s.len())?;
181        let (data, physical_encoding) = encoding.physical.encode_u64s(physical_u64s)?;
182        Ok(Self {
183            meta: StreamMeta::new(
184                StreamType::Data(DictionaryType::None),
185                IntEncoding::new(logical_encoding, physical_encoding),
186                num_values,
187            ),
188            data,
189        })
190    }
191
192    /// Encode a sequence of strings into a length stream and a data stream.
193    pub fn encode_strings_with_type<S: AsRef<str>>(
194        values: &[S],
195        length_encoding: IntEncoder,
196        length_type: LengthType,
197        dict_type: DictionaryType,
198    ) -> MltResult<EncodedStringsEncoding> {
199        let lengths: Vec<u32> = values
200            .iter()
201            .map(|s| u32::try_from(s.as_ref().len()))
202            .collect::<Result<Vec<_>, _>>()?;
203        let data: Vec<u8> = values
204            .iter()
205            .flat_map(|s| s.as_ref().as_bytes().iter().copied())
206            .collect();
207        Ok(EncodedStringsEncoding::Plain(EncodedPlainData::new(
208            Self::encode_u32s_of_type(&lengths, length_encoding, StreamType::Length(length_type))?,
209            Self::plain_with_type(data, u32::try_from(values.len())?, dict_type),
210        )?))
211    }
212
213    /// Encode a sequence of strings using FSST compression.
214    ///
215    /// Produces 5 streams:
216    /// 1. Symbol lengths stream (Length, `LengthType::Symbol`)
217    /// 2. Symbol table data stream (Data, `DictionaryType::Fsst`)
218    /// 3. Value lengths stream (Length, `LengthType::Dictionary`)
219    /// 4. Compressed corpus stream (Data, `dict_type`)
220    /// 5. Offset indices stream (Offset, `OffsetType::String`)
221    ///
222    /// The dictionary type of the compressed corpus stream is determined by the
223    /// `dict_type` argument passed to this function.
224    ///
225    /// Note: The FSST algorithm implementation may differ from Java's, so the
226    /// compressed output may not be byte-for-byte identical. Both implementations
227    /// are semantically compatible and can decode each other's output.
228    pub fn encode_strings_fsst_with_type<S: AsRef<str>>(
229        values: &[S],
230        encoding: FsstStrEncoder,
231        dict_type: DictionaryType,
232    ) -> MltResult<EncodedStringsEncoding> {
233        let fsst_data = compress_fsst(values, encoding, dict_type)?;
234        let value_cnt = u32::try_from(values.len())?;
235        let offsets = (0..value_cnt).collect::<Vec<_>>();
236        let offsets = Self::encode_u32s_of_type(
237            &offsets,
238            encoding.dict_lengths,
239            StreamType::Offset(OffsetType::String),
240        )?;
241        Ok(EncodedStringsEncoding::FsstDictionary { fsst_data, offsets })
242    }
243
244    /// Encode strings with FSST (4 streams, no offset). For shared dictionary struct columns;
245    /// each child has its own offset stream.
246    pub fn encode_strings_fsst_plain_with_type<S: AsRef<str>>(
247        values: &[S],
248        encoding: FsstStrEncoder,
249        dict_type: DictionaryType,
250    ) -> MltResult<EncodedStringsEncoding> {
251        Ok(EncodedStringsEncoding::FsstPlain(compress_fsst(
252            values, encoding, dict_type,
253        )?))
254    }
255}