mlt_core/frames/v01/stream/
parse.rs1use std::fmt::{self, Debug};
2use std::io::{self, Write};
3
4use integer_encoding::VarIntWriter as _;
5
6use crate::analyse::{Analyze, StatType};
7use crate::codecs::varint::parse_varint;
8use crate::errors::fail_if_invalid_stream_size;
9use crate::utils::{BinarySerializer as _, parse_u8, take};
10use crate::v01::{
11 IntEncoding, LogicalEncoding, LogicalTechnique, MortonMeta, PhysicalEncoding, RawStream,
12 RawStreamData, RleMeta, StreamMeta, StreamType,
13};
14use crate::{MltError, MltRefResult, MltResult, Parser};
15
16impl IntEncoding {
17 #[must_use]
18 pub const fn new(logical: LogicalEncoding, physical: PhysicalEncoding) -> Self {
19 Self { logical, physical }
20 }
21
22 #[must_use]
23 pub const fn none() -> Self {
24 Self::new(LogicalEncoding::None, PhysicalEncoding::None)
25 }
26}
27
28impl StreamMeta {
29 #[must_use]
30 pub fn new(stream_type: StreamType, encoding: IntEncoding, num_values: u32) -> Self {
31 Self {
32 stream_type,
33 encoding,
34 num_values,
35 }
36 }
37
38 pub(super) fn from_bytes<'a>(
47 input: &'a [u8],
48 is_bool: bool,
49 parser: &mut Parser,
50 ) -> MltRefResult<'a, (Self, u32)> {
51 use crate::v01::LogicalTechnique as LT;
52
53 let (input, stream_type) = StreamType::from_bytes(input)?;
54 let (input, val) = parse_u8(input)?;
55 let logical1 = LT::parse(val >> 5)?;
56 let logical2 = LT::parse((val >> 2) & 0x7)?;
57 let physical_encoding = PhysicalEncoding::parse(val & 0x3)?;
58
59 let (input, num_values) = parse_varint::<u32>(input)?;
60 let (input, byte_length) = parse_varint::<u32>(input)?;
61
62 let mut input = input;
63 let logical_encoding = match (logical1, logical2) {
64 (LT::None | LT::Delta | LT::ComponentwiseDelta | LT::PseudoDecimal, LT::None) => {
65 let decoded_bytes = num_values.saturating_mul(8);
67 parser.reserve(decoded_bytes)?;
68 match logical1 {
69 LT::None => LogicalEncoding::None,
70 LT::Delta => LogicalEncoding::Delta,
71 LT::ComponentwiseDelta => LogicalEncoding::ComponentwiseDelta,
72 _ => LogicalEncoding::PseudoDecimal,
73 }
74 }
75 (LT::Delta, LT::Rle) | (LT::Rle, LT::None) => {
76 let runs;
77 let num_rle_values;
78 if is_bool {
79 runs = num_values.div_ceil(8);
80 num_rle_values = byte_length;
81 } else {
82 (input, runs) = parse_varint::<u32>(input)?;
83 (input, num_rle_values) = parse_varint::<u32>(input)?;
84 }
85 let decoded_bytes = num_rle_values.saturating_mul(8);
87 parser.reserve(decoded_bytes)?;
88 let rle = RleMeta {
89 runs,
90 num_rle_values,
91 };
92 if logical1 == LT::Rle {
93 LogicalEncoding::Rle(rle)
94 } else {
95 LogicalEncoding::DeltaRle(rle)
96 }
97 }
98 (LT::Morton, LT::None | LT::Rle | LT::Delta) => {
99 let decoded_bytes = num_values.saturating_mul(8);
101 parser.reserve(decoded_bytes)?;
102 let num_bits;
103 let coordinate_shift;
104 (input, num_bits) = parse_varint::<u32>(input)?;
105 (input, coordinate_shift) = parse_varint::<u32>(input)?;
106 let meta = MortonMeta {
107 num_bits,
108 coordinate_shift,
109 };
110 match logical2 {
111 LT::Rle => LogicalEncoding::MortonRle(meta),
112 LT::Delta => LogicalEncoding::MortonDelta(meta),
113 _ => LogicalEncoding::Morton(meta),
114 }
115 }
116 _ => Err(MltError::InvalidLogicalEncodings(logical1, logical2))?,
117 };
118
119 let meta = StreamMeta::new(
120 stream_type,
121 IntEncoding::new(logical_encoding, physical_encoding),
122 num_values,
123 );
124 Ok((input, (meta, byte_length)))
125 }
126
127 pub fn write_to<W: Write>(
128 &self,
129 writer: &mut W,
130 is_bool: bool,
131 byte_length: u32,
132 ) -> io::Result<()> {
133 use LogicalEncoding as LE;
134 use LogicalTechnique as LT;
135
136 writer.write_u8(self.stream_type.as_u8())?;
137 let logical_enc_u8: u8 = match self.encoding.logical {
138 LE::None => (LT::None as u8) << 5,
139 LE::Delta => (LT::Delta as u8) << 5,
140 LE::DeltaRle(_) => ((LT::Delta as u8) << 5) | ((LT::Rle as u8) << 2),
141 LE::ComponentwiseDelta => (LT::ComponentwiseDelta as u8) << 5,
142 LE::Rle(_) => (LT::Rle as u8) << 5,
143 LE::Morton(_) => (LT::Morton as u8) << 5,
144 LE::MortonRle(_) => (LT::Morton as u8) << 5 | ((LT::Rle as u8) << 2),
145 LE::MortonDelta(_) => (LT::Morton as u8) << 5 | ((LT::Delta as u8) << 2),
146 LE::PseudoDecimal => (LT::PseudoDecimal as u8) << 5,
147 };
148 let physical_enc_u8: u8 = match self.encoding.physical {
149 PhysicalEncoding::None => 0x0,
150 PhysicalEncoding::FastPFOR => 0x1,
151 PhysicalEncoding::VarInt => 0x2,
152 PhysicalEncoding::Alp => 0x3,
153 };
154 writer.write_u8(logical_enc_u8 | physical_enc_u8)?;
155 writer.write_varint(self.num_values)?;
156 writer.write_varint(byte_length)?;
157
158 match self.encoding.logical {
160 LE::DeltaRle(r) | LE::Rle(r) => {
161 if !is_bool {
162 writer.write_varint(r.runs)?;
163 writer.write_varint(r.num_rle_values)?;
164 }
165 }
166 LE::Morton(m) | LE::MortonDelta(m) | LE::MortonRle(m) => {
167 writer.write_varint(m.num_bits)?;
168 writer.write_varint(m.coordinate_shift)?;
169 }
170 LE::None | LE::Delta | LE::ComponentwiseDelta | LE::PseudoDecimal => {}
171 }
172 Ok(())
173 }
174}
175
176impl Analyze for StreamMeta {
177 fn collect_statistic(&self, stat: StatType) -> usize {
178 if stat == StatType::DecodedMetaSize {
179 size_of::<Self>()
180 } else {
181 0
182 }
183 }
184}
185
186impl Debug for StreamMeta {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 let Self {
190 stream_type,
191 encoding,
192 num_values,
193 } = self;
194 f.debug_struct("StreamMeta")
195 .field("stream_type", &format_args!("{stream_type:?}"))
196 .field("logical_encoding", &format_args!("{:?}", encoding.logical))
197 .field(
198 "physical_encoding",
199 &format_args!("{:?}", encoding.physical),
200 )
201 .field("num_values", &format_args!("{num_values:?}"))
202 .finish()
203 }
204}
205
206impl<'a> RawStream<'a> {
207 #[must_use]
208 pub fn new(meta: StreamMeta, data: RawStreamData<'a>) -> Self {
209 Self { meta, data }
210 }
211
212 #[must_use]
213 pub fn as_bytes(&self) -> &'a [u8] {
214 match &self.data {
215 RawStreamData::Encoded(v) | RawStreamData::VarInt(v) => v,
216 }
217 }
218
219 pub fn from_bytes(input: &'a [u8], parser: &mut Parser) -> MltRefResult<'a, Self> {
220 Self::from_bytes_internal(input, false, parser)
221 }
222
223 pub fn parse_multiple(
224 mut input: &'a [u8],
225 count: usize,
226 parser: &mut Parser,
227 ) -> MltRefResult<'a, Vec<Self>> {
228 let mut result = Vec::with_capacity(count);
229 for _ in 0..count {
230 let stream;
231 (input, stream) = RawStream::from_bytes_internal(input, false, parser)?;
232 result.push(stream);
233 }
234 Ok((input, result))
235 }
236
237 pub fn parse_bool(input: &'a [u8], parser: &mut Parser) -> MltRefResult<'a, Self> {
238 Self::from_bytes_internal(input, true, parser)
239 }
240
241 fn from_bytes_internal(
246 input: &'a [u8],
247 is_bool: bool,
248 parser: &mut Parser,
249 ) -> MltRefResult<'a, Self> {
250 use LogicalEncoding as LE;
251 use PhysicalEncoding as PD;
252
253 let (input, (meta, byte_length)) = StreamMeta::from_bytes(input, is_bool, parser)?;
254
255 let (input, data) = take(input, byte_length)?;
256
257 if let LE::Rle(r) | LE::DeltaRle(r) = meta.encoding.logical
259 && matches!(meta.encoding.physical, PD::VarInt)
260 && !is_bool
261 {
262 validate_rle_varint_stream(data, r.runs, r.num_rle_values)?;
263 }
264
265 let stream_data = match meta.encoding.physical {
266 PD::None | PD::FastPFOR => RawStreamData::Encoded(data),
267 PD::VarInt => RawStreamData::VarInt(data),
268 PD::Alp => return Err(MltError::UnsupportedPhysicalEncoding("ALP")),
269 };
270
271 Ok((input, RawStream::new(meta, stream_data)))
272 }
273}
274
275fn validate_rle_varint_stream(data: &[u8], runs: u32, num_rle_values: u32) -> MltResult<()> {
277 use crate::utils::AsUsize as _;
278 let mut rest = data;
279 let mut sum: u64 = 0;
280 for _ in 0..runs {
281 let (next, len) = parse_varint::<u32>(rest)?;
282 rest = next;
283 sum = sum
284 .checked_add(len.into())
285 .ok_or(MltError::IntegerOverflow)?;
286 }
287 if sum != u64::from(num_rle_values) {
288 let sum_usize = usize::try_from(sum).map_err(|_| MltError::IntegerOverflow)?;
289 fail_if_invalid_stream_size(sum_usize, num_rle_values.as_usize())?;
290 }
291 Ok(())
292}