Skip to main content

mlt_core/frames/v01/property/
optimizer.rs

1//! Optimizer that analyses a batch of [`StagedProperty`] values and produces
2//! [`Vec<PropertyEncoder>`] with near-optimal per-column encoding settings.
3//!
4//! # Pipeline
5//!
6//! 1. **Profile & Group** - compute `MinHash` signatures for string columns and
7//!    cluster similar columns into shared dictionaries using union-find.
8//! 2. **Transform** - merge grouped string columns into `ParsedProperty::SharedDict`.
9//! 3. **Compete & Select** - choose the best `IntEncoder` for integer columns
10//!    via `auto_u32` / `auto_u64` pruning-competition; decide between
11//!    `Plain` and `Fsst` string encodings using an FSST viability probe;
12//!    set `PresenceStream::Absent` for columns that have no null values.
13
14use std::collections::hash_set::IntoIter;
15use std::collections::{HashMap, HashSet};
16
17use fsst::Compressor;
18use probabilistic_collections::similarity::MinHash;
19use union_find::{QuickUnionUf, UnionBySize, UnionFind as _};
20
21use crate::MltError;
22use crate::codecs::zigzag::encode_zigzag;
23use crate::v01::property::encode::encode_properties;
24use crate::v01::property::strings::{build_staged_shared_dict, collect_staged_shared_dict_spans};
25use crate::v01::property::{
26    PresenceStream, PropertyEncoder, ScalarEncoder, StagedProperty, StagedSharedDict,
27    StagedSharedDictItem,
28};
29use crate::v01::stream::IntEncoder;
30use crate::v01::{EncodedProperty, SharedDictEncoder, SharedDictItemEncoder, StrEncoder};
31
/// Number of [`MinHash`] permutations. 128 gives ~7 % error on Jaccard estimates.
const MINHASH_PERMUTATIONS: usize = 128;

/// String columns whose estimated Jaccard similarity is strictly greater than
/// this threshold are grouped into a single shared dictionary.
const MINHASH_SIMILARITY_THRESHOLD: f64 = 0.6;

/// Minimum total raw byte size of the sampled strings before attempting FSST
/// compression. Below this the symbol-table overhead dominates and FSST never
/// wins.
///
/// Note that the viability probe measures the sample (at most
/// [`FSST_SAMPLE_STRINGS`] strings), not the whole column.
const FSST_OVERHEAD_THRESHOLD: usize = 4_096;

/// Maximum number of strings sampled for the FSST viability probe.
/// The probe takes the leading strings of the column.
const FSST_SAMPLE_STRINGS: usize = 512;
45
/// Statistics collected during Phase 1 for a single string-typed column.
///
/// One profile is produced per `StagedProperty::Str` column; other column
/// types are not profiled.
struct StringProfile {
    /// Index of this column in the original `properties` slice.
    col_idx: usize,
    /// `MinHash` signature computed over the set of unique non-null values.
    /// Empty when the column contains no non-null values (all-null column);
    /// such columns are never compared against other columns.
    min_hashes: Vec<u64>,
}
54
/// A pre-computed set of string column groupings derived from a representative
/// sample of tiles.
///
/// Building a profile once from sample tiles avoids re-running the expensive
/// `MinHash` similarity analysis on every subsequent tile; the profile's
/// pre-computed string groups are applied directly during the grouping step
/// instead.
///
/// Profiles from multiple samples are combined with [`PropertyProfile::merge`],
/// which takes the union of both sets of string groups.
#[derive(Debug, Clone, PartialEq)]
pub struct PropertyProfile {
    /// Pre-computed string column groupings by column name.
    ///
    /// Each inner vec contains 2 or more column names that should share a
    /// dictionary. A column that is not listed in any group is left as a
    /// standalone column, and a group for which fewer than 2 of its columns
    /// are present in a tile is skipped for that tile.
    string_groups: Vec<Vec<String>>,
}
74
75impl PropertyProfile {
76    #[doc(hidden)]
77    #[must_use]
78    pub fn new(string_groups: Vec<Vec<String>>) -> Self {
79        Self { string_groups }
80    }
81
82    /// Build a profile from a sample of staged properties.
83    ///
84    /// Runs `MinHash` similarity analysis over all string columns and records
85    /// which column names should be grouped into shared dictionaries.
86    #[must_use]
87    pub fn from_sample(properties: &[StagedProperty]) -> Self {
88        let min_hash = MinHash::<IntoIter<&str>, &str>::new(MINHASH_PERMUTATIONS);
89        let profiles = profile_string_columns(properties, &min_hash);
90
91        let string_groups = if profiles.is_empty() {
92            Vec::new()
93        } else {
94            compute_string_groups(&profiles, &min_hash)
95                .into_iter()
96                .filter(|g| g.len() >= 2)
97                .map(|group| {
98                    group
99                        .iter()
100                        .map(|&ci| properties[ci].name().to_owned())
101                        .collect()
102                })
103                .collect()
104        };
105
106        Self { string_groups }
107    }
108
109    /// Merge two profiles by taking the union of their string groups.
110    ///
111    /// Groups that share at least one column name are merged together.
112    /// Groups already present in `self` are not duplicated.
113    #[must_use]
114    pub fn merge(mut self, other: &Self) -> Self {
115        'outer: for other_group in &other.string_groups {
116            for self_group in &mut self.string_groups {
117                if other_group.iter().any(|n| self_group.contains(n)) {
118                    for name in other_group {
119                        if !self_group.contains(name) {
120                            self_group.push(name.clone());
121                        }
122                    }
123                    continue 'outer;
124                }
125            }
126            self.string_groups.push(other_group.clone());
127        }
128        self
129    }
130}
131
/// Extension trait for consuming-style encoding of staged property columns.
pub trait EncodeProperties: Sized {
    /// Encode with a specific encoder, consuming `self`.
    ///
    /// # Errors
    ///
    /// Returns an [`MltError`] when encoding fails.
    fn encode(self, encoder: Vec<PropertyEncoder>) -> Result<Vec<EncodedProperty>, MltError>;
    /// Profile-driven encode, consuming `self`.
    ///
    /// On success the encoded columns are returned together with the
    /// encoders that were used to produce them.
    ///
    /// # Errors
    ///
    /// Returns an [`MltError`] when encoding fails.
    fn encode_with_profile(
        self,
        profile: &PropertyProfile,
    ) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError>;
    /// Automatic encoding, consuming `self`.
    ///
    /// On success the encoded columns are returned together with the
    /// encoders that were used to produce them.
    ///
    /// # Errors
    ///
    /// Returns an [`MltError`] when encoding fails.
    fn encode_auto(self) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError>;
}
144
145impl EncodeProperties for Vec<StagedProperty> {
146    fn encode(self, encoder: Vec<PropertyEncoder>) -> Result<Vec<EncodedProperty>, MltError> {
147        encode_properties(&self, encoder)
148    }
149
150    fn encode_with_profile(
151        mut self,
152        profile: &PropertyProfile,
153    ) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError> {
154        let enc = apply_profile(&mut self, profile);
155        let encoded = encode_properties(&self, enc.clone())?;
156        Ok((encoded, enc))
157    }
158
159    fn encode_auto(mut self) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError> {
160        let enc = optimize(&mut self);
161        let encoded = encode_properties(&self, enc.clone())?;
162        Ok((encoded, enc))
163    }
164}
165
166/// Analyze `properties` and return a configured [`Vec<PropertyEncoder>`].
167///
168/// This function mutates `properties` by combining similar string columns
169/// into `StagedProperty::SharedDict` values.
170fn optimize(properties: &mut Vec<StagedProperty>) -> Vec<PropertyEncoder> {
171    let profile = PropertyProfile::from_sample(properties);
172    apply_profile(properties, &profile)
173}
174
175/// Apply a profile to `properties`, using the pre-computed string groups
176/// instead of re-running the `MinHash` similarity analysis.
177///
178/// The same encoder selection logic as [`optimize`] is applied after grouping.
179fn apply_profile(
180    properties: &mut Vec<StagedProperty>,
181    profile: &PropertyProfile,
182) -> Vec<PropertyEncoder> {
183    if properties.is_empty() {
184        return Vec::new();
185    }
186    apply_string_groups(properties, &profile.string_groups);
187    properties.iter().map(build_encoder).collect()
188}
189
190/// Apply pre-computed string groups to `properties` by matching column names.
191///
192/// Columns present in the profile's groups but absent from this tile are
193/// silently skipped. Groups that resolve to fewer than 2 present columns are
194/// also skipped.
195fn apply_string_groups(properties: &mut Vec<StagedProperty>, string_groups: &[Vec<String>]) {
196    let matched_groups: Vec<Vec<usize>> = string_groups
197        .iter()
198        .filter_map(|group| {
199            let mut indices: Vec<usize> = group
200                .iter()
201                .filter_map(|name| {
202                    properties.iter().position(
203                        |p| matches!(p, StagedProperty::Str(v) if v.name == name.as_str()),
204                    )
205                })
206                .collect();
207            indices.sort_unstable();
208            if indices.len() >= 2 {
209                Some(indices)
210            } else {
211                None
212            }
213        })
214        .collect();
215
216    if !matched_groups.is_empty() {
217        merge_str_to_shared_dicts(properties, &matched_groups);
218    }
219}
220
221/// Profile string columns by computing `MinHash` signatures.
222fn profile_string_columns(
223    properties: &[StagedProperty],
224    min_hash: &MinHash<IntoIter<&str>, &str>,
225) -> Vec<StringProfile> {
226    properties
227        .iter()
228        .enumerate()
229        .filter_map(|(col_idx, prop)| {
230            if let StagedProperty::Str(values) = prop {
231                let owned_values = values.dense_values();
232                let unique_set: HashSet<&str> = owned_values.iter().map(String::as_str).collect();
233
234                // Guard against all-null columns - MinHash panics on an empty iterator.
235                let min_hashes = if unique_set.is_empty() {
236                    Vec::new()
237                } else {
238                    min_hash.get_min_hashes(unique_set.into_iter())
239                };
240                Some(StringProfile {
241                    col_idx,
242                    min_hashes,
243                })
244            } else {
245                None
246            }
247        })
248        .collect()
249}
250
251/// Compute groups of similar string columns using union-find.
252///
253/// Returns groups as `Vec<Vec<usize>>` where each inner vec contains
254/// column indices sorted by position.
255fn compute_string_groups(
256    profiles: &[StringProfile],
257    min_hash: &MinHash<IntoIter<&str>, &str>,
258) -> Vec<Vec<usize>> {
259    let n = profiles.len();
260    let mut uf = QuickUnionUf::<UnionBySize>::new(n);
261
262    // Compare pairs and union similar columns
263    for i in 0..n {
264        if !profiles[i].min_hashes.is_empty() {
265            for j in (i + 1)..n {
266                if !profiles[j].min_hashes.is_empty() {
267                    let sim = min_hash.get_similarity_from_hashes(
268                        &profiles[i].min_hashes,
269                        &profiles[j].min_hashes,
270                    );
271                    if sim > MINHASH_SIMILARITY_THRESHOLD {
272                        uf.union(i, j);
273                    }
274                }
275            }
276        }
277    }
278
279    // Collect groups by root
280    let mut groups_map = HashMap::<usize, Vec<usize>>::new();
281    for (i, profile) in profiles.iter().enumerate() {
282        let root = uf.find(i);
283        groups_map.entry(root).or_default().push(profile.col_idx);
284    }
285
286    // Convert map to Vec<Vec<usize>>, sort inner lists, sort by first column
287    let mut groups: Vec<Vec<usize>> = groups_map.into_values().collect();
288    for g in &mut groups {
289        g.sort_unstable();
290    }
291    groups.sort_unstable_by_key(|g| g[0]);
292
293    groups
294}
295
296/// Transform multi-member groups into [`StagedProperty::SharedDict`].
297///
298/// For each group with 2+ members:
299/// - Computes the common prefix name
300/// - Builds [`StagedSharedDictItem`] for each child
301/// - Replaces the first property with [`StagedProperty::SharedDict`]
302/// - Removes the other properties from the vector
303fn merge_str_to_shared_dicts(properties: &mut Vec<StagedProperty>, groups: &[Vec<usize>]) {
304    let mut indices_to_remove: HashSet<usize> = HashSet::new();
305
306    for group in groups {
307        if group.len() < 2 {
308            continue;
309        }
310        let names: Vec<&str> = group.iter().map(|&ci| properties[ci].name()).collect();
311        let prefix = common_prefix_name(&names);
312
313        let items = group
314            .iter()
315            .map(|&col_idx| {
316                let prop = &properties[col_idx];
317                let name = prop.name();
318                let suffix = name.strip_prefix(&prefix).unwrap_or(name).to_owned();
319                let StagedProperty::Str(values) = prop else {
320                    unreachable!("group should only contain Str columns");
321                };
322                (suffix, values.clone())
323            })
324            .collect::<Vec<_>>();
325        let shared_dict = build_staged_shared_dict(prefix.clone(), items)
326            .expect("building staged shared dictionary from string columns should succeed");
327
328        // Replace first property with SharedDict
329        properties[group[0]] = StagedProperty::SharedDict(shared_dict);
330
331        // Mark other properties for removal
332        for &col_idx in &group[1..] {
333            indices_to_remove.insert(col_idx);
334        }
335    }
336
337    // Remove merged properties in reverse order to preserve indices
338    let mut indices: Vec<usize> = indices_to_remove.into_iter().collect();
339    indices.sort_unstable();
340    for idx in indices.into_iter().rev() {
341        properties.remove(idx);
342    }
343}
344
345/// Build an encoder for any staged property type.
346fn build_encoder(prop: &StagedProperty) -> PropertyEncoder {
347    match prop {
348        StagedProperty::Bool(v) => {
349            PropertyEncoder::Scalar(ScalarEncoder::bool(presence_stream(has_nulls(&v.values))))
350        }
351        StagedProperty::F32(v) => {
352            PropertyEncoder::Scalar(ScalarEncoder::float(presence_stream(has_nulls(&v.values))))
353        }
354        StagedProperty::F64(v) => {
355            PropertyEncoder::Scalar(ScalarEncoder::float(presence_stream(has_nulls(&v.values))))
356        }
357        StagedProperty::I8(v) => {
358            let presence = presence_stream(has_nulls(&v.values));
359            // FIXME: inaccurate, but encoders don't support i8 widely. Sometimes, plain might be more efficient for this, but is estimated less effective
360            let non_null = v
361                .values
362                .iter()
363                .flatten()
364                .copied()
365                .map(i32::from)
366                .collect::<Vec<i32>>();
367            let enc = encode_zigzag(&non_null);
368            PropertyEncoder::Scalar(ScalarEncoder::int(presence, IntEncoder::auto_u32(&enc)))
369        }
370        StagedProperty::U8(v) => {
371            let presence = presence_stream(has_nulls(&v.values));
372            // FIXME: inaccurate, but encoders don't support u8 widely. Sometimes, plain might be more efficient for this, but is estimated less effective
373            let non_null: Vec<u32> = v.values.iter().flatten().copied().map(u32::from).collect();
374            PropertyEncoder::Scalar(ScalarEncoder::int(
375                presence,
376                IntEncoder::auto_u32(&non_null),
377            ))
378        }
379        StagedProperty::I32(v) => {
380            let presence = presence_stream(has_nulls(&v.values));
381            let non_null = v.values.iter().flatten().copied().collect::<Vec<i32>>();
382            let enc = encode_zigzag(&non_null);
383            PropertyEncoder::Scalar(ScalarEncoder::int(presence, IntEncoder::auto_u32(&enc)))
384        }
385        StagedProperty::U32(v) => {
386            let presence = presence_stream(has_nulls(&v.values));
387            let non_null: Vec<u32> = v.values.iter().flatten().copied().collect();
388            PropertyEncoder::Scalar(ScalarEncoder::int(
389                presence,
390                IntEncoder::auto_u32(&non_null),
391            ))
392        }
393        StagedProperty::I64(v) => {
394            let presence = presence_stream(has_nulls(&v.values));
395            let non_null = &v.values.iter().flatten().copied().collect::<Vec<i64>>();
396            let enc = encode_zigzag(non_null);
397            PropertyEncoder::Scalar(ScalarEncoder::int(presence, IntEncoder::auto_u64(&enc)))
398        }
399        StagedProperty::U64(v) => {
400            let non_null: Vec<u64> = v.values.iter().flatten().copied().collect();
401            PropertyEncoder::Scalar(ScalarEncoder::int(
402                presence_stream(has_nulls(&v.values)),
403                IntEncoder::auto_u64(&non_null),
404            ))
405        }
406        StagedProperty::Str(v) => {
407            let presence = presence_stream(v.has_nulls());
408            let owned_values = v.dense_values();
409            let non_null: Vec<&str> = owned_values.iter().map(String::as_str).collect();
410            scalar_str_encoder(presence, &non_null)
411        }
412        StagedProperty::SharedDict(shared_dict) => build_shared_dict_encoder(shared_dict),
413    }
414}
415
416/// Build a `SharedDictEncoder` for a `StagedProperty::SharedDict`.
417fn build_shared_dict_encoder(shared_dict: &StagedSharedDict) -> PropertyEncoder {
418    let dict_spans = collect_staged_shared_dict_spans(&shared_dict.items);
419    let all_strings: Vec<&str> = dict_spans
420        .iter()
421        .filter_map(|&span| shared_dict.get(span))
422        .collect();
423
424    let dict_encoder = if fsst_is_viable(&all_strings) {
425        StrEncoder::fsst(IntEncoder::varint(), IntEncoder::varint())
426    } else {
427        let lengths: Vec<u32> = all_strings
428            .iter()
429            .map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX))
430            .collect();
431        StrEncoder::Plain {
432            string_lengths: IntEncoder::auto_u32(&lengths),
433        }
434    };
435
436    let item_encoders: Vec<SharedDictItemEncoder> = shared_dict
437        .items
438        .iter()
439        .map(|item| {
440            let presence = presence_stream(item.has_nulls());
441            let offsets = compute_offset_encoder(&shared_dict.items, item);
442            SharedDictItemEncoder { presence, offsets }
443        })
444        .collect();
445
446    SharedDictEncoder {
447        dict_encoder,
448        items: item_encoders,
449    }
450    .into()
451}
452
453/// Compute the optimal `IntEncoder` for the offset stream of one item
454/// in a staged shared dictionary.
455fn compute_offset_encoder(
456    items: &[StagedSharedDictItem],
457    target_item: &StagedSharedDictItem,
458) -> IntEncoder {
459    let dict_index: HashMap<(u32, u32), u32> = collect_staged_shared_dict_spans(items)
460        .into_iter()
461        .zip(0_u32..)
462        .collect();
463    let offsets: Vec<u32> = target_item
464        .dense_spans()
465        .iter()
466        .map(|span| {
467            *dict_index
468                .get(span)
469                .expect("non-null string span missing from shared dictionary")
470        })
471        .collect();
472
473    if offsets.is_empty() {
474        IntEncoder::plain()
475    } else {
476        IntEncoder::auto_u32(&offsets)
477    }
478}
479
/// Returns `true` when at least one entry of `values` is `None`.
/// An empty slice has no nulls.
fn has_nulls<T>(values: &[Option<T>]) -> bool {
    !values.iter().all(Option::is_some)
}
483
484fn presence_stream(has_nulls: bool) -> PresenceStream {
485    if has_nulls {
486        PresenceStream::Present
487    } else {
488        PresenceStream::Absent
489    }
490}
491
/// Returns the longest common prefix of `names`.
///
/// The prefix is determined character by character, so it always ends on a
/// UTF-8 character boundary; its length is tracked in bytes so that names
/// containing multi-byte characters are not cut short.
///
/// Returns an empty string when `names` is empty or the names share no
/// leading character. A single name is its own prefix.
fn common_prefix_name(names: &[&str]) -> String {
    let Some((&first, rest)) = names.split_first() else {
        return String::new();
    };

    let mut prefix_len = first.len();
    for name in rest {
        // Byte length of the run of leading characters shared with `first`.
        let shared: usize = first
            .chars()
            .zip(name.chars())
            .take_while(|(a, b)| a == b)
            .map(|(c, _)| c.len_utf8())
            .sum();
        prefix_len = prefix_len.min(shared);
        if prefix_len == 0 {
            break;
        }
    }

    // `prefix_len` is a sum of whole-character lengths counted from the start
    // of `first`, hence a valid slicing position.
    first[..prefix_len].to_owned()
}
516
517/// Choose between `Plain` and `Fsst` for a standalone string column.
518fn scalar_str_encoder(presence: PresenceStream, non_null: &[&str]) -> PropertyEncoder {
519    let lengths: Vec<u32> = non_null
520        .iter()
521        .map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX))
522        .collect();
523    if fsst_is_viable(non_null) {
524        PropertyEncoder::Scalar(ScalarEncoder::str_fsst(
525            presence,
526            IntEncoder::varint(),
527            IntEncoder::auto_u32(&lengths),
528        ))
529    } else {
530        PropertyEncoder::Scalar(ScalarEncoder::str(presence, IntEncoder::auto_u32(&lengths)))
531    }
532}
533
534/// Returns `true` when FSST compression is likely to save space on `strings`.
535fn fsst_is_viable(strings: &[&str]) -> bool {
536    if strings.is_empty() {
537        return false;
538    }
539    let sample = if strings.len() <= FSST_SAMPLE_STRINGS {
540        strings
541    } else {
542        &strings[..FSST_SAMPLE_STRINGS]
543    };
544    let plain_size: usize = sample.iter().map(|s| s.len()).sum();
545    if plain_size < FSST_OVERHEAD_THRESHOLD {
546        return false;
547    }
548    let byte_slices: Vec<&[u8]> = sample.iter().map(|s| s.as_bytes()).collect();
549    let compressor = Compressor::train(&byte_slices);
550    let symbols = compressor.symbol_table();
551    let symbol_lengths = compressor.symbol_lengths();
552    let symbol_overhead: usize = symbol_lengths
553        .iter()
554        .take(symbols.len())
555        .map(|&l| usize::from(l))
556        .sum();
557    let compressed_size: usize = sample
558        .iter()
559        .map(|s| compressor.compress(s.as_bytes()).len())
560        .sum();
561    symbol_overhead + compressed_size < plain_size
562}