re_chunk_store/
gc.rs

use std::{
    collections::{btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeSet},
    time::Duration,
};

use ahash::{HashMap, HashSet};
use nohash_hasher::IntMap;
use re_byte_size::SizeBytes;
use web_time::Instant;

use re_chunk::{Chunk, ChunkId, TimelineName};
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt};
use re_types_core::ComponentName;

use crate::{
    store::ChunkIdSetPerTime, ChunkStore, ChunkStoreChunkStats, ChunkStoreDiff, ChunkStoreDiffKind,
    ChunkStoreEvent, ChunkStoreStats,
};

// Used all over in docstrings.
#[allow(unused_imports)]
use crate::RowId;

// ---

#[derive(Debug, Clone, Copy)]
pub enum GarbageCollectionTarget {
    /// Try to drop _at least_ the given fraction.
    ///
    /// The fraction must be a float in the range [0.0 : 1.0].
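    ///
    /// E.g. `DropAtLeastFraction(1.0 / 3.0)` asks the GC to drop at least a third
    /// of the store's current memory footprint.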
    DropAtLeastFraction(f64),

    /// GC everything that isn't protected.
    Everything,
}

#[derive(Debug, Clone)]
pub struct GarbageCollectionOptions {
    /// The target threshold that the GC should try to meet.
    pub target: GarbageCollectionTarget,

    /// How long the garbage collection is allowed to run for.
    ///
    /// Trades off latency for throughput:
    /// - A smaller `time_budget` will clear less data in a shorter amount of time, allowing for a
    ///   more responsive UI at the cost of more GC overhead and more frequent runs.
    /// - A larger `time_budget` will clear more data in a longer amount of time, increasing the
    ///   chance of UI freeze frames but decreasing GC overhead and running less often.
    ///
    /// The default is an unbounded time budget (i.e. throughput only).
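    ///
    /// A minimal sketch of a bounded configuration (the numbers are purely
    /// illustrative, not recommendations):
    ///
    /// ```ignore
    /// let options = GarbageCollectionOptions {
    ///     target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
    ///     time_budget: std::time::Duration::from_millis(3), // cap each GC run at ~3ms
    ///     protect_latest: 1,
    ///     protected_time_ranges: Default::default(),
    /// };
    /// ```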
    pub time_budget: Duration,

    /// How many component revisions to preserve on each timeline.
    pub protect_latest: usize,

    /// Do not remove any data within these time ranges.
    pub protected_time_ranges: IntMap<TimelineName, ResolvedTimeRange>,
}

impl GarbageCollectionOptions {
    pub fn gc_everything() -> Self {
        Self {
            target: GarbageCollectionTarget::Everything,
            time_budget: std::time::Duration::MAX,
            protect_latest: 0,
            protected_time_ranges: Default::default(),
        }
    }

    /// Returns `true` if this chunk cannot be removed, i.e. if it intersects one of
    /// the protected time ranges.
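    ///
    /// A minimal sketch, assuming a timeline named `"frame"` (the name, the range,
    /// and the `TimelineName::new` constructor are illustrative assumptions):
    ///
    /// ```ignore
    /// let mut options = GarbageCollectionOptions::gc_everything();
    /// options.protected_time_ranges.insert(
    ///     TimelineName::new("frame"), // constructor assumed for illustration
    ///     ResolvedTimeRange::new(0, 100),
    /// );
    /// // Any chunk whose time range on "frame" intersects [0, 100] is now protected.
    /// assert!(options.is_chunk_protected(&chunk_overlapping_that_range));
    /// ```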
    pub fn is_chunk_protected(&self, chunk: &Chunk) -> bool {
        for (timeline, protected_time_range) in &self.protected_time_ranges {
            if let Some(time_column) = chunk.timelines().get(timeline) {
                if time_column.time_range().intersects(*protected_time_range) {
                    return true;
                }
            }
        }
        false
    }
}

impl std::fmt::Display for GarbageCollectionTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DropAtLeastFraction(p) => {
                write!(f, "DropAtLeast({:.3}%)", *p * 100.0)
            }
            Self::Everything => write!(f, "Everything"),
        }
    }
}

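/// Every [`ChunkId`] marked for removal, keyed by entity path, then timeline, then
/// component, then time, mirroring the layout of the store's temporal indices so
/// that the removal itself can be done surgically.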
pub type RemovableChunkIdPerTimePerComponentPerTimelinePerEntity =
    IntMap<EntityPath, IntMap<TimelineName, IntMap<ComponentName, HashMap<TimeInt, Vec<ChunkId>>>>>;

impl ChunkStore {
    /// Triggers a garbage collection according to the desired `target`.
    ///
    /// Returns the list of `Chunk`s that were purged from the store in the form of [`ChunkStoreEvent`]s.
    ///
    /// ## Semantics
    ///
    /// Garbage collection works on a chunk-level basis and is driven by [`RowId`] order
    /// (specifically, the smallest `RowId` of each respective Chunk), i.e. the order defined
    /// by the clients' wall-clocks, allowing it to drop data across the different timelines in
    /// a fair, deterministic manner.
    /// Similarly, out-of-order data is supported out of the box.
    ///
    /// The garbage collector doesn't deallocate data in and of itself: all it does is drop the
    /// store's internal references to that data (the `Chunk`s), which will be deallocated once
    /// their reference count reaches 0.
    ///
    /// ## Limitations
    ///
    /// The garbage collector has limited support for latest-at semantics. The configuration option
    /// [`GarbageCollectionOptions::protect_latest`] will protect the N latest values of each
    /// component on each timeline. The only practical guarantee this gives is that a latest-at
    /// query at the maximum time value (max-int) will be unchanged. However, latest-at queries
    /// from other arbitrary points in time may provide different results pre- and post-GC.
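    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `store` is a [`ChunkStore`]:
    ///
    /// ```ignore
    /// // Drop at least a third of the store, while keeping the latest value of
    /// // each component on each timeline and staying within a ~3ms budget:
    /// let (events, stats_diff) = store.gc(&GarbageCollectionOptions {
    ///     target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
    ///     time_budget: std::time::Duration::from_millis(3),
    ///     protect_latest: 1,
    ///     protected_time_ranges: Default::default(),
    /// });
    /// // GC only ever produces deletion events.
    /// assert!(events.iter().all(|e| e.kind == ChunkStoreDiffKind::Deletion));
    /// ```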
    pub fn gc(
        &mut self,
        options: &GarbageCollectionOptions,
    ) -> (Vec<ChunkStoreEvent>, ChunkStoreStats) {
        re_tracing::profile_function!();

        self.gc_id += 1;

        let stats_before = self.stats();

        let total_size_bytes_before = stats_before.total().total_size_bytes as f64;
        let total_num_chunks_before = stats_before.total().num_chunks;
        let total_num_rows_before = stats_before.total().num_rows;

        let protected_chunk_ids = self.find_all_protected_chunk_ids(options.protect_latest);

        let diffs = match options.target {
            GarbageCollectionTarget::DropAtLeastFraction(p) => {
                assert!((0.0..=1.0).contains(&p));

                let num_bytes_to_drop = total_size_bytes_before * p;
                let target_size_bytes = total_size_bytes_before - num_bytes_to_drop;

                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    total_num_chunks_before = re_format::format_uint(total_num_chunks_before),
                    total_num_rows_before = re_format::format_uint(total_num_rows_before),
                    total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
                    target_size_bytes = re_format::format_bytes(target_size_bytes),
                    drop_at_least_num_bytes = re_format::format_bytes(num_bytes_to_drop),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_chunk_ids)
            }
            GarbageCollectionTarget::Everything => {
                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    total_num_rows_before = re_format::format_uint(total_num_rows_before),
                    total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_chunk_ids)
            }
        };

        let stats_after = self.stats();
        let total_size_bytes_after = stats_after.total().total_size_bytes as f64;
        let total_num_chunks_after = stats_after.total().num_chunks;
        let total_num_rows_after = stats_after.total().num_rows;

        re_log::trace!(
            kind = "gc",
            id = self.gc_id,
            %options.target,
            total_num_chunks_before = re_format::format_uint(total_num_chunks_before),
            total_num_rows_before = re_format::format_uint(total_num_rows_before),
            total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
            total_num_chunks_after = re_format::format_uint(total_num_chunks_after),
            total_num_rows_after = re_format::format_uint(total_num_rows_after),
            total_size_bytes_after = re_format::format_bytes(total_size_bytes_after),
            "GC done"
        );

        let events = if self.config.enable_changelog {
            let events: Vec<_> = diffs
                .into_iter()
                .map(|diff| ChunkStoreEvent {
                    store_id: self.id.clone(),
                    store_generation: self.generation(),
                    event_id: self
                        .event_id
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
                    diff,
                })
                .collect();
            {
                if cfg!(debug_assertions) {
                    let any_event_other_than_deletion = events
                        .iter()
                        .any(|e| e.kind != ChunkStoreDiffKind::Deletion);
                    assert!(!any_event_other_than_deletion);
                }

                Self::on_events(&events);
            }

            events
        } else {
            Vec::new()
        };

        (events, stats_before - stats_after)
    }

    /// For each `EntityPath`, `Timeline`, `Component` find the N latest [`ChunkId`]s.
    //
    // TODO(jleibs): More complex functionality might require expanding this to also
    // *ignore* specific entities, components, timelines, etc. for this protection.
    fn find_all_protected_chunk_ids(&self, target_count: usize) -> BTreeSet<ChunkId> {
        re_tracing::profile_function!();

        if target_count == 0 {
            return Default::default();
        }

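        // For each `(entity, timeline, component)` index, take the chunks with the
        // latest start- and end-times, union them, and keep the `target_count` largest
        // `ChunkId`s, i.e. the most recently created chunks.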
        self.temporal_chunk_ids_per_entity_per_component
            .values()
            .flat_map(|temporal_chunk_ids_per_timeline| {
                temporal_chunk_ids_per_timeline.iter().flat_map(
                    |(_timeline, temporal_chunk_ids_per_component)| {
                        temporal_chunk_ids_per_component.iter().flat_map(
                            |(_, temporal_chunk_ids_per_time)| {
                                temporal_chunk_ids_per_time
                                    .per_start_time
                                    .last_key_value()
                                    .map(|(_, chunk_ids)| chunk_ids.iter().copied())
                                    .into_iter()
                                    .flatten()
                                    .chain(
                                        temporal_chunk_ids_per_time
                                            .per_end_time
                                            .last_key_value()
                                            .map(|(_, chunk_ids)| chunk_ids.iter().copied())
                                            .into_iter()
                                            .flatten(),
                                    )
                                    .collect::<BTreeSet<_>>()
                                    .into_iter()
                                    .rev()
                                    .take(target_count)
                            },
                        )
                    },
                )
            })
            .collect()
    }

    fn gc_drop_at_least_num_bytes(
        &mut self,
        options: &GarbageCollectionOptions,
        mut num_bytes_to_drop: f64,
        protected_chunk_ids: &BTreeSet<ChunkId>,
    ) -> Vec<ChunkStoreDiff> {
        re_tracing::profile_function!(re_format::format_bytes(num_bytes_to_drop));

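        // Two-phase mark-and-sweep: first walk the chunks in ascending `RowId` order,
        // marking candidates until either the byte target is met or a quarter of the
        // time budget is spent; then sweep the marked chunks out of every index.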
        let mut chunk_ids_to_be_removed =
            RemovableChunkIdPerTimePerComponentPerTimelinePerEntity::default();
        let mut chunk_ids_dangling = HashSet::default();

        let start_time = Instant::now();

        {
            re_tracing::profile_scope!("mark");

            for chunk_id in self
                .chunk_ids_per_min_row_id
                .values()
                .flatten()
                .filter(|chunk_id| !protected_chunk_ids.contains(chunk_id))
            {
                if let Some(chunk) = self.chunks_per_chunk_id.get(chunk_id) {
                    if options.is_chunk_protected(chunk) {
                        continue;
                    }

                    // NOTE: Do _NOT_ use `chunk.total_size_bytes` as it is sitting behind an Arc
                    // and would count as amortized (i.e. 0 bytes).
                    num_bytes_to_drop -= <Chunk as SizeBytes>::total_size_bytes(chunk) as f64;

                    // NOTE: We cannot blindly `retain` across all temporal tables: it's way too
                    // costly and slow. Rather, we need to surgically remove the superfluous chunks.
                    let entity_path = chunk.entity_path();
                    let per_timeline = chunk_ids_to_be_removed
                        .entry(entity_path.clone())
                        .or_default();
                    for (&timeline, time_column) in chunk.timelines() {
                        let per_component = per_timeline.entry(timeline).or_default();
                        for component_name in chunk.component_names() {
                            let per_time = per_component.entry(component_name).or_default();

                            // NOTE: As usual, these are vectors of `ChunkId`s, as it is legal to
                            // have perfectly overlapping chunks.
                            let time_range = time_column.time_range();
                            per_time
                                .entry(time_range.min())
                                .or_default()
                                .push(chunk.id());
                            if time_range.min() != time_range.max() {
                                per_time
                                    .entry(time_range.max())
                                    .or_default()
                                    .push(chunk.id());
                            }
                        }
                    }
                } else {
                    chunk_ids_dangling.insert(*chunk_id);
                }

                // NOTE: There is no point in spending more than a fourth of the time budget on
                // the mark phase: otherwise the sweep phase won't have any time left to do
                // anything with the results anyhow.
                if start_time.elapsed() >= options.time_budget / 4 || num_bytes_to_drop <= 0.0 {
                    break;
                }
            }
        }

        {
            re_tracing::profile_scope!("sweep");

            let Self {
                id: _,
                info: _,
                config: _,
                time_type_registry: _,
                type_registry: _,
                per_column_metadata: _, // column metadata is additive only
                chunks_per_chunk_id,
                chunk_ids_per_min_row_id,
                temporal_chunk_ids_per_entity_per_component,
                temporal_chunk_ids_per_entity,
                temporal_chunks_stats: _,
                static_chunk_ids_per_entity: _, // we don't GC static data
                static_chunks_stats: _,         // we don't GC static data
                insert_id: _,
                gc_id: _,
                event_id: _,
            } = self;

            let mut diffs = Vec::new();

            // NOTE: Dangling chunks should never happen: it is the job of the GC to ensure that.
            //
            // In release builds, we still want to do the nice thing and clean them up as best as we
            // can in order to prevent OOMs.
            //
            // We should really never end up in here, so don't bother accounting for this in the
            // time budget.
            debug_assert!(
                chunk_ids_dangling.is_empty(),
                "detected dangling chunks -- there's a GC bug"
            );
            if !chunk_ids_dangling.is_empty() {
                re_tracing::profile_scope!("dangling");

                chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
                    chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                    !chunk_ids.is_empty()
                });

                // Component-less indices
                for temporal_chunk_ids_per_timeline in temporal_chunk_ids_per_entity.values_mut() {
                    for temporal_chunk_ids_per_time in temporal_chunk_ids_per_timeline.values_mut()
                    {
                        let ChunkIdSetPerTime {
                            max_interval_length: _,
                            per_start_time,
                            per_end_time,
                        } = temporal_chunk_ids_per_time;

                        // TODO(cmc): Technically, the optimal thing to do would be to
                        // recompute `max_interval_length` per time here.
                        // In practice, this adds a lot of complexity for likely very little
                        // performance benefit, since we expect the chunks to have similar
                        // interval lengths on the happy path.

                        for chunk_ids in per_start_time.values_mut() {
                            chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                        }
                        for chunk_ids in per_end_time.values_mut() {
                            chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                        }
                    }
                }

                // Per-component indices
                for temporal_chunk_ids_per_component in
                    temporal_chunk_ids_per_entity_per_component.values_mut()
                {
                    for temporal_chunk_ids_per_timeline in
                        temporal_chunk_ids_per_component.values_mut()
                    {
                        for temporal_chunk_ids_per_time in
                            temporal_chunk_ids_per_timeline.values_mut()
                        {
                            let ChunkIdSetPerTime {
                                max_interval_length: _,
                                per_start_time,
                                per_end_time,
                            } = temporal_chunk_ids_per_time;

                            // TODO(cmc): Technically, the optimal thing to do would be to
                            // recompute `max_interval_length` per time here.
                            // In practice, this adds a lot of complexity for likely very little
                            // performance benefit, since we expect the chunks to have similar
                            // interval lengths on the happy path.

                            for chunk_ids in per_start_time.values_mut() {
                                chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                            }
                            for chunk_ids in per_end_time.values_mut() {
                                chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                            }
                        }
                    }
                }

                diffs.extend(
                    chunk_ids_dangling
                        .into_iter()
                        .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
                        .map(ChunkStoreDiff::deletion),
                );
            }

            if !chunk_ids_to_be_removed.is_empty() {
                diffs.extend(self.remove_chunks(
                    chunk_ids_to_be_removed,
                    Some((start_time, options.time_budget)),
                ));
            }

            diffs
        }
    }

    /// Surgically removes a _temporal_ [`ChunkId`] from all indices.
    ///
    /// This is orders of magnitude faster than trying to `retain()` on all our internal indices.
    ///
    /// See also [`ChunkStore::remove_chunks`].
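    ///
    /// A minimal sketch, assuming `chunk_id` references a temporal chunk in `store`:
    ///
    /// ```ignore
    /// for diff in store.remove_chunk(chunk_id) {
    ///     debug_assert_eq!(diff.kind, ChunkStoreDiffKind::Deletion);
    /// }
    /// ```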
    pub(crate) fn remove_chunk(&mut self, chunk_id: ChunkId) -> Vec<ChunkStoreDiff> {
        let Some(chunk) = self.chunks_per_chunk_id.get(&chunk_id) else {
            return Vec::new();
        };

        let mut chunk_ids_to_be_removed =
            RemovableChunkIdPerTimePerComponentPerTimelinePerEntity::default();

        {
            let chunk_ids_to_be_removed = chunk_ids_to_be_removed
                .entry(chunk.entity_path().clone())
                .or_default();

            for (timeline, time_range_per_component) in chunk.time_range_per_component() {
                let chunk_ids_to_be_removed = chunk_ids_to_be_removed.entry(timeline).or_default();

                for (component_name, per_desc) in time_range_per_component {
                    for (_component_desc, time_range) in per_desc {
                        let chunk_ids_to_be_removed =
                            chunk_ids_to_be_removed.entry(component_name).or_default();

                        chunk_ids_to_be_removed
                            .entry(time_range.min())
                            .or_default()
                            .push(chunk.id());
                        chunk_ids_to_be_removed
                            .entry(time_range.max())
                            .or_default()
                            .push(chunk.id());
                    }
                }
            }
        }

        self.remove_chunks(chunk_ids_to_be_removed, None)
    }

    /// Surgically removes a set of _temporal_ [`ChunkId`]s from all indices.
    ///
    /// This is orders of magnitude faster than trying to `retain()` on all our internal indices,
    /// when you already know where these chunks live.
    ///
    /// See also [`ChunkStore::remove_chunk`].
    pub(crate) fn remove_chunks(
        &mut self,
        chunk_ids_to_be_removed: RemovableChunkIdPerTimePerComponentPerTimelinePerEntity,
        time_budget: Option<(Instant, Duration)>,
    ) -> Vec<ChunkStoreDiff> {
        re_tracing::profile_function!();

        // NOTE: We cannot blindly `retain` across all temporal tables: it's way too costly
        // and slow. Rather, we need to surgically remove the superfluous chunks.

        let mut chunk_ids_removed = HashSet::default();

        // Because we have both a per-component and a component-less index that refer to the same
        // chunks, we must make sure that they get garbage collected in sync.
        // That implies making sure that we don't run out of time budget after we've GC'd one but
        // before we've had time to clean the other.

        for (entity_path, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
            let HashMapEntry::Occupied(mut temporal_chunk_ids_per_timeline) = self
                .temporal_chunk_ids_per_entity_per_component
                .entry(entity_path.clone())
            else {
                continue;
            };

            let HashMapEntry::Occupied(mut temporal_chunk_ids_per_timeline_componentless) =
                self.temporal_chunk_ids_per_entity.entry(entity_path)
            else {
                continue;
            };

            for (timeline, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
                // Component-less indices
                {
                    let HashMapEntry::Occupied(mut temporal_chunk_ids_per_time_componentless) =
                        temporal_chunk_ids_per_timeline_componentless
                            .get_mut()
                            .entry(timeline)
                    else {
                        continue;
                    };

                    let ChunkIdSetPerTime {
                        max_interval_length: _,
                        per_start_time,
                        per_end_time,
                    } = temporal_chunk_ids_per_time_componentless.get_mut();

                    // TODO(cmc): Technically, the optimal thing to do would be to
                    // recompute `max_interval_length` per time here.
                    // In practice, this adds a lot of complexity for likely very little
                    // performance benefit, since we expect the chunks to have similar
                    // interval lengths on the happy path.

                    for chunk_ids_to_be_removed in chunk_ids_to_be_removed.values() {
                        for (&time, chunk_ids) in chunk_ids_to_be_removed {
                            if let BTreeMapEntry::Occupied(mut chunk_id_set) =
                                per_start_time.entry(time)
                            {
                                for chunk_id in chunk_ids {
                                    chunk_id_set.get_mut().remove(chunk_id);
                                }
                                if chunk_id_set.get().is_empty() {
                                    chunk_id_set.remove_entry();
                                }
                            }

                            if let BTreeMapEntry::Occupied(mut chunk_id_set) =
                                per_end_time.entry(time)
                            {
                                for chunk_id in chunk_ids {
                                    chunk_id_set.get_mut().remove(chunk_id);
                                }
                                if chunk_id_set.get().is_empty() {
                                    chunk_id_set.remove_entry();
                                }
                            }

                            chunk_ids_removed.extend(chunk_ids);
                        }

                        if let Some((start_time, time_budget)) = time_budget {
                            if start_time.elapsed() >= time_budget {
                                break;
                            }
                        }
                    }

                    if per_start_time.is_empty() && per_end_time.is_empty() {
                        temporal_chunk_ids_per_time_componentless.remove_entry();
                    }
                }

                // Per-component indices
                //
                // NOTE: This must go all the way, no matter the time budget left. Otherwise the
                // component-less and per-component indices would go out of sync.

                let HashMapEntry::Occupied(mut temporal_chunk_ids_per_component) =
                    temporal_chunk_ids_per_timeline.get_mut().entry(timeline)
                else {
                    continue;
                };

                for (component_name, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
                    let HashMapEntry::Occupied(mut temporal_chunk_ids_per_time) =
                        temporal_chunk_ids_per_component
                            .get_mut()
                            .entry(component_name)
                    else {
                        continue;
                    };

                    let ChunkIdSetPerTime {
                        max_interval_length: _,
                        per_start_time,
                        per_end_time,
                    } = temporal_chunk_ids_per_time.get_mut();

                    // TODO(cmc): Technically, the optimal thing to do would be to
                    // recompute `max_interval_length` per time here.
                    // In practice, this adds a lot of complexity for likely very little
                    // performance benefit, since we expect the chunks to have similar
                    // interval lengths on the happy path.

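                    // Only scrub entries for chunks that were actually removed from the
                    // component-less index above: this is what keeps the two indices in sync
                    // even when the time budget runs out part-way through.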
                    for (time, chunk_ids) in chunk_ids_to_be_removed {
                        if let BTreeMapEntry::Occupied(mut chunk_id_set) =
                            per_start_time.entry(time)
                        {
                            for chunk_id in chunk_ids
                                .iter()
                                .filter(|chunk_id| chunk_ids_removed.contains(*chunk_id))
                            {
                                chunk_id_set.get_mut().remove(chunk_id);
                            }
                            if chunk_id_set.get().is_empty() {
                                chunk_id_set.remove_entry();
                            }
                        }

                        if let BTreeMapEntry::Occupied(mut chunk_id_set) = per_end_time.entry(time)
                        {
                            for chunk_id in chunk_ids
                                .iter()
                                .filter(|chunk_id| chunk_ids_removed.contains(*chunk_id))
                            {
                                chunk_id_set.get_mut().remove(chunk_id);
                            }
                            if chunk_id_set.get().is_empty() {
                                chunk_id_set.remove_entry();
                            }
                        }
                    }

                    if per_start_time.is_empty() && per_end_time.is_empty() {
                        temporal_chunk_ids_per_time.remove_entry();
                    }
                }

                if temporal_chunk_ids_per_component.get().is_empty() {
                    temporal_chunk_ids_per_component.remove_entry();
                }
            }

            if temporal_chunk_ids_per_timeline.get().is_empty() {
                temporal_chunk_ids_per_timeline.remove_entry();
            }

            if temporal_chunk_ids_per_timeline_componentless
                .get()
                .is_empty()
            {
                temporal_chunk_ids_per_timeline_componentless.remove_entry();
            }
        }

        self.chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
            chunk_ids.retain(|chunk_id| !chunk_ids_removed.contains(chunk_id));
            !chunk_ids.is_empty()
        });

        chunk_ids_removed
            .into_iter()
            .filter_map(|chunk_id| self.chunks_per_chunk_id.remove(&chunk_id))
            .inspect(|chunk| {
                self.temporal_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
            })
            .map(ChunkStoreDiff::deletion)
            .collect()
    }
}
693}