Skip to main content

mlt_core/frames/v01/property/
strings.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::hash_map::Entry;
4use std::io::Write;
5use std::mem::size_of;
6
7use crate::MltError::{
8    BufferUnderflow, DictIndexOutOfBounds, NotImplemented, UnexpectedStreamType2,
9};
10use crate::codecs::fsst::decode_fsst;
11use crate::errors::AsMltError as _;
12use crate::utils::AsUsize as _;
13use crate::v01::{
14    ColumnType, DictionaryType, EncodedFsstData, EncodedName, EncodedPlainData, EncodedPresence,
15    EncodedProperty, EncodedSharedDict, EncodedSharedDictEncoding, EncodedSharedDictItem,
16    EncodedStream, EncodedStrings, EncodedStringsEncoding, FsstStrEncoder, IntEncoder, LengthType,
17    OffsetType, ParsedSharedDict, ParsedSharedDictItem, ParsedStrings, PresenceStream,
18    PropertyEncoder, RawFsstData, RawPlainData, RawPresence, RawSharedDict, RawSharedDictEncoding,
19    RawSharedDictItem, RawStream, RawStrings, RawStringsEncoding, SharedDictEncoder,
20    StagedSharedDict, StagedSharedDictItem, StagedStrings, StrEncoder, StreamType,
21};
22use crate::{Analyze, Decoder, MltError, MltResult, StatType};
23
24impl StrEncoder {
25    #[must_use]
26    pub fn plain(string_lengths: IntEncoder) -> Self {
27        Self::Plain { string_lengths }
28    }
29    #[must_use]
30    pub fn fsst(symbol_lengths: IntEncoder, dict_lengths: IntEncoder) -> Self {
31        Self::Fsst(FsstStrEncoder {
32            symbol_lengths,
33            dict_lengths,
34        })
35    }
36}
37
38impl From<Vec<Option<String>>> for StagedStrings {
39    fn from(values: Vec<Option<String>>) -> Self {
40        Self::from_optional_strings(values)
41    }
42}
43
44impl From<Vec<String>> for StagedStrings {
45    fn from(values: Vec<String>) -> Self {
46        Self::from_optional_strings(values.into_iter().map(Some).collect())
47    }
48}
49
50impl StagedStrings {
51    fn from_optional_strings(values: Vec<Option<String>>) -> Self {
52        let mut lengths = Vec::with_capacity(values.len());
53        let mut data = String::new();
54        let mut end = 0_i32;
55        for value in values {
56            match value {
57                Some(value) => {
58                    end = checked_string_end(end, value.len())
59                        .expect("staged string corpus exceeds supported i32 range");
60                    lengths.push(end);
61                    data.push_str(&value);
62                }
63                None => lengths.push(encode_null_end(end)),
64            }
65        }
66        Self {
67            name: String::new(),
68            lengths,
69            data,
70        }
71    }
72
73    #[must_use]
74    pub fn feature_count(&self) -> usize {
75        self.lengths.len()
76    }
77
78    fn bounds(&self, i: u32) -> Option<(u32, u32)> {
79        let i = i.as_usize();
80        let end = *self.lengths.get(i)?;
81        if end < 0 {
82            return None;
83        }
84        let end = end.cast_unsigned();
85        let start = if i == 0 {
86            0
87        } else {
88            let prev = self.lengths[i - 1];
89            if prev < 0 {
90                (!prev).cast_unsigned()
91            } else {
92                prev.cast_unsigned()
93            }
94        };
95        Some((start, end))
96    }
97
98    #[must_use]
99    pub fn has_nulls(&self) -> bool {
100        self.lengths.iter().any(|&end| end < 0)
101    }
102
103    #[must_use]
104    pub fn presence_bools(&self) -> Vec<bool> {
105        self.lengths.iter().map(|&end| end >= 0).collect()
106    }
107
108    #[must_use]
109    pub fn get(&self, i: u32) -> Option<&str> {
110        let (start, end) = self.bounds(i)?;
111        self.data.get(start.as_usize()..end.as_usize())
112    }
113
114    #[must_use]
115    pub fn dense_values(&self) -> Vec<String> {
116        let mut values = Vec::new();
117        let mut start = 0_u32;
118        for &end in &self.lengths {
119            if end >= 0 {
120                let end = end.cast_unsigned();
121                values.push(self.data[start.as_usize()..end.as_usize()].to_string());
122                start = end;
123            } else {
124                start = (!end).cast_unsigned();
125            }
126        }
127        values
128    }
129}
130
131// ── StagedSharedDict ──────────────────────────────────────────────────────────
132
133impl StagedSharedDict {
134    #[must_use]
135    pub fn corpus(&self) -> &str {
136        &self.data
137    }
138
139    #[must_use]
140    pub fn get(&self, span: (u32, u32)) -> Option<&str> {
141        self.corpus().get(span.0.as_usize()..span.1.as_usize())
142    }
143}
144
145pub fn collect_staged_shared_dict_spans(items: &[StagedSharedDictItem]) -> Vec<(u32, u32)> {
146    let mut spans = items
147        .iter()
148        .flat_map(StagedSharedDictItem::dense_spans)
149        .collect::<Vec<_>>();
150    spans.sort_unstable();
151    spans.dedup();
152    spans
153}
154
155impl StagedSharedDictItem {
156    #[must_use]
157    pub fn feature_count(&self) -> usize {
158        self.ranges.len()
159    }
160
161    #[must_use]
162    pub fn has_nulls(&self) -> bool {
163        self.ranges
164            .iter()
165            .any(|&range| decode_shared_dict_range(range).is_none())
166    }
167
168    #[must_use]
169    pub fn presence_bools(&self) -> Vec<bool> {
170        self.ranges
171            .iter()
172            .map(|&range| decode_shared_dict_range(range).is_some())
173            .collect()
174    }
175
176    #[must_use]
177    pub fn dense_spans(&self) -> Vec<(u32, u32)> {
178        self.ranges
179            .iter()
180            .filter_map(|&range| decode_shared_dict_range(range))
181            .collect()
182    }
183
184    #[must_use]
185    pub fn get<'a>(&self, shared_dict: &'a StagedSharedDict, i: usize) -> Option<&'a str> {
186        self.ranges
187            .get(i)
188            .copied()
189            .and_then(decode_shared_dict_range)
190            .and_then(|span| shared_dict.get(span))
191    }
192
193    #[must_use]
194    pub fn materialize(&self, shared_dict: &StagedSharedDict) -> Vec<Option<String>> {
195        self.ranges
196            .iter()
197            .map(|&range| {
198                decode_shared_dict_range(range)
199                    .and_then(|span| shared_dict.get(span))
200                    .map(str::to_string)
201            })
202            .collect()
203    }
204}
205
206// ── ParsedStrings ─────────────────────────────────────────────────────────────
207
208impl<'a> ParsedStrings<'a> {
209    #[must_use]
210    pub fn new(name: &'a str, lengths: Vec<i32>, data: Cow<'a, str>) -> Self {
211        ParsedStrings {
212            name,
213            lengths,
214            data,
215        }
216    }
217
218    #[must_use]
219    pub fn feature_count(&self) -> usize {
220        self.lengths.len()
221    }
222
223    #[must_use]
224    pub fn has_nulls(&self) -> bool {
225        self.lengths.iter().any(|end| *end < 0)
226    }
227
228    #[must_use]
229    pub fn presence_bools(&self) -> Vec<bool> {
230        self.lengths.iter().map(|&end| end >= 0).collect()
231    }
232
233    fn bounds(&self, i: u32) -> Option<(u32, u32)> {
234        let idx = i.as_usize();
235        let end = *self.lengths.get(idx)?;
236        if end < 0 {
237            return None;
238        }
239        let start = idx
240            .checked_sub(1)
241            .and_then(|prev| self.lengths.get(prev).copied())
242            .map_or(0, decode_end);
243        Some((start, decode_end(end)))
244    }
245
246    #[must_use]
247    pub fn get(&self, i: u32) -> Option<&str> {
248        let (start, end) = self.bounds(i)?;
249        let start = start.as_usize();
250        let end = end.as_usize();
251        self.data.get(start..end)
252    }
253
254    #[must_use]
255    pub fn dense_values(&self) -> Vec<String> {
256        let mut values = Vec::new();
257        let mut start = 0_u32;
258        for &end in &self.lengths {
259            let end_u32 = decode_end(end);
260            let start_idx = start.as_usize();
261            let end_idx = end_u32.as_usize();
262            if end >= 0
263                && let Some(value) = self.data.get(start_idx..end_idx)
264            {
265                values.push(value.to_string());
266            }
267            start = end_u32;
268        }
269        values
270    }
271
272    #[must_use]
273    pub fn materialize(&self) -> Vec<Option<String>> {
274        (0..u32::try_from(self.feature_count()).unwrap_or(u32::MAX))
275            .map(|i| self.get(i).map(str::to_string))
276            .collect()
277    }
278}
279
280impl Analyze for ParsedStrings<'_> {
281    fn collect_statistic(&self, stat: StatType) -> usize {
282        let meta = if stat == StatType::DecodedMetaSize {
283            self.name.len()
284        } else {
285            0
286        };
287        meta + self.dense_values().collect_statistic(stat)
288    }
289}
290
291fn encode_shared_dict_range(start: u32, end: u32) -> Result<(i32, i32), MltError> {
292    Ok((i32::try_from(start)?, i32::try_from(end)?))
293}
294
/// Converts a stored signed range back to a corpus span.
///
/// Returns `None` (a null feature) when either bound is negative.
fn decode_shared_dict_range(range: (i32, i32)) -> Option<(u32, u32)> {
    let (start, end) = range;
    let start = u32::try_from(start).ok()?;
    let end = u32::try_from(end).ok()?;
    Some((start, end))
}
302
303fn shared_dict_spans(lengths: &[u32], dec: &mut Decoder) -> Result<Vec<(u32, u32)>, MltError> {
304    let mut spans = dec.alloc(lengths.len())?;
305    let mut offset = 0_u32;
306    for &len in lengths {
307        let start = offset;
308        offset = offset.saturating_add(len);
309        spans.push((start, offset));
310    }
311    Ok(spans)
312}
313
314fn resolve_dict_spans(
315    offsets: &[u32],
316    presence: Option<&[bool]>,
317    dict_spans: &[(u32, u32)],
318    dec: &mut Decoder,
319) -> Result<Vec<Option<(u32, u32)>>, MltError> {
320    let present_count = presence.map_or(offsets.len(), <[bool]>::len);
321    let mut resolved = dec.alloc(present_count)?;
322    let mut next = offsets.iter().copied();
323
324    if let Some(presence) = presence {
325        let fail = || {
326            MltError::PresenceValueCountMismatch(
327                presence.iter().filter(|&&v| v).count(),
328                offsets.len(),
329            )
330        };
331        for &present in presence {
332            if !present {
333                resolved.push(None);
334                continue;
335            }
336            let idx = next.next().ok_or_else(fail)?;
337            let span = dict_spans
338                .get(idx as usize)
339                .copied()
340                .ok_or(DictIndexOutOfBounds(idx, dict_spans.len()))?;
341            resolved.push(Some(span));
342        }
343
344        if next.next().is_some() {
345            return Err(fail());
346        }
347    } else {
348        for &idx in offsets {
349            let span = dict_spans
350                .get(idx as usize)
351                .copied()
352                .ok_or(DictIndexOutOfBounds(idx, dict_spans.len()))?;
353            resolved.push(Some(span));
354        }
355    }
356
357    Ok(resolved)
358}
359
360fn dict_span_str(dict_data: &str, span: (u32, u32)) -> MltResult<&str> {
361    let start = span.0.as_usize();
362    let end = span.1.as_usize();
363    let bytes = dict_data.as_bytes();
364    let Some(value) = bytes.get(start..end) else {
365        let len = span.1.saturating_sub(span.0);
366        return Err(BufferUnderflow(len, bytes.len().saturating_sub(start)));
367    };
368    Ok(str::from_utf8(value)?)
369}
370
371impl ParsedSharedDict<'_> {
372    #[must_use]
373    pub fn corpus(&self) -> &str {
374        &self.data
375    }
376
377    #[must_use]
378    pub fn get(&self, span: (u32, u32)) -> Option<&str> {
379        let start = span.0.as_usize();
380        let end = span.1.as_usize();
381        self.corpus().get(start..end)
382    }
383}
384
385impl ParsedSharedDictItem<'_> {
386    #[must_use]
387    pub fn feature_count(&self) -> usize {
388        self.ranges.len()
389    }
390
391    #[must_use]
392    pub fn has_nulls(&self) -> bool {
393        self.ranges
394            .iter()
395            .any(|&range| decode_shared_dict_range(range).is_none())
396    }
397
398    #[must_use]
399    pub fn presence_bools(&self) -> Vec<bool> {
400        self.ranges
401            .iter()
402            .map(|&range| decode_shared_dict_range(range).is_some())
403            .collect()
404    }
405
406    #[must_use]
407    pub fn dense_spans(&self) -> Vec<(u32, u32)> {
408        self.ranges
409            .iter()
410            .filter_map(|&range| decode_shared_dict_range(range))
411            .collect()
412    }
413
414    #[must_use]
415    pub fn materialize(&self, shared_dict: &ParsedSharedDict<'_>) -> Vec<Option<String>> {
416        self.ranges
417            .iter()
418            .map(|&range| {
419                decode_shared_dict_range(range)
420                    .and_then(|span| shared_dict.get(span))
421                    .map(str::to_string)
422            })
423            .collect()
424    }
425
426    #[must_use]
427    pub fn get<'a>(&self, shared_dict: &'a ParsedSharedDict<'_>, i: usize) -> Option<&'a str> {
428        self.ranges
429            .get(i)
430            .copied()
431            .and_then(decode_shared_dict_range)
432            .and_then(|span| shared_dict.get(span))
433    }
434}
435
/// Returns early from the enclosing function with `UnexpectedStreamType2` unless
/// `$stream.meta.stream_type` matches the pattern `$expected`.
///
/// The error carries the actual stream type, the pattern's source text, and
/// the stream expression's source text (both via `stringify!`).
macro_rules! validate_stream {
    ($stream:expr, $expected:pat $(,)?) => {
        match $stream.meta.stream_type {
            $expected => {}
            _ => {
                return Err(UnexpectedStreamType2(
                    $stream.meta.stream_type,
                    stringify!($expected),
                    stringify!($stream),
                ));
            }
        }
    };
}
448
449impl<'a> RawPlainData<'a> {
450    pub fn new(lengths: RawStream<'a>, data: RawStream<'a>) -> MltResult<Self> {
451        validate_stream!(
452            lengths,
453            StreamType::Length(LengthType::VarBinary | LengthType::Dictionary)
454        );
455        validate_stream!(
456            data,
457            StreamType::Data(
458                DictionaryType::None | DictionaryType::Single | DictionaryType::Shared
459            )
460        );
461        Ok(Self { lengths, data })
462    }
463
464    pub fn decode(self, dec: &mut Decoder) -> Result<(&'a str, Vec<u32>), MltError> {
465        Ok((
466            str::from_utf8(self.data.as_bytes())?,
467            self.lengths.decode_u32s(dec)?,
468        ))
469    }
470
471    #[must_use]
472    pub fn streams(&self) -> Vec<&RawStream<'_>> {
473        vec![&self.lengths, &self.data]
474    }
475}
476
477impl EncodedPlainData {
478    pub fn new(lengths: EncodedStream, data: EncodedStream) -> MltResult<Self> {
479        validate_stream!(
480            lengths,
481            StreamType::Length(LengthType::VarBinary | LengthType::Dictionary)
482        );
483        validate_stream!(
484            data,
485            StreamType::Data(
486                DictionaryType::None | DictionaryType::Single | DictionaryType::Shared
487            )
488        );
489        Ok(Self { lengths, data })
490    }
491
492    #[must_use]
493    pub fn streams(&self) -> Vec<&EncodedStream> {
494        vec![&self.lengths, &self.data]
495    }
496}
497
498impl<'a> RawFsstData<'a> {
499    pub fn new(
500        symbol_lengths: RawStream<'a>,
501        symbol_table: RawStream<'a>,
502        lengths: RawStream<'a>,
503        corpus: RawStream<'a>,
504    ) -> MltResult<Self> {
505        validate_stream!(symbol_lengths, StreamType::Length(LengthType::Symbol));
506        validate_stream!(symbol_table, StreamType::Data(DictionaryType::Fsst));
507        validate_stream!(lengths, StreamType::Length(LengthType::Dictionary));
508        validate_stream!(
509            corpus,
510            StreamType::Data(DictionaryType::Single | DictionaryType::Shared)
511        );
512        Ok(Self {
513            symbol_lengths,
514            symbol_table,
515            lengths,
516            corpus,
517        })
518    }
519
520    pub fn decode(self, dec: &mut Decoder) -> Result<(String, Vec<u32>), MltError> {
521        decode_fsst(self, dec)
522    }
523
524    #[must_use]
525    pub fn streams(&self) -> Vec<&RawStream<'_>> {
526        vec![
527            &self.symbol_lengths,
528            &self.symbol_table,
529            &self.lengths,
530            &self.corpus,
531        ]
532    }
533}
534
535impl EncodedFsstData {
536    #[must_use]
537    pub fn streams(&self) -> Vec<&EncodedStream> {
538        vec![
539            &self.symbol_lengths,
540            &self.symbol_table,
541            &self.lengths,
542            &self.corpus,
543        ]
544    }
545}
546
547impl<'a> RawStringsEncoding<'a> {
548    #[must_use]
549    pub fn plain(plain_data: RawPlainData<'a>) -> Self {
550        Self::Plain(plain_data)
551    }
552
553    pub fn dictionary(plain_data: RawPlainData<'a>, offsets: RawStream<'a>) -> MltResult<Self> {
554        validate_stream!(offsets, StreamType::Offset(OffsetType::String));
555        Ok(Self::Dictionary {
556            plain_data,
557            offsets,
558        })
559    }
560
561    #[must_use]
562    pub fn fsst_plain(fsst_data: RawFsstData<'a>) -> Self {
563        Self::FsstPlain(fsst_data)
564    }
565
566    pub fn fsst_dictionary(fsst_data: RawFsstData<'a>, offsets: RawStream<'a>) -> MltResult<Self> {
567        validate_stream!(offsets, StreamType::Offset(OffsetType::String));
568        Ok(Self::FsstDictionary { fsst_data, offsets })
569    }
570
571    /// Content streams in wire order.
572    #[must_use]
573    pub fn streams(&self) -> Vec<&RawStream<'_>> {
574        match self {
575            Self::Plain(plain_data) => plain_data.streams(),
576            Self::Dictionary {
577                plain_data,
578                offsets,
579            } => {
580                let mut streams = plain_data.streams();
581                streams.insert(1, offsets); // Offset stays here to preserve the current wire order.
582                streams
583            }
584            Self::FsstPlain(fsst_data) => fsst_data.streams(),
585            Self::FsstDictionary { fsst_data, offsets } => {
586                let mut streams = fsst_data.streams();
587                streams.push(offsets);
588                streams
589            }
590        }
591    }
592}
593
594impl EncodedStringsEncoding {
595    /// Content streams only.
596    #[must_use]
597    pub fn content_streams(&self) -> Vec<&EncodedStream> {
598        match self {
599            Self::Plain(plain_data) => plain_data.streams(),
600            Self::Dictionary {
601                plain_data,
602                offsets,
603            } => {
604                let mut streams = plain_data.streams();
605                streams.insert(1, offsets); // Offset stays here to preserve the current wire order.
606                streams
607            }
608            Self::FsstPlain(fsst_data) => fsst_data.streams(),
609            Self::FsstDictionary { fsst_data, offsets } => {
610                let mut streams = fsst_data.streams();
611                streams.push(offsets);
612                streams
613            }
614        }
615    }
616
617    /// Streams in wire order.
618    #[must_use]
619    pub fn streams(&self) -> Vec<&EncodedStream> {
620        self.content_streams()
621    }
622}
623
624impl RawStrings<'_> {
625    /// Content streams in wire order.
626    #[must_use]
627    pub fn streams(&self) -> Vec<&RawStream<'_>> {
628        self.encoding.streams()
629    }
630}
631
632impl EncodedStrings {
633    /// Streams in wire order.
634    #[must_use]
635    pub fn streams(&self) -> Vec<&EncodedStream> {
636        self.encoding.streams()
637    }
638}
639
640impl<'a> RawSharedDictEncoding<'a> {
641    /// Plain shared dict (2 streams): lengths + data.
642    #[must_use]
643    pub fn plain(plain_data: RawPlainData<'a>) -> Self {
644        Self::Plain(plain_data)
645    }
646
647    /// FSST plain shared dict (4 streams): symbol lengths, symbol table, lengths, corpus.
648    #[must_use]
649    pub fn fsst_plain(fsst_data: RawFsstData<'a>) -> Self {
650        Self::FsstPlain(fsst_data)
651    }
652
653    /// Dict streams in wire order (for serialization).
654    #[must_use]
655    pub fn dict_streams(&self) -> Vec<&RawStream<'_>> {
656        match self {
657            Self::Plain(plain_data) => plain_data.streams(),
658            Self::FsstPlain(fsst_data) => fsst_data.streams(),
659        }
660    }
661}
662
663impl EncodedSharedDictEncoding {
664    #[must_use]
665    pub fn dict_streams(&self) -> Vec<&EncodedStream> {
666        match self {
667            Self::Plain(plain_data) => plain_data.streams(),
668            Self::FsstPlain(fsst_data) => fsst_data.streams(),
669        }
670    }
671}
672
673impl RawSharedDict<'_> {
674    /// Dict streams in wire order (for serialization).
675    #[must_use]
676    pub fn dict_streams(&self) -> Vec<&RawStream<'_>> {
677        self.encoding.dict_streams()
678    }
679}
680
681impl EncodedSharedDict {
682    #[must_use]
683    pub fn dict_streams(&self) -> Vec<&EncodedStream> {
684        self.encoding.dict_streams()
685    }
686}
687
688/// Encode a staged shared dictionary property using `SharedDictEncoder`.
689pub fn encode_shared_dict_prop(
690    shared_dict: &StagedSharedDict,
691    encoder: &SharedDictEncoder,
692) -> MltResult<EncodedProperty> {
693    if shared_dict.items.len() != encoder.items.len() {
694        return Err(NotImplemented(
695            "SharedDict items count must match encoder items count",
696        ));
697    }
698
699    let dict_spans = collect_staged_shared_dict_spans(&shared_dict.items);
700    let dict: Vec<&str> = dict_spans
701        .iter()
702        .map(|&span| {
703            shared_dict
704                .get(span)
705                .ok_or(DictIndexOutOfBounds(span.0, dict_spans.len()))
706        })
707        .collect::<Result<_, _>>()?;
708    let dict_index: HashMap<(u32, u32), u32> = dict_spans.iter().copied().zip(0_u32..).collect();
709
710    let dict_encoded = match encoder.dict_encoder {
711        StrEncoder::Plain { string_lengths } => EncodedStream::encode_strings_with_type(
712            &dict,
713            string_lengths,
714            LengthType::Dictionary,
715            DictionaryType::Shared,
716        )?,
717        StrEncoder::Fsst(enc) => {
718            EncodedStream::encode_strings_fsst_plain_with_type(&dict, enc, DictionaryType::Single)?
719        }
720    };
721
722    // Encode each child column.
723    let mut children = Vec::with_capacity(shared_dict.items.len());
724    for (item, item_enc) in shared_dict.items.iter().zip(&encoder.items) {
725        // Presence stream
726        let presence = if item_enc.presence == PresenceStream::Present {
727            let present_bools = item.presence_bools();
728            Some(EncodedStream::encode_presence(&present_bools)?)
729        } else {
730            None
731        };
732
733        // Offset indices for non-null values only.
734        let offsets: Vec<u32> = item
735            .dense_spans()
736            .iter()
737            .map(|span| {
738                dict_index
739                    .get(span)
740                    .copied()
741                    .ok_or(DictIndexOutOfBounds(span.0, dict_spans.len()))
742            })
743            .collect::<Result<_, _>>()?;
744
745        let data = EncodedStream::encode_u32s_of_type(
746            &offsets,
747            item_enc.offsets,
748            StreamType::Offset(OffsetType::String),
749        )?;
750
751        children.push(EncodedSharedDictItem {
752            name: EncodedName(item.suffix.clone()),
753            presence: EncodedPresence(presence),
754            data,
755        });
756    }
757
758    let encoding = match dict_encoded {
759        EncodedStringsEncoding::Plain(plain_data) => EncodedSharedDictEncoding::Plain(plain_data),
760        EncodedStringsEncoding::FsstPlain(fsst_data) => {
761            EncodedSharedDictEncoding::FsstPlain(fsst_data)
762        }
763        EncodedStringsEncoding::Dictionary { .. }
764        | EncodedStringsEncoding::FsstDictionary { .. } => {
765            return Err(NotImplemented(
766                "SharedDict only supports Plain or FsstPlain encoding",
767            ));
768        }
769    };
770
771    Ok(EncodedProperty::SharedDict(EncodedSharedDict {
772        name: EncodedName(shared_dict.prefix.clone()),
773        encoding,
774        children,
775    }))
776}
777
778/// Build a [`StagedSharedDict`] from a list of `(suffix, values)` pairs.
779///
780/// Deduplicates string values into a shared corpus and records per-feature
781/// byte-range offsets into it.
782pub fn build_staged_shared_dict(
783    prefix: impl Into<String>,
784    items: impl IntoIterator<Item = (String, StagedStrings)>,
785) -> MltResult<StagedSharedDict> {
786    let prefix = prefix.into();
787    let items = items.into_iter().collect::<Vec<_>>();
788    let mut dict_entries = Vec::<String>::new();
789    let mut dict_index = HashMap::<String, u32>::new();
790
791    for (_, values) in &items {
792        for value in values.dense_values() {
793            if let Entry::Vacant(entry) = dict_index.entry(value.clone()) {
794                let idx = u32::try_from(dict_entries.len())?;
795                entry.insert(idx);
796                dict_entries.push(value);
797            }
798        }
799    }
800
801    let mut dict_ranges = Vec::with_capacity(dict_entries.len());
802    let mut data = String::new();
803    for value in &dict_entries {
804        let offset = u32::try_from(data.len())?;
805        let len = u32::try_from(value.len())?;
806        let end = offset.saturating_add(len);
807        dict_ranges.push((offset, end));
808        data.push_str(value);
809    }
810
811    let items = items
812        .into_iter()
813        .map(|(suffix, values)| -> MltResult<StagedSharedDictItem> {
814            let mut ranges = Vec::with_capacity(values.feature_count());
815            for i in 0..u32::try_from(values.feature_count())? {
816                if let Some(value) = values.get(i) {
817                    let idx = dict_index
818                        .get(value)
819                        .copied()
820                        .ok_or(DictIndexOutOfBounds(0, dict_entries.len()))?;
821                    let span = dict_ranges[idx as usize];
822                    ranges.push(encode_shared_dict_range(span.0, span.1)?);
823                } else {
824                    ranges.push((-1, -1));
825                }
826            }
827            Ok(StagedSharedDictItem { suffix, ranges })
828        })
829        .collect::<Result<Vec<_>, _>>()?;
830
831    Ok(StagedSharedDict {
832        prefix,
833        data,
834        items,
835    })
836}
837
838impl EncodedSharedDictItem {
839    pub(crate) fn write_columns_meta_to<W: Write>(&self, writer: &mut W) -> MltResult<()> {
840        let typ = if self.presence.0.is_some() {
841            ColumnType::OptStr
842        } else {
843            ColumnType::Str
844        };
845        typ.write_to(writer)?;
846        Ok(())
847    }
848}
849
850impl<'a> RawStrings<'a> {
851    #[must_use]
852    pub fn new(name: &'a str, presence: RawPresence<'a>, encoding: RawStringsEncoding<'a>) -> Self {
853        Self {
854            name,
855            presence,
856            encoding,
857        }
858    }
859
860    /// Decode string property from its encoded column.
861    pub fn decode(self, dec: &mut Decoder) -> Result<ParsedStrings<'a>, MltError> {
862        let name = self.name;
863        let presence = match self.presence.0 {
864            Some(s) => Some(s.decode_bools(dec)?),
865            None => None,
866        };
867
868        let parsed = match self.encoding {
869            RawStringsEncoding::Plain(plain_data) => {
870                let (data, lengths) = plain_data.decode(dec)?;
871                ParsedStrings {
872                    name,
873                    lengths: to_absolute_lengths(&lengths, presence.as_deref(), dec)?,
874                    data: data.into(),
875                }
876            }
877            RawStringsEncoding::Dictionary {
878                plain_data,
879                offsets,
880            } => {
881                let (data, lengths) = plain_data.decode(dec)?;
882                let offsets: Vec<u32> = offsets.decode_u32s(dec)?;
883                decode_dictionary_strings(name, &lengths, &offsets, presence.as_deref(), data, dec)?
884            }
885            RawStringsEncoding::FsstPlain(fsst_data) => {
886                let (data, dict_lens) = fsst_data.decode(dec)?;
887                ParsedStrings {
888                    name,
889                    lengths: to_absolute_lengths(&dict_lens, presence.as_deref(), dec)?,
890                    data: data.into(),
891                }
892            }
893            RawStringsEncoding::FsstDictionary { fsst_data, offsets } => {
894                let (data, lengths) = fsst_data.decode(dec)?;
895                let offsets: Vec<u32> = offsets.decode_u32s(dec)?;
896                decode_dictionary_strings(
897                    name,
898                    &lengths,
899                    &offsets,
900                    presence.as_deref(),
901                    &data,
902                    dec,
903                )?
904            }
905        };
906        Ok(parsed)
907    }
908}
909
910fn to_absolute_lengths(
911    lengths: &[u32],
912    presence: Option<&[bool]>,
913    dec: &mut Decoder,
914) -> Result<Vec<i32>, MltError> {
915    let capacity = presence.map_or(lengths.len(), <[bool]>::len);
916    let mut absolute = dec.alloc(capacity)?;
917    let mut iter = lengths.iter().copied();
918    let mut end = 0_i32;
919    if let Some(presence) = presence {
920        for &present in presence {
921            if present {
922                let len = iter.next().ok_or(MltError::PresenceValueCountMismatch(
923                    presence.len(),
924                    lengths.len(),
925                ))?;
926                end = checked_absolute_end(end, len)?;
927                absolute.push(end);
928            } else {
929                absolute.push(encode_null_end(end));
930            }
931        }
932        if iter.next().is_some() {
933            return Err(MltError::PresenceValueCountMismatch(
934                presence.iter().filter(|v| **v).count(),
935                lengths.len(),
936            ));
937        }
938    } else {
939        for &len in lengths {
940            end = checked_absolute_end(end, len)?;
941            absolute.push(end);
942        }
943    }
944    Ok(absolute)
945}
946
947fn decode_dictionary_strings<'a>(
948    name: &'a str,
949    dict_lengths: &[u32],
950    offsets: &[u32],
951    presence: Option<&[bool]>,
952    dict_data: &str,
953    dec: &mut Decoder,
954) -> Result<ParsedStrings<'a>, MltError> {
955    let dict_spans = shared_dict_spans(dict_lengths, dec)?;
956    let resolved_spans = resolve_dict_spans(offsets, presence, &dict_spans, dec)?;
957    let mut lengths = dec.alloc(resolved_spans.len())?;
958    let mut data = String::new();
959    let mut end = 0_i32;
960    for span in resolved_spans {
961        if let Some(span) = span {
962            let value = dict_span_str(dict_data, span)?;
963            data.push_str(value);
964            end = checked_string_end(end, value.len())?;
965            lengths.push(end);
966        } else {
967            lengths.push(encode_null_end(end));
968        }
969    }
970    Ok(ParsedStrings {
971        name,
972        lengths,
973        data: Cow::Owned(data),
974    })
975}
976
/// Encodes the running end offset of a null slot as a negative marker.
///
/// Returns `!end`, which equals `-end - 1`. Every `end >= 0` (including 0) maps
/// to a strictly negative value, and `!marker` recovers `end`. Unlike the
/// arithmetic form `-end - 1`, bitwise NOT cannot overflow for any `i32`
/// (`-i32::MIN` panics in debug builds).
fn encode_null_end(end: i32) -> i32 {
    !end
}
980
/// Recovers the byte boundary stored in an end-offset slot.
///
/// Non-negative values are the end itself. A negative value is a null marker,
/// the bitwise complement (`-end - 1`) of the running end, so flipping the bits
/// restores it. The result always lies in `0..=i32::MAX`, so `unsigned_abs` is
/// lossless here.
fn decode_end(end: i32) -> u32 {
    let boundary = if end < 0 { !end } else { end };
    boundary.unsigned_abs()
}
988
989fn checked_string_end(current_end: i32, byte_len: usize) -> MltResult<i32> {
990    let byte_len = u32::try_from(byte_len)?;
991    checked_absolute_end(current_end, byte_len)
992}
993
994fn checked_absolute_end(current_end: i32, delta: u32) -> MltResult<i32> {
995    let delta = i32::try_from(delta)?;
996    current_end
997        .checked_add(delta)
998        .ok_or(MltError::IntegerOverflow)
999}
1000
1001impl<'a> RawSharedDict<'a> {
1002    #[must_use]
1003    pub fn new(
1004        name: &'a str,
1005        encoding: RawSharedDictEncoding<'a>,
1006        children: Vec<RawSharedDictItem<'a>>,
1007    ) -> Self {
1008        Self {
1009            name,
1010            encoding,
1011            children,
1012        }
1013    }
1014
1015    /// Decode a shared-dictionary column into its decoded form.
1016    pub fn decode(self, dec: &mut Decoder) -> Result<ParsedSharedDict<'a>, MltError> {
1017        let prefix = self.name;
1018        let (data, dict_spans) = match self.encoding {
1019            RawSharedDictEncoding::Plain(plain_data) => {
1020                let (decoded, lengths) = plain_data.decode(dec)?;
1021                let dict_spans = shared_dict_spans(&lengths, dec)?;
1022                (Cow::Borrowed(decoded), dict_spans)
1023            }
1024            RawSharedDictEncoding::FsstPlain(fsst_data) => {
1025                let (decoded, lengths) = fsst_data.decode(dec)?;
1026                let dict_spans = shared_dict_spans(&lengths, dec)?;
1027                (decoded.into(), dict_spans)
1028            }
1029        };
1030        let mut items = Vec::with_capacity(self.children.len());
1031        for child in self.children {
1032            let offsets: Vec<u32> = child.data.decode_u32s(dec)?;
1033            let presence = match child.presence.0 {
1034                Some(s) => Some(s.decode_bools(dec)?),
1035                None => None,
1036            };
1037            let ranges = resolve_dict_spans(&offsets, presence.as_deref(), &dict_spans, dec)?
1038                .into_iter()
1039                .map(|span| match span {
1040                    Some(span) => encode_shared_dict_range(span.0, span.1),
1041                    None => Ok((-1, -1)),
1042                })
1043                .collect::<Result<Vec<_>, _>>()?;
1044            items.push(ParsedSharedDictItem {
1045                suffix: child.name,
1046                ranges,
1047            });
1048        }
1049
1050        let parsed = ParsedSharedDict {
1051            prefix,
1052            data,
1053            items,
1054        };
1055        // Corpus size is only known after decompression; charge after.
1056        let bytes = parsed.items.iter().try_fold(
1057            u32::try_from(parsed.data.len()).or_overflow()?,
1058            |acc, item| {
1059                let n = u32::try_from(item.ranges.len() * size_of::<(i32, i32)>()).or_overflow()?;
1060                acc.checked_add(n).ok_or(MltError::IntegerOverflow)
1061            },
1062        )?;
1063        dec.consume(bytes)?;
1064        Ok(parsed)
1065    }
1066}
1067
1068/// FIXME:  uncertain why we need this, delete?
1069impl From<SharedDictEncoder> for PropertyEncoder {
1070    fn from(encoder: SharedDictEncoder) -> Self {
1071        Self::SharedDict(encoder)
1072    }
1073}