mlt_core/frames/v01/property/
optimizer.rs1use std::collections::hash_set::IntoIter;
15use std::collections::{HashMap, HashSet};
16
17use fsst::Compressor;
18use probabilistic_collections::similarity::MinHash;
19use union_find::{QuickUnionUf, UnionBySize, UnionFind as _};
20
21use crate::MltError;
22use crate::codecs::zigzag::encode_zigzag;
23use crate::v01::property::encode::encode_properties;
24use crate::v01::property::strings::{build_staged_shared_dict, collect_staged_shared_dict_spans};
25use crate::v01::property::{
26 PresenceStream, PropertyEncoder, ScalarEncoder, StagedProperty, StagedSharedDict,
27 StagedSharedDictItem,
28};
29use crate::v01::stream::IntEncoder;
30use crate::v01::{EncodedProperty, SharedDictEncoder, SharedDictItemEncoder, StrEncoder};
31
/// Number of hash functions handed to `MinHash::new`; every string column's
/// signature therefore holds this many hashes.
const MINHASH_PERMUTATIONS: usize = 128;

/// Two string columns are placed in the same group only when their estimated
/// MinHash similarity is strictly greater than this value.
const MINHASH_SIMILARITY_THRESHOLD: f64 = 0.6;

/// Minimum total byte size of the sampled strings before FSST is even tried;
/// smaller samples are always encoded without FSST.
const FSST_OVERHEAD_THRESHOLD: usize = 4_096;

/// Upper bound on how many leading strings are used to train and evaluate a
/// trial FSST compressor.
const FSST_SAMPLE_STRINGS: usize = 512;
45
/// MinHash signature of a single string column, used to compare columns.
struct StringProfile {
    /// Index of the column inside the property slice that was profiled.
    col_idx: usize,
    /// MinHash signature over the column's distinct non-null values; left
    /// empty when the column has no non-null values.
    min_hashes: Vec<u64>,
}
54
/// Reusable description of how a set of properties should be laid out.
///
/// At present it records only which string columns belong together: each
/// inner list names the columns that are combined into one shared dictionary
/// when the profile is applied.
#[derive(Debug, Clone, PartialEq)]
pub struct PropertyProfile {
    /// Groups of string column names; every group is encoded as one shared
    /// dictionary.
    string_groups: Vec<Vec<String>>,
}
74
75impl PropertyProfile {
76 #[doc(hidden)]
77 #[must_use]
78 pub fn new(string_groups: Vec<Vec<String>>) -> Self {
79 Self { string_groups }
80 }
81
82 #[must_use]
87 pub fn from_sample(properties: &[StagedProperty]) -> Self {
88 let min_hash = MinHash::<IntoIter<&str>, &str>::new(MINHASH_PERMUTATIONS);
89 let profiles = profile_string_columns(properties, &min_hash);
90
91 let string_groups = if profiles.is_empty() {
92 Vec::new()
93 } else {
94 compute_string_groups(&profiles, &min_hash)
95 .into_iter()
96 .filter(|g| g.len() >= 2)
97 .map(|group| {
98 group
99 .iter()
100 .map(|&ci| properties[ci].name().to_owned())
101 .collect()
102 })
103 .collect()
104 };
105
106 Self { string_groups }
107 }
108
109 #[must_use]
114 pub fn merge(mut self, other: &Self) -> Self {
115 'outer: for other_group in &other.string_groups {
116 for self_group in &mut self.string_groups {
117 if other_group.iter().any(|n| self_group.contains(n)) {
118 for name in other_group {
119 if !self_group.contains(name) {
120 self_group.push(name.clone());
121 }
122 }
123 continue 'outer;
124 }
125 }
126 self.string_groups.push(other_group.clone());
127 }
128 self
129 }
130}
131
132pub trait EncodeProperties: Sized {
134 fn encode(self, encoder: Vec<PropertyEncoder>) -> Result<Vec<EncodedProperty>, MltError>;
136 fn encode_with_profile(
138 self,
139 profile: &PropertyProfile,
140 ) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError>;
141 fn encode_auto(self) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError>;
143}
144
145impl EncodeProperties for Vec<StagedProperty> {
146 fn encode(self, encoder: Vec<PropertyEncoder>) -> Result<Vec<EncodedProperty>, MltError> {
147 encode_properties(&self, encoder)
148 }
149
150 fn encode_with_profile(
151 mut self,
152 profile: &PropertyProfile,
153 ) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError> {
154 let enc = apply_profile(&mut self, profile);
155 let encoded = encode_properties(&self, enc.clone())?;
156 Ok((encoded, enc))
157 }
158
159 fn encode_auto(mut self) -> Result<(Vec<EncodedProperty>, Vec<PropertyEncoder>), MltError> {
160 let enc = optimize(&mut self);
161 let encoded = encode_properties(&self, enc.clone())?;
162 Ok((encoded, enc))
163 }
164}
165
166fn optimize(properties: &mut Vec<StagedProperty>) -> Vec<PropertyEncoder> {
171 let profile = PropertyProfile::from_sample(properties);
172 apply_profile(properties, &profile)
173}
174
175fn apply_profile(
180 properties: &mut Vec<StagedProperty>,
181 profile: &PropertyProfile,
182) -> Vec<PropertyEncoder> {
183 if properties.is_empty() {
184 return Vec::new();
185 }
186 apply_string_groups(properties, &profile.string_groups);
187 properties.iter().map(build_encoder).collect()
188}
189
190fn apply_string_groups(properties: &mut Vec<StagedProperty>, string_groups: &[Vec<String>]) {
196 let matched_groups: Vec<Vec<usize>> = string_groups
197 .iter()
198 .filter_map(|group| {
199 let mut indices: Vec<usize> = group
200 .iter()
201 .filter_map(|name| {
202 properties.iter().position(
203 |p| matches!(p, StagedProperty::Str(v) if v.name == name.as_str()),
204 )
205 })
206 .collect();
207 indices.sort_unstable();
208 if indices.len() >= 2 {
209 Some(indices)
210 } else {
211 None
212 }
213 })
214 .collect();
215
216 if !matched_groups.is_empty() {
217 merge_str_to_shared_dicts(properties, &matched_groups);
218 }
219}
220
221fn profile_string_columns(
223 properties: &[StagedProperty],
224 min_hash: &MinHash<IntoIter<&str>, &str>,
225) -> Vec<StringProfile> {
226 properties
227 .iter()
228 .enumerate()
229 .filter_map(|(col_idx, prop)| {
230 if let StagedProperty::Str(values) = prop {
231 let owned_values = values.dense_values();
232 let unique_set: HashSet<&str> = owned_values.iter().map(String::as_str).collect();
233
234 let min_hashes = if unique_set.is_empty() {
236 Vec::new()
237 } else {
238 min_hash.get_min_hashes(unique_set.into_iter())
239 };
240 Some(StringProfile {
241 col_idx,
242 min_hashes,
243 })
244 } else {
245 None
246 }
247 })
248 .collect()
249}
250
251fn compute_string_groups(
256 profiles: &[StringProfile],
257 min_hash: &MinHash<IntoIter<&str>, &str>,
258) -> Vec<Vec<usize>> {
259 let n = profiles.len();
260 let mut uf = QuickUnionUf::<UnionBySize>::new(n);
261
262 for i in 0..n {
264 if !profiles[i].min_hashes.is_empty() {
265 for j in (i + 1)..n {
266 if !profiles[j].min_hashes.is_empty() {
267 let sim = min_hash.get_similarity_from_hashes(
268 &profiles[i].min_hashes,
269 &profiles[j].min_hashes,
270 );
271 if sim > MINHASH_SIMILARITY_THRESHOLD {
272 uf.union(i, j);
273 }
274 }
275 }
276 }
277 }
278
279 let mut groups_map = HashMap::<usize, Vec<usize>>::new();
281 for (i, profile) in profiles.iter().enumerate() {
282 let root = uf.find(i);
283 groups_map.entry(root).or_default().push(profile.col_idx);
284 }
285
286 let mut groups: Vec<Vec<usize>> = groups_map.into_values().collect();
288 for g in &mut groups {
289 g.sort_unstable();
290 }
291 groups.sort_unstable_by_key(|g| g[0]);
292
293 groups
294}
295
296fn merge_str_to_shared_dicts(properties: &mut Vec<StagedProperty>, groups: &[Vec<usize>]) {
304 let mut indices_to_remove: HashSet<usize> = HashSet::new();
305
306 for group in groups {
307 if group.len() < 2 {
308 continue;
309 }
310 let names: Vec<&str> = group.iter().map(|&ci| properties[ci].name()).collect();
311 let prefix = common_prefix_name(&names);
312
313 let items = group
314 .iter()
315 .map(|&col_idx| {
316 let prop = &properties[col_idx];
317 let name = prop.name();
318 let suffix = name.strip_prefix(&prefix).unwrap_or(name).to_owned();
319 let StagedProperty::Str(values) = prop else {
320 unreachable!("group should only contain Str columns");
321 };
322 (suffix, values.clone())
323 })
324 .collect::<Vec<_>>();
325 let shared_dict = build_staged_shared_dict(prefix.clone(), items)
326 .expect("building staged shared dictionary from string columns should succeed");
327
328 properties[group[0]] = StagedProperty::SharedDict(shared_dict);
330
331 for &col_idx in &group[1..] {
333 indices_to_remove.insert(col_idx);
334 }
335 }
336
337 let mut indices: Vec<usize> = indices_to_remove.into_iter().collect();
339 indices.sort_unstable();
340 for idx in indices.into_iter().rev() {
341 properties.remove(idx);
342 }
343}
344
345fn build_encoder(prop: &StagedProperty) -> PropertyEncoder {
347 match prop {
348 StagedProperty::Bool(v) => {
349 PropertyEncoder::Scalar(ScalarEncoder::bool(presence_stream(has_nulls(&v.values))))
350 }
351 StagedProperty::F32(v) => {
352 PropertyEncoder::Scalar(ScalarEncoder::float(presence_stream(has_nulls(&v.values))))
353 }
354 StagedProperty::F64(v) => {
355 PropertyEncoder::Scalar(ScalarEncoder::float(presence_stream(has_nulls(&v.values))))
356 }
357 StagedProperty::I8(v) => {
358 let presence = presence_stream(has_nulls(&v.values));
359 let non_null = v
361 .values
362 .iter()
363 .flatten()
364 .copied()
365 .map(i32::from)
366 .collect::<Vec<i32>>();
367 let enc = encode_zigzag(&non_null);
368 PropertyEncoder::Scalar(ScalarEncoder::int(presence, IntEncoder::auto_u32(&enc)))
369 }
370 StagedProperty::U8(v) => {
371 let presence = presence_stream(has_nulls(&v.values));
372 let non_null: Vec<u32> = v.values.iter().flatten().copied().map(u32::from).collect();
374 PropertyEncoder::Scalar(ScalarEncoder::int(
375 presence,
376 IntEncoder::auto_u32(&non_null),
377 ))
378 }
379 StagedProperty::I32(v) => {
380 let presence = presence_stream(has_nulls(&v.values));
381 let non_null = v.values.iter().flatten().copied().collect::<Vec<i32>>();
382 let enc = encode_zigzag(&non_null);
383 PropertyEncoder::Scalar(ScalarEncoder::int(presence, IntEncoder::auto_u32(&enc)))
384 }
385 StagedProperty::U32(v) => {
386 let presence = presence_stream(has_nulls(&v.values));
387 let non_null: Vec<u32> = v.values.iter().flatten().copied().collect();
388 PropertyEncoder::Scalar(ScalarEncoder::int(
389 presence,
390 IntEncoder::auto_u32(&non_null),
391 ))
392 }
393 StagedProperty::I64(v) => {
394 let presence = presence_stream(has_nulls(&v.values));
395 let non_null = &v.values.iter().flatten().copied().collect::<Vec<i64>>();
396 let enc = encode_zigzag(non_null);
397 PropertyEncoder::Scalar(ScalarEncoder::int(presence, IntEncoder::auto_u64(&enc)))
398 }
399 StagedProperty::U64(v) => {
400 let non_null: Vec<u64> = v.values.iter().flatten().copied().collect();
401 PropertyEncoder::Scalar(ScalarEncoder::int(
402 presence_stream(has_nulls(&v.values)),
403 IntEncoder::auto_u64(&non_null),
404 ))
405 }
406 StagedProperty::Str(v) => {
407 let presence = presence_stream(v.has_nulls());
408 let owned_values = v.dense_values();
409 let non_null: Vec<&str> = owned_values.iter().map(String::as_str).collect();
410 scalar_str_encoder(presence, &non_null)
411 }
412 StagedProperty::SharedDict(shared_dict) => build_shared_dict_encoder(shared_dict),
413 }
414}
415
416fn build_shared_dict_encoder(shared_dict: &StagedSharedDict) -> PropertyEncoder {
418 let dict_spans = collect_staged_shared_dict_spans(&shared_dict.items);
419 let all_strings: Vec<&str> = dict_spans
420 .iter()
421 .filter_map(|&span| shared_dict.get(span))
422 .collect();
423
424 let dict_encoder = if fsst_is_viable(&all_strings) {
425 StrEncoder::fsst(IntEncoder::varint(), IntEncoder::varint())
426 } else {
427 let lengths: Vec<u32> = all_strings
428 .iter()
429 .map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX))
430 .collect();
431 StrEncoder::Plain {
432 string_lengths: IntEncoder::auto_u32(&lengths),
433 }
434 };
435
436 let item_encoders: Vec<SharedDictItemEncoder> = shared_dict
437 .items
438 .iter()
439 .map(|item| {
440 let presence = presence_stream(item.has_nulls());
441 let offsets = compute_offset_encoder(&shared_dict.items, item);
442 SharedDictItemEncoder { presence, offsets }
443 })
444 .collect();
445
446 SharedDictEncoder {
447 dict_encoder,
448 items: item_encoders,
449 }
450 .into()
451}
452
453fn compute_offset_encoder(
456 items: &[StagedSharedDictItem],
457 target_item: &StagedSharedDictItem,
458) -> IntEncoder {
459 let dict_index: HashMap<(u32, u32), u32> = collect_staged_shared_dict_spans(items)
460 .into_iter()
461 .zip(0_u32..)
462 .collect();
463 let offsets: Vec<u32> = target_item
464 .dense_spans()
465 .iter()
466 .map(|span| {
467 *dict_index
468 .get(span)
469 .expect("non-null string span missing from shared dictionary")
470 })
471 .collect();
472
473 if offsets.is_empty() {
474 IntEncoder::plain()
475 } else {
476 IntEncoder::auto_u32(&offsets)
477 }
478}
479
/// Returns `true` when at least one entry of `values` is `None`.
///
/// An empty slice contains no nulls.
fn has_nulls<T>(values: &[Option<T>]) -> bool {
    !values.iter().all(Option::is_some)
}
483
484fn presence_stream(has_nulls: bool) -> PresenceStream {
485 if has_nulls {
486 PresenceStream::Present
487 } else {
488 PresenceStream::Absent
489 }
490}
491
/// Returns the longest prefix shared by every name in `names`.
///
/// The comparison is done character by character, and the prefix length is
/// tracked in bytes so the result is correct for names containing multi-byte
/// UTF-8 characters. With a single name the whole name is returned; when the
/// names share nothing the result is empty.
///
/// `names` is expected to be non-empty (checked with a debug assertion); in
/// builds without debug assertions an empty slice yields an empty string.
fn common_prefix_name(names: &[&str]) -> String {
    debug_assert!(!names.is_empty());
    let Some((&first, rest)) = names.split_first() else {
        return String::new();
    };

    // Byte length of the prefix of `first` shared with every name seen so far.
    let mut prefix_len = first.len();
    for name in rest {
        // Sum the UTF-8 widths of the matching characters: counting characters
        // would under-measure any prefix that contains multi-byte characters.
        let shared_len: usize = first
            .chars()
            .zip(name.chars())
            .take_while(|(a, b)| a == b)
            .map(|(ch, _)| ch.len_utf8())
            .sum();
        prefix_len = prefix_len.min(shared_len);
        if prefix_len == 0 {
            break;
        }
    }

    // `prefix_len` is always a sum of leading character widths of `first`
    // (or its full length), so it lies on a character boundary.
    first[..prefix_len].to_owned()
}
516
517fn scalar_str_encoder(presence: PresenceStream, non_null: &[&str]) -> PropertyEncoder {
519 let lengths: Vec<u32> = non_null
520 .iter()
521 .map(|s| u32::try_from(s.len()).unwrap_or(u32::MAX))
522 .collect();
523 if fsst_is_viable(non_null) {
524 PropertyEncoder::Scalar(ScalarEncoder::str_fsst(
525 presence,
526 IntEncoder::varint(),
527 IntEncoder::auto_u32(&lengths),
528 ))
529 } else {
530 PropertyEncoder::Scalar(ScalarEncoder::str(presence, IntEncoder::auto_u32(&lengths)))
531 }
532}
533
534fn fsst_is_viable(strings: &[&str]) -> bool {
536 if strings.is_empty() {
537 return false;
538 }
539 let sample = if strings.len() <= FSST_SAMPLE_STRINGS {
540 strings
541 } else {
542 &strings[..FSST_SAMPLE_STRINGS]
543 };
544 let plain_size: usize = sample.iter().map(|s| s.len()).sum();
545 if plain_size < FSST_OVERHEAD_THRESHOLD {
546 return false;
547 }
548 let byte_slices: Vec<&[u8]> = sample.iter().map(|s| s.as_bytes()).collect();
549 let compressor = Compressor::train(&byte_slices);
550 let symbols = compressor.symbol_table();
551 let symbol_lengths = compressor.symbol_lengths();
552 let symbol_overhead: usize = symbol_lengths
553 .iter()
554 .take(symbols.len())
555 .map(|&l| usize::from(l))
556 .sum();
557 let compressed_size: usize = sample
558 .iter()
559 .map(|s| compressor.compress(s.as_bytes()).len())
560 .sum();
561 symbol_overhead + compressed_size < plain_size
562}