re_chunk_store/
writes.rs

1use std::{collections::BTreeSet, sync::Arc};
2
3use ahash::HashMap;
4use arrow::array::Array as _;
5use itertools::Itertools as _;
6
7use re_byte_size::SizeBytes;
8use re_chunk::{Chunk, EntityPath, RowId};
9
10use crate::{
11    store::ChunkIdSetPerTime, ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreDiff,
12    ChunkStoreError, ChunkStoreEvent, ChunkStoreResult, ColumnMetadataState,
13};
14
15// Used all over in docstrings.
16#[allow(unused_imports)]
17use crate::ChunkId;
18
19// ---
20
21impl ChunkStore {
22    /// Inserts a [`Chunk`] in the store.
23    ///
24    /// Iff the store was modified, all registered subscribers will be notified and the
25    /// resulting [`ChunkStoreEvent`] will be returned, or `None` otherwise.
26    ///
27    /// * Trying to insert an unsorted chunk ([`Chunk::is_sorted`]) will fail with an error.
28    /// * Inserting a duplicated [`ChunkId`] will result in a no-op.
29    /// * Inserting an empty [`Chunk`] will result in a no-op.
30    pub fn insert_chunk(&mut self, chunk: &Arc<Chunk>) -> ChunkStoreResult<Vec<ChunkStoreEvent>> {
31        if self.chunks_per_chunk_id.contains_key(&chunk.id()) {
32            // We assume that chunk IDs are unique, and that reinserting a chunk has no effect.
33            re_log::debug_once!(
34                "Chunk #{} was inserted more than once (this has no effect)",
35                chunk.id()
36            );
37            return Ok(Vec::new());
38        }
39
40        if !chunk.is_sorted() {
41            return Err(ChunkStoreError::UnsortedChunk);
42        }
43
44        let Some(row_id_range) = chunk.row_id_range() else {
45            return Ok(Vec::new());
46        };
47
48        re_tracing::profile_function!();
49
50        self.insert_id += 1;
51
52        let mut chunk = Arc::clone(chunk);
53
54        // We're in a transition period during which the Rerun ecosystem is slowly moving over to tagged data.
55        //
56        // During that time, it is common to end up in situations where the blueprint intermixes both tagged
57        // and untagged components, which invariably leads to undefined behavior.
58        // To prevent that, we just always hot-patch it to untagged, for now.
59        //
60        // Examples:
61        // * An SDK logs a blueprint (tagged), which is then updated by the viewer (which uses untagged log calls).
62        // * Somebody loads an old .rbl from somewhere and starts logging new blueprint data to it.
63        // * Etc.
64        if self.id.kind == re_log_types::StoreKind::Blueprint {
65            let patched = chunk.patched_for_blueprint_021_compat();
66            chunk = Arc::new(patched);
67        }
68
69        #[cfg(debug_assertions)]
70        for (component_name, per_desc) in chunk.components().iter() {
71            assert!(
72                per_desc.len() <= 1,
73                "[DEBUG ONLY] Insert Chunk with multiple values for component named `{component_name}`: this is currently UB\n{chunk}",
74            );
75        }
76
77        let non_compacted_chunk = Arc::clone(&chunk); // we'll need it to create the store event
78
79        let (chunk, diffs) = if chunk.is_static() {
80            // Static data: make sure to keep the most recent chunk available for each component column.
81            re_tracing::profile_scope!("static");
82
83            let row_id_range_per_component = chunk.row_id_range_per_component();
84
85            let mut overwritten_chunk_ids = HashMap::default();
86
87            for (component_desc, list_array) in chunk.components().iter_flattened() {
88                let is_empty = list_array
89                    .nulls()
90                    .is_some_and(|validity| validity.is_empty());
91                if is_empty {
92                    continue;
93                }
94
95                let Some((_row_id_min_for_component, row_id_max_for_component)) =
96                    row_id_range_per_component
97                        .get(&component_desc.component_name)
98                        .and_then(|per_desc| per_desc.get(component_desc))
99                else {
100                    continue;
101                };
102
103                self.static_chunk_ids_per_entity
104                    .entry(chunk.entity_path().clone())
105                    .or_default()
106                    .entry(component_desc.component_name)
107                    .and_modify(|cur_chunk_id| {
108                        // NOTE: When attempting to overwrite static data, the chunk with the most
109                        // recent data within -- according to RowId -- wins.
110
111                        let cur_row_id_max_for_component = self
112                            .chunks_per_chunk_id
113                            .get(cur_chunk_id)
114                            .map_or(RowId::ZERO, |chunk| {
115                                chunk
116                                    .row_id_range_per_component()
117                                    .get(&component_desc.component_name)
118                                    .and_then(|per_desc| per_desc.get(component_desc))
119                                    .map_or(RowId::ZERO, |(_, row_id_max)| *row_id_max)
120                            });
121
122                        if *row_id_max_for_component > cur_row_id_max_for_component {
123                            // We are about to overwrite the existing chunk with the new one, at
124                            // least for this one specific component.
125                            // Keep track of the overwritten ChunkId: we'll need it further down in
126                            // order to check whether that chunk is now dangling.
127
128                            // NOTE: The chunks themselves are indexed using the smallest RowId in
129                            // the chunk _as a whole_, as opposed to the smallest RowId of one
130                            // specific component in that chunk.
131                            let cur_row_id_min_for_chunk = self
132                                .chunks_per_chunk_id
133                                .get(cur_chunk_id)
134                                .and_then(|chunk| {
135                                    chunk.row_id_range().map(|(row_id_min, _)| row_id_min)
136                                });
137
138                            debug_assert!(
139                                cur_row_id_min_for_chunk.is_some(),
140                                "This condition cannot fail, we just want to avoid unwrapping",
141                            );
142                            if let Some(cur_row_id_min_for_chunk) = cur_row_id_min_for_chunk {
143                                overwritten_chunk_ids
144                                    .insert(*cur_chunk_id, cur_row_id_min_for_chunk);
145                            }
146
147                            *cur_chunk_id = chunk.id();
148                        }
149                    })
150                    .or_insert_with(|| chunk.id());
151            }
152
153            self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(&chunk);
154
155            let mut diffs = vec![ChunkStoreDiff::addition(
156                non_compacted_chunk, /* added */
157                None,                /* compacted */
158            )];
159
160            // NOTE: Our chunks can only cover a single entity path at a time, therefore we know we
161            // only have to check that one entity for complete overwrite.
162            debug_assert!(
163                self.static_chunk_ids_per_entity
164                    .contains_key(chunk.entity_path()),
165                "This condition cannot fail, we just want to avoid unwrapping",
166            );
167            if let Some(per_component) = self.static_chunk_ids_per_entity.get(chunk.entity_path()) {
168                re_tracing::profile_scope!("static dangling checks");
169
170                // At this point, we are in possession of a list of ChunkIds that were at least
171                // _partially_ overwritten (i.e. some, but not necessarily all, of the components
172                // that they used to provide the data for are now provided by another, newer chunk).
173                //
174                // To determine whether any of these chunks are actually fully overwritten, and
175                // therefore dangling, we need to make sure there are no components left
176                // referencing these ChunkIds whatsoever.
177                //
178                // Because our storage model guarantees that a single chunk cannot cover more than
179                // one entity, this is actually pretty cheap to do, since we only have to loop over
180                // all the components of a single entity.
181
182                for (chunk_id, chunk_row_id_min) in overwritten_chunk_ids {
183                    let has_been_fully_overwritten = !per_component
184                        .values()
185                        .any(|cur_chunk_id| *cur_chunk_id == chunk_id);
186
187                    if has_been_fully_overwritten {
188                        // The chunk is now dangling: remove it from all relevant indices, update
189                        // the stats, and fire deletion events.
190
191                        let chunk_id_removed =
192                            self.chunk_ids_per_min_row_id.remove(&chunk_row_id_min);
193                        debug_assert!(chunk_id_removed.is_some());
194
195                        let chunk_removed = self.chunks_per_chunk_id.remove(&chunk_id);
196                        debug_assert!(chunk_removed.is_some());
197
198                        if let Some(chunk_removed) = chunk_removed {
199                            self.static_chunks_stats -=
200                                ChunkStoreChunkStats::from_chunk(&chunk_removed);
201                            diffs.push(ChunkStoreDiff::deletion(chunk_removed));
202                        }
203                    }
204                }
205            }
206
207            (Arc::clone(&chunk), diffs)
208        } else {
209            // Temporal data: just index the chunk on every dimension of interest.
210            re_tracing::profile_scope!("temporal");
211
212            let (elected_chunk, chunk_or_compacted) = {
213                re_tracing::profile_scope!("election");
214
215                let elected_chunk = self.find_and_elect_compaction_candidate(&chunk);
216
217                let chunk_or_compacted = if let Some(elected_chunk) = &elected_chunk {
218                    let chunk_rowid_min = chunk.row_id_range().map(|(min, _)| min);
219                    let elected_rowid_min = elected_chunk.row_id_range().map(|(min, _)| min);
220
221                    let mut compacted = if elected_rowid_min < chunk_rowid_min {
222                        re_tracing::profile_scope!("concat");
223                        elected_chunk.concatenated(&chunk)?
224                    } else {
225                        re_tracing::profile_scope!("concat");
226                        chunk.concatenated(elected_chunk)?
227                    };
228
229                    {
230                        re_tracing::profile_scope!("sort");
231                        compacted.sort_if_unsorted();
232                    }
233
234                    re_log::trace!(
235                        "compacted {} ({} rows) and {} ({} rows) together, resulting in {} ({} rows)",
236                        chunk.id(),
237                        re_format::format_uint(chunk.num_rows()),
238                        elected_chunk.id(),
239                        re_format::format_uint(elected_chunk.num_rows()),
240                        compacted.id(),
241                        re_format::format_uint(compacted.num_rows()),
242                    );
243
244                    Arc::new(compacted)
245                } else {
246                    Arc::clone(&chunk)
247                };
248
249                (elected_chunk, chunk_or_compacted)
250            };
251
252            {
253                re_tracing::profile_scope!("insertion (w/ component)");
254
255                let temporal_chunk_ids_per_timeline = self
256                    .temporal_chunk_ids_per_entity_per_component
257                    .entry(chunk_or_compacted.entity_path().clone())
258                    .or_default();
259
260                // NOTE: We must make sure to use the time range of each specific component column
261                // here, or we open ourselves to nasty edge cases.
262                //
263                // See the `latest_at_sparse_component_edge_case` test.
264                for (timeline, time_range_per_component) in
265                    chunk_or_compacted.time_range_per_component()
266                {
267                    let temporal_chunk_ids_per_component =
268                        temporal_chunk_ids_per_timeline.entry(timeline).or_default();
269
270                    for (component_name, per_desc) in time_range_per_component {
271                        for (_component_desc, time_range) in per_desc {
272                            let temporal_chunk_ids_per_time = temporal_chunk_ids_per_component
273                                .entry(component_name)
274                                .or_default();
275
276                            // See `ChunkIdSetPerTime::max_interval_length`'s documentation.
277                            temporal_chunk_ids_per_time.max_interval_length = u64::max(
278                                temporal_chunk_ids_per_time.max_interval_length,
279                                time_range.abs_length(),
280                            );
281
282                            temporal_chunk_ids_per_time
283                                .per_start_time
284                                .entry(time_range.min())
285                                .or_default()
286                                .insert(chunk_or_compacted.id());
287                            temporal_chunk_ids_per_time
288                                .per_end_time
289                                .entry(time_range.max())
290                                .or_default()
291                                .insert(chunk_or_compacted.id());
292                        }
293                    }
294                }
295            }
296
297            {
298                re_tracing::profile_scope!("insertion (w/o component)");
299
300                let temporal_chunk_ids_per_timeline = self
301                    .temporal_chunk_ids_per_entity
302                    .entry(chunk_or_compacted.entity_path().clone())
303                    .or_default();
304
305                for (timeline, time_column) in chunk_or_compacted.timelines() {
306                    let temporal_chunk_ids_per_time = temporal_chunk_ids_per_timeline
307                        .entry(*timeline)
308                        .or_default();
309
310                    let time_range = time_column.time_range();
311
312                    // See `ChunkIdSetPerTime::max_interval_length`'s documentation.
313                    temporal_chunk_ids_per_time.max_interval_length = u64::max(
314                        temporal_chunk_ids_per_time.max_interval_length,
315                        time_range.abs_length(),
316                    );
317
318                    temporal_chunk_ids_per_time
319                        .per_start_time
320                        .entry(time_range.min())
321                        .or_default()
322                        .insert(chunk_or_compacted.id());
323                    temporal_chunk_ids_per_time
324                        .per_end_time
325                        .entry(time_range.max())
326                        .or_default()
327                        .insert(chunk_or_compacted.id());
328                }
329            }
330
331            self.temporal_chunks_stats += ChunkStoreChunkStats::from_chunk(&chunk_or_compacted);
332
333            let mut diff = ChunkStoreDiff::addition(
334                // NOTE: We are advertising only the non-compacted chunk as "added", i.e. only the new data.
335                //
336                // This makes sure that downstream subscribers only have to process what is new,
337                // instead of needlessly reprocessing old rows that would appear to have been
338                // removed and reinserted due to compaction.
339                //
340                // Subscribers will still be capable of tracking which chunks have been merged with which
341                // by using the compaction report that we fill below.
342                Arc::clone(&non_compacted_chunk), /* added */
343                None,                             /* compacted */
344            );
345            if let Some(elected_chunk) = &elected_chunk {
346                // NOTE: The chunk that we've just added has been compacted already!
347                let srcs = std::iter::once((non_compacted_chunk.id(), non_compacted_chunk))
348                    .chain(
349                        self.remove_chunk(elected_chunk.id())
350                            .into_iter()
351                            .filter(|diff| diff.kind == crate::ChunkStoreDiffKind::Deletion)
352                            .map(|diff| (diff.chunk.id(), diff.chunk)),
353                    )
354                    .collect();
355
356                diff.compacted = Some(crate::ChunkCompactionReport {
357                    srcs,
358                    new_chunk: chunk_or_compacted.clone(),
359                });
360            }
361
362            (chunk_or_compacted, vec![diff])
363        };
364
365        self.chunks_per_chunk_id.insert(chunk.id(), chunk.clone());
366        self.chunk_ids_per_min_row_id
367            .entry(row_id_range.0)
368            .or_default()
369            .push(chunk.id());
370
371        for (name, columns) in chunk.timelines() {
372            let new_typ = columns.timeline().typ();
373            if let Some(old_typ) = self.time_type_registry.insert(*name, new_typ) {
374                if old_typ != new_typ {
375                    re_log::warn_once!(
376                        "Timeline '{name}' changed type from {old_typ:?} to {new_typ:?}. \
377                        Rerun does not support using different types for the same timeline.",
378                    );
379                }
380            }
381        }
382
383        for (component_descr, list_array) in chunk.components().iter_flattened() {
384            if let Some(old_typ) = self
385                .type_registry
386                .insert(component_descr.component_name, list_array.value_type())
387            {
388                if old_typ != list_array.value_type() {
389                    re_log::warn_once!(
390                        "Component column '{}' changed type from {old_typ:?} to {:?}",
391                        component_descr.component_name,
392                        list_array.value_type()
393                    );
394                }
395            }
396
397            let column_metadata_state = self
398                .per_column_metadata
399                .entry(chunk.entity_path().clone())
400                .or_default()
401                .entry(component_descr.component_name)
402                .or_default()
403                .entry(component_descr.clone())
404                .or_insert(ColumnMetadataState {
405                    is_semantically_empty: true,
406                });
407            {
408                let is_semantically_empty =
409                    re_arrow_util::is_list_array_semantically_empty(list_array);
410
411                column_metadata_state.is_semantically_empty &= is_semantically_empty;
412            }
413        }
414
415        let events = if self.config.enable_changelog {
416            let events: Vec<_> = diffs
417                .into_iter()
418                .map(|diff| ChunkStoreEvent {
419                    store_id: self.id.clone(),
420                    store_generation: self.generation(),
421                    event_id: self
422                        .event_id
423                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
424                    diff,
425                })
426                .collect();
427
428            Self::on_events(&events);
429
430            events
431        } else {
432            Vec::new()
433        };
434
435        Ok(events)
436    }
437
438    /// Finds the most appropriate candidate for compaction.
439    ///
440    /// The algorithm is simple: for each incoming [`Chunk`], we take a look at its future neighbors.
441    /// Each neighbor is a potential candidate for compaction.
442    ///
443    /// Because the chunk is going to be inserted into many different indices -- for each of its timelines
444    /// and components -- it will have many direct neighbors.
445    /// Everytime we encounter a neighbor, it earns points.
446    ///
447    /// The neighbor with the most points at the end of the process is elected.
448    fn find_and_elect_compaction_candidate(&self, chunk: &Arc<Chunk>) -> Option<Arc<Chunk>> {
449        re_tracing::profile_function!();
450
451        let mut candidates_below_threshold: HashMap<ChunkId, bool> = HashMap::default();
452        let mut check_if_chunk_below_threshold =
453            |store: &Self, candidate_chunk_id: ChunkId| -> bool {
454                let ChunkStoreConfig {
455                    enable_changelog: _,
456                    chunk_max_bytes,
457                    chunk_max_rows,
458                    chunk_max_rows_if_unsorted,
459                } = store.config;
460
461                *candidates_below_threshold
462                    .entry(candidate_chunk_id)
463                    .or_insert_with(|| {
464                        store
465                            .chunks_per_chunk_id
466                            .get(&candidate_chunk_id)
467                            .is_some_and(|candidate| {
468                                if !chunk.concatenable(candidate) {
469                                    return false;
470                                }
471
472                                let total_bytes = <Chunk as SizeBytes>::total_size_bytes(chunk)
473                                    + <Chunk as SizeBytes>::total_size_bytes(candidate);
474                                let is_below_bytes_threshold = total_bytes <= chunk_max_bytes;
475
476                                let total_rows = (chunk.num_rows() + candidate.num_rows()) as u64;
477                                let is_below_rows_threshold = if candidate.is_time_sorted() {
478                                    total_rows <= chunk_max_rows
479                                } else {
480                                    total_rows <= chunk_max_rows_if_unsorted
481                                };
482
483                                is_below_bytes_threshold && is_below_rows_threshold
484                            })
485                    })
486            };
487
488        let mut candidates: HashMap<ChunkId, u64> = HashMap::default();
489
490        let temporal_chunk_ids_per_timeline = self
491            .temporal_chunk_ids_per_entity_per_component
492            .get(chunk.entity_path())?;
493
494        for (timeline, time_range_per_component) in chunk.time_range_per_component() {
495            let Some(temporal_chunk_ids_per_component) =
496                temporal_chunk_ids_per_timeline.get(&timeline)
497            else {
498                continue;
499            };
500
501            for (component_name, per_desc) in time_range_per_component {
502                for (_component_desc, time_range) in per_desc {
503                    let Some(temporal_chunk_ids_per_time) =
504                        temporal_chunk_ids_per_component.get(&component_name)
505                    else {
506                        continue;
507                    };
508
509                    {
510                        // Direct neighbors (before): 1 point each.
511                        if let Some((_data_time, chunk_id_set)) = temporal_chunk_ids_per_time
512                            .per_start_time
513                            .range(..time_range.min())
514                            .next_back()
515                        {
516                            for &chunk_id in chunk_id_set {
517                                if check_if_chunk_below_threshold(self, chunk_id) {
518                                    *candidates.entry(chunk_id).or_default() += 1;
519                                }
520                            }
521                        }
522
523                        // Direct neighbors (after): 1 point each.
524                        if let Some((_data_time, chunk_id_set)) = temporal_chunk_ids_per_time
525                            .per_start_time
526                            .range(time_range.max().inc()..)
527                            .next()
528                        {
529                            for &chunk_id in chunk_id_set {
530                                if check_if_chunk_below_threshold(self, chunk_id) {
531                                    *candidates.entry(chunk_id).or_default() += 1;
532                                }
533                            }
534                        }
535
536                        let chunk_id_set = temporal_chunk_ids_per_time
537                            .per_start_time
538                            .get(&time_range.min());
539
540                        // Shared start times: 2 points each.
541                        for chunk_id in chunk_id_set.iter().flat_map(|set| set.iter().copied()) {
542                            if check_if_chunk_below_threshold(self, chunk_id) {
543                                *candidates.entry(chunk_id).or_default() += 2;
544                            }
545                        }
546                    }
547                }
548            }
549        }
550
551        debug_assert!(!candidates.contains_key(&chunk.id()));
552
553        let mut candidates = candidates.into_iter().collect_vec();
554        candidates.sort_by_key(|(_chunk_id, points)| *points);
555        candidates.reverse();
556
557        candidates
558            .into_iter()
559            .find_map(|(chunk_id, _points)| self.chunks_per_chunk_id.get(&chunk_id).map(Arc::clone))
560    }
561
562    /// Unconditionally drops all the data for a given `entity_path`.
563    ///
564    /// Returns the list of `Chunk`s that were dropped from the store in the form of [`ChunkStoreEvent`]s.
565    ///
566    /// This is _not_ recursive. The store is unaware of the entity hierarchy.
567    pub fn drop_entity_path(&mut self, entity_path: &EntityPath) -> Vec<ChunkStoreEvent> {
568        re_tracing::profile_function!(entity_path.to_string());
569
570        self.gc_id += 1; // close enough
571
572        let generation = self.generation();
573
574        let Self {
575            id,
576            info: _,
577            config: _,
578            time_type_registry: _,
579            type_registry: _,
580            per_column_metadata,
581            chunks_per_chunk_id,
582            chunk_ids_per_min_row_id,
583            temporal_chunk_ids_per_entity_per_component,
584            temporal_chunk_ids_per_entity,
585            temporal_chunks_stats,
586            static_chunk_ids_per_entity,
587            static_chunks_stats,
588            insert_id: _,
589            gc_id: _,
590            event_id,
591        } = self;
592
593        per_column_metadata.remove(entity_path);
594
595        let dropped_static_chunks = {
596            let dropped_static_chunk_ids: BTreeSet<_> = static_chunk_ids_per_entity
597                .remove(entity_path)
598                .unwrap_or_default()
599                .into_values()
600                .collect();
601
602            chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
603                chunk_ids.retain(|chunk_id| !dropped_static_chunk_ids.contains(chunk_id));
604                !chunk_ids.is_empty()
605            });
606
607            dropped_static_chunk_ids.into_iter()
608        };
609
610        let dropped_temporal_chunks = {
611            temporal_chunk_ids_per_entity_per_component.remove(entity_path);
612
613            let dropped_temporal_chunk_ids: BTreeSet<_> = temporal_chunk_ids_per_entity
614                .remove(entity_path)
615                .unwrap_or_default()
616                .into_values()
617                .flat_map(|temporal_chunk_ids_per_time| {
618                    let ChunkIdSetPerTime {
619                        max_interval_length: _,
620                        per_start_time,
621                        per_end_time: _, // same chunk IDs as above
622                    } = temporal_chunk_ids_per_time;
623
624                    per_start_time
625                        .into_values()
626                        .flat_map(|chunk_ids| chunk_ids.into_iter())
627                })
628                .collect();
629
630            chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
631                chunk_ids.retain(|chunk_id| !dropped_temporal_chunk_ids.contains(chunk_id));
632                !chunk_ids.is_empty()
633            });
634
635            dropped_temporal_chunk_ids.into_iter()
636        };
637
638        let dropped_static_chunks = dropped_static_chunks
639            .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
640            .inspect(|chunk| {
641                *static_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
642            })
643            // NOTE: gotta collect to release the mut ref on `chunks_per_chunk_id`.
644            .collect_vec();
645
646        let dropped_temporal_chunks = dropped_temporal_chunks
647            .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
648            .inspect(|chunk| {
649                *temporal_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
650            });
651
652        if self.config.enable_changelog {
653            let events: Vec<_> = dropped_static_chunks
654                .into_iter()
655                .chain(dropped_temporal_chunks)
656                .map(ChunkStoreDiff::deletion)
657                .map(|diff| ChunkStoreEvent {
658                    store_id: id.clone(),
659                    store_generation: generation.clone(),
660                    event_id: event_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
661                    diff,
662                })
663                .collect();
664
665            Self::on_events(&events);
666
667            events
668        } else {
669            Vec::new()
670        }
671    }
672}
673
#[cfg(test)]
mod tests {
    use re_chunk::{TimePoint, Timeline};
    use re_log_types::example_components::{MyColor, MyLabel, MyPoint};
    use similar_asserts::assert_eq;

    use crate::ChunkStoreDiffKind;

    use super::*;

    // TODO(cmc): We could have more test coverage here, especially regarding thresholds etc.
    // For now the development and maintenance cost doesn't seem to be worth it.
    // We can re-assess later if things turn out to be shaky in practice.

    /// Reduces a batch of events to the `(chunk ID, diff kind)` pairs the tests care about.
    ///
    /// Comparing these lists with `assert_eq!` reports the full sequence of emitted events on
    /// failure, instead of the bare `false` a compound boolean `assert!` would give.
    fn summarize_events(events: &[ChunkStoreEvent]) -> Vec<(ChunkId, ChunkStoreDiffKind)> {
        events
            .iter()
            .map(|event| (event.chunk.id(), event.kind))
            .collect()
    }

    /// Inserts four small temporal chunks for the same entity and checks that the store
    /// ends up holding exactly one chunk containing all ten rows.
    #[test]
    fn compaction_simple() -> anyhow::Result<()> {
        re_log::setup_logging();

        let mut store = ChunkStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();
        let row_id6 = RowId::new();
        let row_id7 = RowId::new();
        let row_id8 = RowId::new();
        let row_id9 = RowId::new();
        let row_id10 = RowId::new();

        let timepoint1 = [(Timeline::new_sequence("frame"), 1)];
        let timepoint2 = [(Timeline::new_sequence("frame"), 3)];
        let timepoint3 = [(Timeline::new_sequence("frame"), 5)];
        let timepoint4 = [(Timeline::new_sequence("frame"), 7)];
        let timepoint5 = [(Timeline::new_sequence("frame"), 9)];

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        let chunk1 = Chunk::builder(entity_path.clone())
            .with_component_batches(row_id1, timepoint1, [points1 as _])
            .with_component_batches(row_id2, timepoint2, [points2 as _])
            .with_component_batches(row_id3, timepoint3, [points3 as _])
            .build()?;
        let chunk2 = Chunk::builder(entity_path.clone())
            .with_component_batches(row_id4, timepoint4, [points4 as _])
            .with_component_batches(row_id5, timepoint5, [points5 as _])
            .build()?;
        let chunk3 = Chunk::builder(entity_path.clone())
            .with_component_batches(row_id6, timepoint1, [points1 as _])
            .with_component_batches(row_id7, timepoint2, [points2 as _])
            .with_component_batches(row_id8, timepoint3, [points3 as _])
            .build()?;
        let chunk4 = Chunk::builder(entity_path.clone())
            .with_component_batches(row_id9, timepoint4, [points4 as _])
            .with_component_batches(row_id10, timepoint5, [points5 as _])
            .build()?;

        let chunk1 = Arc::new(chunk1);
        let chunk2 = Arc::new(chunk2);
        let chunk3 = Arc::new(chunk3);
        let chunk4 = Arc::new(chunk4);

        eprintln!("---\n{store}\ninserting {}", chunk1.id());

        store.insert_chunk(&chunk1)?;

        eprintln!("---\n{store}\ninserting {}", chunk2.id());

        store.insert_chunk(&chunk2)?;

        eprintln!("---\n{store}\ninserting {}", chunk3.id());

        store.insert_chunk(&chunk3)?;

        eprintln!("---\n{store}\ninserting {}", chunk4.id());

        store.insert_chunk(&chunk4)?;

        eprintln!("---\n{store}");

        // Check the chunk count first, so that a failed compaction is reported as such rather
        // than as a confusing content diff against whichever chunk happens to come first.
        assert_eq!(1, store.chunks_per_chunk_id.len());

        let got = store
            .chunks_per_chunk_id
            .first_key_value()
            .map(|(_id, chunk)| chunk)
            .unwrap();

        let expected = Chunk::builder_with_id(got.id(), entity_path.clone())
            .with_component_batches(row_id1, timepoint1, [points1 as _])
            .with_component_batches(row_id2, timepoint2, [points2 as _])
            .with_component_batches(row_id3, timepoint3, [points3 as _])
            .with_component_batches(row_id4, timepoint4, [points4 as _])
            .with_component_batches(row_id5, timepoint5, [points5 as _])
            .with_component_batches(row_id6, timepoint1, [points1 as _])
            .with_component_batches(row_id7, timepoint2, [points2 as _])
            .with_component_batches(row_id8, timepoint3, [points3 as _])
            .with_component_batches(row_id9, timepoint4, [points4 as _])
            .with_component_batches(row_id10, timepoint5, [points5 as _])
            .build()?;

        assert_eq!(
            expected,
            **got,
            "{}",
            similar_asserts::SimpleDiff::from_str(
                &format!("{expected}"),
                &format!("{got}"),
                "expected",
                "got",
            ),
        );

        Ok(())
    }

    /// Writes static data in three chunks and checks that a chunk whose every component has
    /// been overwritten by later static writes is dropped, with matching events and stats.
    #[test]
    fn static_overwrites() -> anyhow::Result<()> {
        re_log::setup_logging();

        let mut store = ChunkStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        let entity_path = EntityPath::from("this/that");

        let row_id1_1 = RowId::new();
        let row_id2_1 = RowId::new();
        let row_id2_2 = RowId::new();

        let timepoint_static = TimePoint::default();

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let colors1 = &[MyColor::from_rgb(1, 1, 1)];
        let labels1 = &[MyLabel("111".to_owned())];

        let points2 = &[MyPoint::new(2.0, 2.0)];
        let colors2 = &[MyColor::from_rgb(2, 2, 2)];
        let labels2 = &[MyLabel("222".to_owned())];

        let chunk1 = Chunk::builder(entity_path.clone())
            .with_component_batches(
                row_id1_1,
                timepoint_static.clone(),
                [points1 as _, colors1 as _, labels1 as _],
            )
            .build()?;
        let chunk2 = Chunk::builder(entity_path.clone())
            .with_component_batches(
                row_id2_1,
                timepoint_static.clone(),
                [points2 as _, colors2 as _],
            )
            .build()?;
        let chunk3 = Chunk::builder(entity_path.clone())
            .with_component_batches(row_id2_2, timepoint_static, [labels2 as _])
            .build()?;

        let chunk1 = Arc::new(chunk1);
        let chunk2 = Arc::new(chunk2);
        let chunk3 = Arc::new(chunk3);

        let events = store.insert_chunk(&chunk1)?;
        assert_eq!(
            vec![(chunk1.id(), ChunkStoreDiffKind::Addition)],
            summarize_events(&events),
            "the first write should result in the addition of chunk1 and nothing else"
        );

        let events = store.insert_chunk(&chunk2)?;
        assert_eq!(
            vec![(chunk2.id(), ChunkStoreDiffKind::Addition)],
            summarize_events(&events),
            "the second write should result in the addition of chunk2 and nothing else"
        );

        let stats_before = store.stats();
        {
            let ChunkStoreChunkStats {
                num_chunks,
                total_size_bytes: _,
                num_rows,
                num_events,
            } = stats_before.static_chunks;
            assert_eq!(2, num_chunks);
            assert_eq!(2, num_rows);
            assert_eq!(5, num_events);
        }

        let events = store.insert_chunk(&chunk3)?;
        assert_eq!(
            vec![
                (chunk3.id(), ChunkStoreDiffKind::Addition),
                (chunk1.id(), ChunkStoreDiffKind::Deletion),
            ],
            summarize_events(&events),
            "the final write should result in the addition of chunk3 _and_ the deletion of the now fully overwritten chunk1"
        );

        let stats_after = store.stats();
        {
            let ChunkStoreChunkStats {
                num_chunks,
                total_size_bytes: _,
                num_rows,
                num_events,
            } = stats_after.static_chunks;
            assert_eq!(2, num_chunks);
            assert_eq!(2, num_rows);
            assert_eq!(3, num_events);
        }

        Ok(())
    }
}