1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::hash_map::Entry;
4use std::io::Write;
5use std::mem::size_of;
6
7use crate::MltError::{
8 BufferUnderflow, DictIndexOutOfBounds, NotImplemented, UnexpectedStreamType2,
9};
10use crate::codecs::fsst::decode_fsst;
11use crate::errors::AsMltError as _;
12use crate::utils::AsUsize as _;
13use crate::v01::{
14 ColumnType, DictionaryType, EncodedFsstData, EncodedName, EncodedPlainData, EncodedPresence,
15 EncodedProperty, EncodedSharedDict, EncodedSharedDictEncoding, EncodedSharedDictItem,
16 EncodedStream, EncodedStrings, EncodedStringsEncoding, FsstStrEncoder, IntEncoder, LengthType,
17 OffsetType, ParsedSharedDict, ParsedSharedDictItem, ParsedStrings, PresenceStream,
18 PropertyEncoder, RawFsstData, RawPlainData, RawPresence, RawSharedDict, RawSharedDictEncoding,
19 RawSharedDictItem, RawStream, RawStrings, RawStringsEncoding, SharedDictEncoder,
20 StagedSharedDict, StagedSharedDictItem, StagedStrings, StrEncoder, StreamType,
21};
22use crate::{Analyze, Decoder, MltError, MltResult, StatType};
23
24impl StrEncoder {
25 #[must_use]
26 pub fn plain(string_lengths: IntEncoder) -> Self {
27 Self::Plain { string_lengths }
28 }
29 #[must_use]
30 pub fn fsst(symbol_lengths: IntEncoder, dict_lengths: IntEncoder) -> Self {
31 Self::Fsst(FsstStrEncoder {
32 symbol_lengths,
33 dict_lengths,
34 })
35 }
36}
37
38impl From<Vec<Option<String>>> for StagedStrings {
39 fn from(values: Vec<Option<String>>) -> Self {
40 Self::from_optional_strings(values)
41 }
42}
43
44impl From<Vec<String>> for StagedStrings {
45 fn from(values: Vec<String>) -> Self {
46 Self::from_optional_strings(values.into_iter().map(Some).collect())
47 }
48}
49
50impl StagedStrings {
51 fn from_optional_strings(values: Vec<Option<String>>) -> Self {
52 let mut lengths = Vec::with_capacity(values.len());
53 let mut data = String::new();
54 let mut end = 0_i32;
55 for value in values {
56 match value {
57 Some(value) => {
58 end = checked_string_end(end, value.len())
59 .expect("staged string corpus exceeds supported i32 range");
60 lengths.push(end);
61 data.push_str(&value);
62 }
63 None => lengths.push(encode_null_end(end)),
64 }
65 }
66 Self {
67 name: String::new(),
68 lengths,
69 data,
70 }
71 }
72
73 #[must_use]
74 pub fn feature_count(&self) -> usize {
75 self.lengths.len()
76 }
77
78 fn bounds(&self, i: u32) -> Option<(u32, u32)> {
79 let i = i.as_usize();
80 let end = *self.lengths.get(i)?;
81 if end < 0 {
82 return None;
83 }
84 let end = end.cast_unsigned();
85 let start = if i == 0 {
86 0
87 } else {
88 let prev = self.lengths[i - 1];
89 if prev < 0 {
90 (!prev).cast_unsigned()
91 } else {
92 prev.cast_unsigned()
93 }
94 };
95 Some((start, end))
96 }
97
98 #[must_use]
99 pub fn has_nulls(&self) -> bool {
100 self.lengths.iter().any(|&end| end < 0)
101 }
102
103 #[must_use]
104 pub fn presence_bools(&self) -> Vec<bool> {
105 self.lengths.iter().map(|&end| end >= 0).collect()
106 }
107
108 #[must_use]
109 pub fn get(&self, i: u32) -> Option<&str> {
110 let (start, end) = self.bounds(i)?;
111 self.data.get(start.as_usize()..end.as_usize())
112 }
113
114 #[must_use]
115 pub fn dense_values(&self) -> Vec<String> {
116 let mut values = Vec::new();
117 let mut start = 0_u32;
118 for &end in &self.lengths {
119 if end >= 0 {
120 let end = end.cast_unsigned();
121 values.push(self.data[start.as_usize()..end.as_usize()].to_string());
122 start = end;
123 } else {
124 start = (!end).cast_unsigned();
125 }
126 }
127 values
128 }
129}
130
131impl StagedSharedDict {
134 #[must_use]
135 pub fn corpus(&self) -> &str {
136 &self.data
137 }
138
139 #[must_use]
140 pub fn get(&self, span: (u32, u32)) -> Option<&str> {
141 self.corpus().get(span.0.as_usize()..span.1.as_usize())
142 }
143}
144
145pub fn collect_staged_shared_dict_spans(items: &[StagedSharedDictItem]) -> Vec<(u32, u32)> {
146 let mut spans = items
147 .iter()
148 .flat_map(StagedSharedDictItem::dense_spans)
149 .collect::<Vec<_>>();
150 spans.sort_unstable();
151 spans.dedup();
152 spans
153}
154
155impl StagedSharedDictItem {
156 #[must_use]
157 pub fn feature_count(&self) -> usize {
158 self.ranges.len()
159 }
160
161 #[must_use]
162 pub fn has_nulls(&self) -> bool {
163 self.ranges
164 .iter()
165 .any(|&range| decode_shared_dict_range(range).is_none())
166 }
167
168 #[must_use]
169 pub fn presence_bools(&self) -> Vec<bool> {
170 self.ranges
171 .iter()
172 .map(|&range| decode_shared_dict_range(range).is_some())
173 .collect()
174 }
175
176 #[must_use]
177 pub fn dense_spans(&self) -> Vec<(u32, u32)> {
178 self.ranges
179 .iter()
180 .filter_map(|&range| decode_shared_dict_range(range))
181 .collect()
182 }
183
184 #[must_use]
185 pub fn get<'a>(&self, shared_dict: &'a StagedSharedDict, i: usize) -> Option<&'a str> {
186 self.ranges
187 .get(i)
188 .copied()
189 .and_then(decode_shared_dict_range)
190 .and_then(|span| shared_dict.get(span))
191 }
192
193 #[must_use]
194 pub fn materialize(&self, shared_dict: &StagedSharedDict) -> Vec<Option<String>> {
195 self.ranges
196 .iter()
197 .map(|&range| {
198 decode_shared_dict_range(range)
199 .and_then(|span| shared_dict.get(span))
200 .map(str::to_string)
201 })
202 .collect()
203 }
204}
205
206impl<'a> ParsedStrings<'a> {
209 #[must_use]
210 pub fn new(name: &'a str, lengths: Vec<i32>, data: Cow<'a, str>) -> Self {
211 ParsedStrings {
212 name,
213 lengths,
214 data,
215 }
216 }
217
218 #[must_use]
219 pub fn feature_count(&self) -> usize {
220 self.lengths.len()
221 }
222
223 #[must_use]
224 pub fn has_nulls(&self) -> bool {
225 self.lengths.iter().any(|end| *end < 0)
226 }
227
228 #[must_use]
229 pub fn presence_bools(&self) -> Vec<bool> {
230 self.lengths.iter().map(|&end| end >= 0).collect()
231 }
232
233 fn bounds(&self, i: u32) -> Option<(u32, u32)> {
234 let idx = i.as_usize();
235 let end = *self.lengths.get(idx)?;
236 if end < 0 {
237 return None;
238 }
239 let start = idx
240 .checked_sub(1)
241 .and_then(|prev| self.lengths.get(prev).copied())
242 .map_or(0, decode_end);
243 Some((start, decode_end(end)))
244 }
245
246 #[must_use]
247 pub fn get(&self, i: u32) -> Option<&str> {
248 let (start, end) = self.bounds(i)?;
249 let start = start.as_usize();
250 let end = end.as_usize();
251 self.data.get(start..end)
252 }
253
254 #[must_use]
255 pub fn dense_values(&self) -> Vec<String> {
256 let mut values = Vec::new();
257 let mut start = 0_u32;
258 for &end in &self.lengths {
259 let end_u32 = decode_end(end);
260 let start_idx = start.as_usize();
261 let end_idx = end_u32.as_usize();
262 if end >= 0
263 && let Some(value) = self.data.get(start_idx..end_idx)
264 {
265 values.push(value.to_string());
266 }
267 start = end_u32;
268 }
269 values
270 }
271
272 #[must_use]
273 pub fn materialize(&self) -> Vec<Option<String>> {
274 (0..u32::try_from(self.feature_count()).unwrap_or(u32::MAX))
275 .map(|i| self.get(i).map(str::to_string))
276 .collect()
277 }
278}
279
280impl Analyze for ParsedStrings<'_> {
281 fn collect_statistic(&self, stat: StatType) -> usize {
282 let meta = if stat == StatType::DecodedMetaSize {
283 self.name.len()
284 } else {
285 0
286 };
287 meta + self.dense_values().collect_statistic(stat)
288 }
289}
290
291fn encode_shared_dict_range(start: u32, end: u32) -> Result<(i32, i32), MltError> {
292 Ok((i32::try_from(start)?, i32::try_from(end)?))
293}
294
/// Inverse of `encode_shared_dict_range`: returns `None` (a null feature) when either
/// bound is negative.
fn decode_shared_dict_range(range: (i32, i32)) -> Option<(u32, u32)> {
    let (start, end) = range;
    let start = u32::try_from(start).ok()?;
    let end = u32::try_from(end).ok()?;
    Some((start, end))
}
302
303fn shared_dict_spans(lengths: &[u32], dec: &mut Decoder) -> Result<Vec<(u32, u32)>, MltError> {
304 let mut spans = dec.alloc(lengths.len())?;
305 let mut offset = 0_u32;
306 for &len in lengths {
307 let start = offset;
308 offset = offset.saturating_add(len);
309 spans.push((start, offset));
310 }
311 Ok(spans)
312}
313
314fn resolve_dict_spans(
315 offsets: &[u32],
316 presence: Option<&[bool]>,
317 dict_spans: &[(u32, u32)],
318 dec: &mut Decoder,
319) -> Result<Vec<Option<(u32, u32)>>, MltError> {
320 let present_count = presence.map_or(offsets.len(), <[bool]>::len);
321 let mut resolved = dec.alloc(present_count)?;
322 let mut next = offsets.iter().copied();
323
324 if let Some(presence) = presence {
325 let fail = || {
326 MltError::PresenceValueCountMismatch(
327 presence.iter().filter(|&&v| v).count(),
328 offsets.len(),
329 )
330 };
331 for &present in presence {
332 if !present {
333 resolved.push(None);
334 continue;
335 }
336 let idx = next.next().ok_or_else(fail)?;
337 let span = dict_spans
338 .get(idx as usize)
339 .copied()
340 .ok_or(DictIndexOutOfBounds(idx, dict_spans.len()))?;
341 resolved.push(Some(span));
342 }
343
344 if next.next().is_some() {
345 return Err(fail());
346 }
347 } else {
348 for &idx in offsets {
349 let span = dict_spans
350 .get(idx as usize)
351 .copied()
352 .ok_or(DictIndexOutOfBounds(idx, dict_spans.len()))?;
353 resolved.push(Some(span));
354 }
355 }
356
357 Ok(resolved)
358}
359
360fn dict_span_str(dict_data: &str, span: (u32, u32)) -> MltResult<&str> {
361 let start = span.0.as_usize();
362 let end = span.1.as_usize();
363 let bytes = dict_data.as_bytes();
364 let Some(value) = bytes.get(start..end) else {
365 let len = span.1.saturating_sub(span.0);
366 return Err(BufferUnderflow(len, bytes.len().saturating_sub(start)));
367 };
368 Ok(str::from_utf8(value)?)
369}
370
371impl ParsedSharedDict<'_> {
372 #[must_use]
373 pub fn corpus(&self) -> &str {
374 &self.data
375 }
376
377 #[must_use]
378 pub fn get(&self, span: (u32, u32)) -> Option<&str> {
379 let start = span.0.as_usize();
380 let end = span.1.as_usize();
381 self.corpus().get(start..end)
382 }
383}
384
385impl ParsedSharedDictItem<'_> {
386 #[must_use]
387 pub fn feature_count(&self) -> usize {
388 self.ranges.len()
389 }
390
391 #[must_use]
392 pub fn has_nulls(&self) -> bool {
393 self.ranges
394 .iter()
395 .any(|&range| decode_shared_dict_range(range).is_none())
396 }
397
398 #[must_use]
399 pub fn presence_bools(&self) -> Vec<bool> {
400 self.ranges
401 .iter()
402 .map(|&range| decode_shared_dict_range(range).is_some())
403 .collect()
404 }
405
406 #[must_use]
407 pub fn dense_spans(&self) -> Vec<(u32, u32)> {
408 self.ranges
409 .iter()
410 .filter_map(|&range| decode_shared_dict_range(range))
411 .collect()
412 }
413
414 #[must_use]
415 pub fn materialize(&self, shared_dict: &ParsedSharedDict<'_>) -> Vec<Option<String>> {
416 self.ranges
417 .iter()
418 .map(|&range| {
419 decode_shared_dict_range(range)
420 .and_then(|span| shared_dict.get(span))
421 .map(str::to_string)
422 })
423 .collect()
424 }
425
426 #[must_use]
427 pub fn get<'a>(&self, shared_dict: &'a ParsedSharedDict<'_>, i: usize) -> Option<&'a str> {
428 self.ranges
429 .get(i)
430 .copied()
431 .and_then(decode_shared_dict_range)
432 .and_then(|span| shared_dict.get(span))
433 }
434}
435
// Returns early from the enclosing function with `UnexpectedStreamType2` when
// `$stream.meta.stream_type` does not match `$expected`. The error carries the actual
// stream type plus the stringified expected pattern and stream expression.
macro_rules! validate_stream {
    ($stream:expr, $expected:pat $(,)?) => {
        match $stream.meta.stream_type {
            $expected => {}
            actual => {
                return Err(UnexpectedStreamType2(
                    actual,
                    stringify!($expected),
                    stringify!($stream),
                ));
            }
        }
    };
}
448
449impl<'a> RawPlainData<'a> {
450 pub fn new(lengths: RawStream<'a>, data: RawStream<'a>) -> MltResult<Self> {
451 validate_stream!(
452 lengths,
453 StreamType::Length(LengthType::VarBinary | LengthType::Dictionary)
454 );
455 validate_stream!(
456 data,
457 StreamType::Data(
458 DictionaryType::None | DictionaryType::Single | DictionaryType::Shared
459 )
460 );
461 Ok(Self { lengths, data })
462 }
463
464 pub fn decode(self, dec: &mut Decoder) -> Result<(&'a str, Vec<u32>), MltError> {
465 Ok((
466 str::from_utf8(self.data.as_bytes())?,
467 self.lengths.decode_u32s(dec)?,
468 ))
469 }
470
471 #[must_use]
472 pub fn streams(&self) -> Vec<&RawStream<'_>> {
473 vec![&self.lengths, &self.data]
474 }
475}
476
477impl EncodedPlainData {
478 pub fn new(lengths: EncodedStream, data: EncodedStream) -> MltResult<Self> {
479 validate_stream!(
480 lengths,
481 StreamType::Length(LengthType::VarBinary | LengthType::Dictionary)
482 );
483 validate_stream!(
484 data,
485 StreamType::Data(
486 DictionaryType::None | DictionaryType::Single | DictionaryType::Shared
487 )
488 );
489 Ok(Self { lengths, data })
490 }
491
492 #[must_use]
493 pub fn streams(&self) -> Vec<&EncodedStream> {
494 vec![&self.lengths, &self.data]
495 }
496}
497
498impl<'a> RawFsstData<'a> {
499 pub fn new(
500 symbol_lengths: RawStream<'a>,
501 symbol_table: RawStream<'a>,
502 lengths: RawStream<'a>,
503 corpus: RawStream<'a>,
504 ) -> MltResult<Self> {
505 validate_stream!(symbol_lengths, StreamType::Length(LengthType::Symbol));
506 validate_stream!(symbol_table, StreamType::Data(DictionaryType::Fsst));
507 validate_stream!(lengths, StreamType::Length(LengthType::Dictionary));
508 validate_stream!(
509 corpus,
510 StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
511 );
512 Ok(Self {
513 symbol_lengths,
514 symbol_table,
515 lengths,
516 corpus,
517 })
518 }
519
520 pub fn decode(self, dec: &mut Decoder) -> Result<(String, Vec<u32>), MltError> {
521 decode_fsst(self, dec)
522 }
523
524 #[must_use]
525 pub fn streams(&self) -> Vec<&RawStream<'_>> {
526 vec![
527 &self.symbol_lengths,
528 &self.symbol_table,
529 &self.lengths,
530 &self.corpus,
531 ]
532 }
533}
534
535impl EncodedFsstData {
536 #[must_use]
537 pub fn streams(&self) -> Vec<&EncodedStream> {
538 vec![
539 &self.symbol_lengths,
540 &self.symbol_table,
541 &self.lengths,
542 &self.corpus,
543 ]
544 }
545}
546
547impl<'a> RawStringsEncoding<'a> {
548 #[must_use]
549 pub fn plain(plain_data: RawPlainData<'a>) -> Self {
550 Self::Plain(plain_data)
551 }
552
553 pub fn dictionary(plain_data: RawPlainData<'a>, offsets: RawStream<'a>) -> MltResult<Self> {
554 validate_stream!(offsets, StreamType::Offset(OffsetType::String));
555 Ok(Self::Dictionary {
556 plain_data,
557 offsets,
558 })
559 }
560
561 #[must_use]
562 pub fn fsst_plain(fsst_data: RawFsstData<'a>) -> Self {
563 Self::FsstPlain(fsst_data)
564 }
565
566 pub fn fsst_dictionary(fsst_data: RawFsstData<'a>, offsets: RawStream<'a>) -> MltResult<Self> {
567 validate_stream!(offsets, StreamType::Offset(OffsetType::String));
568 Ok(Self::FsstDictionary { fsst_data, offsets })
569 }
570
571 #[must_use]
573 pub fn streams(&self) -> Vec<&RawStream<'_>> {
574 match self {
575 Self::Plain(plain_data) => plain_data.streams(),
576 Self::Dictionary {
577 plain_data,
578 offsets,
579 } => {
580 let mut streams = plain_data.streams();
581 streams.insert(1, offsets); streams
583 }
584 Self::FsstPlain(fsst_data) => fsst_data.streams(),
585 Self::FsstDictionary { fsst_data, offsets } => {
586 let mut streams = fsst_data.streams();
587 streams.push(offsets);
588 streams
589 }
590 }
591 }
592}
593
594impl EncodedStringsEncoding {
595 #[must_use]
597 pub fn content_streams(&self) -> Vec<&EncodedStream> {
598 match self {
599 Self::Plain(plain_data) => plain_data.streams(),
600 Self::Dictionary {
601 plain_data,
602 offsets,
603 } => {
604 let mut streams = plain_data.streams();
605 streams.insert(1, offsets); streams
607 }
608 Self::FsstPlain(fsst_data) => fsst_data.streams(),
609 Self::FsstDictionary { fsst_data, offsets } => {
610 let mut streams = fsst_data.streams();
611 streams.push(offsets);
612 streams
613 }
614 }
615 }
616
617 #[must_use]
619 pub fn streams(&self) -> Vec<&EncodedStream> {
620 self.content_streams()
621 }
622}
623
624impl RawStrings<'_> {
625 #[must_use]
627 pub fn streams(&self) -> Vec<&RawStream<'_>> {
628 self.encoding.streams()
629 }
630}
631
632impl EncodedStrings {
633 #[must_use]
635 pub fn streams(&self) -> Vec<&EncodedStream> {
636 self.encoding.streams()
637 }
638}
639
640impl<'a> RawSharedDictEncoding<'a> {
641 #[must_use]
643 pub fn plain(plain_data: RawPlainData<'a>) -> Self {
644 Self::Plain(plain_data)
645 }
646
647 #[must_use]
649 pub fn fsst_plain(fsst_data: RawFsstData<'a>) -> Self {
650 Self::FsstPlain(fsst_data)
651 }
652
653 #[must_use]
655 pub fn dict_streams(&self) -> Vec<&RawStream<'_>> {
656 match self {
657 Self::Plain(plain_data) => plain_data.streams(),
658 Self::FsstPlain(fsst_data) => fsst_data.streams(),
659 }
660 }
661}
662
663impl EncodedSharedDictEncoding {
664 #[must_use]
665 pub fn dict_streams(&self) -> Vec<&EncodedStream> {
666 match self {
667 Self::Plain(plain_data) => plain_data.streams(),
668 Self::FsstPlain(fsst_data) => fsst_data.streams(),
669 }
670 }
671}
672
673impl RawSharedDict<'_> {
674 #[must_use]
676 pub fn dict_streams(&self) -> Vec<&RawStream<'_>> {
677 self.encoding.dict_streams()
678 }
679}
680
681impl EncodedSharedDict {
682 #[must_use]
683 pub fn dict_streams(&self) -> Vec<&EncodedStream> {
684 self.encoding.dict_streams()
685 }
686}
687
688pub fn encode_shared_dict_prop(
690 shared_dict: &StagedSharedDict,
691 encoder: &SharedDictEncoder,
692) -> MltResult<EncodedProperty> {
693 if shared_dict.items.len() != encoder.items.len() {
694 return Err(NotImplemented(
695 "SharedDict items count must match encoder items count",
696 ));
697 }
698
699 let dict_spans = collect_staged_shared_dict_spans(&shared_dict.items);
700 let dict: Vec<&str> = dict_spans
701 .iter()
702 .map(|&span| {
703 shared_dict
704 .get(span)
705 .ok_or(DictIndexOutOfBounds(span.0, dict_spans.len()))
706 })
707 .collect::<Result<_, _>>()?;
708 let dict_index: HashMap<(u32, u32), u32> = dict_spans.iter().copied().zip(0_u32..).collect();
709
710 let dict_encoded = match encoder.dict_encoder {
711 StrEncoder::Plain { string_lengths } => EncodedStream::encode_strings_with_type(
712 &dict,
713 string_lengths,
714 LengthType::Dictionary,
715 DictionaryType::Shared,
716 )?,
717 StrEncoder::Fsst(enc) => {
718 EncodedStream::encode_strings_fsst_plain_with_type(&dict, enc, DictionaryType::Single)?
719 }
720 };
721
722 let mut children = Vec::with_capacity(shared_dict.items.len());
724 for (item, item_enc) in shared_dict.items.iter().zip(&encoder.items) {
725 let presence = if item_enc.presence == PresenceStream::Present {
727 let present_bools = item.presence_bools();
728 Some(EncodedStream::encode_presence(&present_bools)?)
729 } else {
730 None
731 };
732
733 let offsets: Vec<u32> = item
735 .dense_spans()
736 .iter()
737 .map(|span| {
738 dict_index
739 .get(span)
740 .copied()
741 .ok_or(DictIndexOutOfBounds(span.0, dict_spans.len()))
742 })
743 .collect::<Result<_, _>>()?;
744
745 let data = EncodedStream::encode_u32s_of_type(
746 &offsets,
747 item_enc.offsets,
748 StreamType::Offset(OffsetType::String),
749 )?;
750
751 children.push(EncodedSharedDictItem {
752 name: EncodedName(item.suffix.clone()),
753 presence: EncodedPresence(presence),
754 data,
755 });
756 }
757
758 let encoding = match dict_encoded {
759 EncodedStringsEncoding::Plain(plain_data) => EncodedSharedDictEncoding::Plain(plain_data),
760 EncodedStringsEncoding::FsstPlain(fsst_data) => {
761 EncodedSharedDictEncoding::FsstPlain(fsst_data)
762 }
763 EncodedStringsEncoding::Dictionary { .. }
764 | EncodedStringsEncoding::FsstDictionary { .. } => {
765 return Err(NotImplemented(
766 "SharedDict only supports Plain or FsstPlain encoding",
767 ));
768 }
769 };
770
771 Ok(EncodedProperty::SharedDict(EncodedSharedDict {
772 name: EncodedName(shared_dict.prefix.clone()),
773 encoding,
774 children,
775 }))
776}
777
778pub fn build_staged_shared_dict(
783 prefix: impl Into<String>,
784 items: impl IntoIterator<Item = (String, StagedStrings)>,
785) -> MltResult<StagedSharedDict> {
786 let prefix = prefix.into();
787 let items = items.into_iter().collect::<Vec<_>>();
788 let mut dict_entries = Vec::<String>::new();
789 let mut dict_index = HashMap::<String, u32>::new();
790
791 for (_, values) in &items {
792 for value in values.dense_values() {
793 if let Entry::Vacant(entry) = dict_index.entry(value.clone()) {
794 let idx = u32::try_from(dict_entries.len())?;
795 entry.insert(idx);
796 dict_entries.push(value);
797 }
798 }
799 }
800
801 let mut dict_ranges = Vec::with_capacity(dict_entries.len());
802 let mut data = String::new();
803 for value in &dict_entries {
804 let offset = u32::try_from(data.len())?;
805 let len = u32::try_from(value.len())?;
806 let end = offset.saturating_add(len);
807 dict_ranges.push((offset, end));
808 data.push_str(value);
809 }
810
811 let items = items
812 .into_iter()
813 .map(|(suffix, values)| -> MltResult<StagedSharedDictItem> {
814 let mut ranges = Vec::with_capacity(values.feature_count());
815 for i in 0..u32::try_from(values.feature_count())? {
816 if let Some(value) = values.get(i) {
817 let idx = dict_index
818 .get(value)
819 .copied()
820 .ok_or(DictIndexOutOfBounds(0, dict_entries.len()))?;
821 let span = dict_ranges[idx as usize];
822 ranges.push(encode_shared_dict_range(span.0, span.1)?);
823 } else {
824 ranges.push((-1, -1));
825 }
826 }
827 Ok(StagedSharedDictItem { suffix, ranges })
828 })
829 .collect::<Result<Vec<_>, _>>()?;
830
831 Ok(StagedSharedDict {
832 prefix,
833 data,
834 items,
835 })
836}
837
838impl EncodedSharedDictItem {
839 pub(crate) fn write_columns_meta_to<W: Write>(&self, writer: &mut W) -> MltResult<()> {
840 let typ = if self.presence.0.is_some() {
841 ColumnType::OptStr
842 } else {
843 ColumnType::Str
844 };
845 typ.write_to(writer)?;
846 Ok(())
847 }
848}
849
850impl<'a> RawStrings<'a> {
851 #[must_use]
852 pub fn new(name: &'a str, presence: RawPresence<'a>, encoding: RawStringsEncoding<'a>) -> Self {
853 Self {
854 name,
855 presence,
856 encoding,
857 }
858 }
859
860 pub fn decode(self, dec: &mut Decoder) -> Result<ParsedStrings<'a>, MltError> {
862 let name = self.name;
863 let presence = match self.presence.0 {
864 Some(s) => Some(s.decode_bools(dec)?),
865 None => None,
866 };
867
868 let parsed = match self.encoding {
869 RawStringsEncoding::Plain(plain_data) => {
870 let (data, lengths) = plain_data.decode(dec)?;
871 ParsedStrings {
872 name,
873 lengths: to_absolute_lengths(&lengths, presence.as_deref(), dec)?,
874 data: data.into(),
875 }
876 }
877 RawStringsEncoding::Dictionary {
878 plain_data,
879 offsets,
880 } => {
881 let (data, lengths) = plain_data.decode(dec)?;
882 let offsets: Vec<u32> = offsets.decode_u32s(dec)?;
883 decode_dictionary_strings(name, &lengths, &offsets, presence.as_deref(), data, dec)?
884 }
885 RawStringsEncoding::FsstPlain(fsst_data) => {
886 let (data, dict_lens) = fsst_data.decode(dec)?;
887 ParsedStrings {
888 name,
889 lengths: to_absolute_lengths(&dict_lens, presence.as_deref(), dec)?,
890 data: data.into(),
891 }
892 }
893 RawStringsEncoding::FsstDictionary { fsst_data, offsets } => {
894 let (data, lengths) = fsst_data.decode(dec)?;
895 let offsets: Vec<u32> = offsets.decode_u32s(dec)?;
896 decode_dictionary_strings(
897 name,
898 &lengths,
899 &offsets,
900 presence.as_deref(),
901 &data,
902 dec,
903 )?
904 }
905 };
906 Ok(parsed)
907 }
908}
909
910fn to_absolute_lengths(
911 lengths: &[u32],
912 presence: Option<&[bool]>,
913 dec: &mut Decoder,
914) -> Result<Vec<i32>, MltError> {
915 let capacity = presence.map_or(lengths.len(), <[bool]>::len);
916 let mut absolute = dec.alloc(capacity)?;
917 let mut iter = lengths.iter().copied();
918 let mut end = 0_i32;
919 if let Some(presence) = presence {
920 for &present in presence {
921 if present {
922 let len = iter.next().ok_or(MltError::PresenceValueCountMismatch(
923 presence.len(),
924 lengths.len(),
925 ))?;
926 end = checked_absolute_end(end, len)?;
927 absolute.push(end);
928 } else {
929 absolute.push(encode_null_end(end));
930 }
931 }
932 if iter.next().is_some() {
933 return Err(MltError::PresenceValueCountMismatch(
934 presence.iter().filter(|v| **v).count(),
935 lengths.len(),
936 ));
937 }
938 } else {
939 for &len in lengths {
940 end = checked_absolute_end(end, len)?;
941 absolute.push(end);
942 }
943 }
944 Ok(absolute)
945}
946
947fn decode_dictionary_strings<'a>(
948 name: &'a str,
949 dict_lengths: &[u32],
950 offsets: &[u32],
951 presence: Option<&[bool]>,
952 dict_data: &str,
953 dec: &mut Decoder,
954) -> Result<ParsedStrings<'a>, MltError> {
955 let dict_spans = shared_dict_spans(dict_lengths, dec)?;
956 let resolved_spans = resolve_dict_spans(offsets, presence, &dict_spans, dec)?;
957 let mut lengths = dec.alloc(resolved_spans.len())?;
958 let mut data = String::new();
959 let mut end = 0_i32;
960 for span in resolved_spans {
961 if let Some(span) = span {
962 let value = dict_span_str(dict_data, span)?;
963 data.push_str(value);
964 end = checked_string_end(end, value.len())?;
965 lengths.push(end);
966 } else {
967 lengths.push(encode_null_end(end));
968 }
969 }
970 Ok(ParsedStrings {
971 name,
972 lengths,
973 data: Cow::Owned(data),
974 })
975}
976
/// Encodes the null marker for a feature whose running end offset is `end`.
///
/// The marker is `-end - 1`, written as the bitwise complement `!end`. It is identical
/// for every input but cannot overflow (the subtraction form panics in debug builds for
/// `i32::MIN`). It is also the same form `StagedStrings::bounds` uses to decode markers.
fn encode_null_end(end: i32) -> i32 {
    !end
}
980
/// Recovers the running end offset from a stored length entry. Non-negative entries are
/// the end itself; negative entries are null markers holding `-end - 1`.
fn decode_end(end: i32) -> u32 {
    if end.is_negative() {
        // `!end == -end - 1`, which is non-negative for every negative `end`.
        u32::try_from(!end).expect("encoded null boundary must fit in u32")
    } else {
        u32::try_from(end).expect("non-negative decoded string end must fit in u32")
    }
}
988
989fn checked_string_end(current_end: i32, byte_len: usize) -> MltResult<i32> {
990 let byte_len = u32::try_from(byte_len)?;
991 checked_absolute_end(current_end, byte_len)
992}
993
994fn checked_absolute_end(current_end: i32, delta: u32) -> MltResult<i32> {
995 let delta = i32::try_from(delta)?;
996 current_end
997 .checked_add(delta)
998 .ok_or(MltError::IntegerOverflow)
999}
1000
1001impl<'a> RawSharedDict<'a> {
1002 #[must_use]
1003 pub fn new(
1004 name: &'a str,
1005 encoding: RawSharedDictEncoding<'a>,
1006 children: Vec<RawSharedDictItem<'a>>,
1007 ) -> Self {
1008 Self {
1009 name,
1010 encoding,
1011 children,
1012 }
1013 }
1014
1015 pub fn decode(self, dec: &mut Decoder) -> Result<ParsedSharedDict<'a>, MltError> {
1017 let prefix = self.name;
1018 let (data, dict_spans) = match self.encoding {
1019 RawSharedDictEncoding::Plain(plain_data) => {
1020 let (decoded, lengths) = plain_data.decode(dec)?;
1021 let dict_spans = shared_dict_spans(&lengths, dec)?;
1022 (Cow::Borrowed(decoded), dict_spans)
1023 }
1024 RawSharedDictEncoding::FsstPlain(fsst_data) => {
1025 let (decoded, lengths) = fsst_data.decode(dec)?;
1026 let dict_spans = shared_dict_spans(&lengths, dec)?;
1027 (decoded.into(), dict_spans)
1028 }
1029 };
1030 let mut items = Vec::with_capacity(self.children.len());
1031 for child in self.children {
1032 let offsets: Vec<u32> = child.data.decode_u32s(dec)?;
1033 let presence = match child.presence.0 {
1034 Some(s) => Some(s.decode_bools(dec)?),
1035 None => None,
1036 };
1037 let ranges = resolve_dict_spans(&offsets, presence.as_deref(), &dict_spans, dec)?
1038 .into_iter()
1039 .map(|span| match span {
1040 Some(span) => encode_shared_dict_range(span.0, span.1),
1041 None => Ok((-1, -1)),
1042 })
1043 .collect::<Result<Vec<_>, _>>()?;
1044 items.push(ParsedSharedDictItem {
1045 suffix: child.name,
1046 ranges,
1047 });
1048 }
1049
1050 let parsed = ParsedSharedDict {
1051 prefix,
1052 data,
1053 items,
1054 };
1055 let bytes = parsed.items.iter().try_fold(
1057 u32::try_from(parsed.data.len()).or_overflow()?,
1058 |acc, item| {
1059 let n = u32::try_from(item.ranges.len() * size_of::<(i32, i32)>()).or_overflow()?;
1060 acc.checked_add(n).ok_or(MltError::IntegerOverflow)
1061 },
1062 )?;
1063 dec.consume(bytes)?;
1064 Ok(parsed)
1065 }
1066}
1067
1068impl From<SharedDictEncoder> for PropertyEncoder {
1070 fn from(encoder: SharedDictEncoder) -> Self {
1071 Self::SharedDict(encoder)
1072 }
1073}