1use std::{
2 collections::{BTreeSet, btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry},
3 time::Duration,
4};
5
6use ahash::{HashMap, HashSet};
7use nohash_hasher::IntMap;
8use re_byte_size::SizeBytes;
9use web_time::Instant;
10
11use re_chunk::{Chunk, ChunkId, ComponentIdentifier, TimelineName};
12use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt};
13
14use crate::{
15 ChunkStore, ChunkStoreChunkStats, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent,
16 ChunkStoreStats, store::ChunkIdSetPerTime,
17};
18
19#[expect(unused_imports)]
21use crate::RowId;
22
/// Which data should the garbage collection target?
#[derive(Debug, Clone, Copy)]
pub enum GarbageCollectionTarget {
    /// Try to drop _at least_ the given fraction (`0.0..=1.0`) of the store's
    /// total size, in bytes.
    DropAtLeastFraction(f64),

    /// Drop as much data as possible (everything that isn't protected).
    Everything,
}
35
/// Configures how a garbage collection pass behaves.
#[derive(Debug, Clone)]
pub struct GarbageCollectionOptions {
    /// What does the garbage collection aim for?
    pub target: GarbageCollectionTarget,

    /// Upper bound on how long the GC pass may run before bailing out early.
    pub time_budget: Duration,

    /// How many of the most recent chunks to protect from deletion, per
    /// `(entity, timeline, component)` index entry. `0` protects nothing.
    pub protect_latest: usize,

    /// Any chunk whose time range intersects one of these ranges is never dropped.
    pub protected_time_ranges: IntMap<TimelineName, AbsoluteTimeRange>,
}
58
59impl GarbageCollectionOptions {
60 pub fn gc_everything() -> Self {
61 Self {
62 target: GarbageCollectionTarget::Everything,
63 time_budget: std::time::Duration::MAX,
64 protect_latest: 0,
65 protected_time_ranges: Default::default(),
66 }
67 }
68
69 pub fn is_chunk_protected(&self, chunk: &Chunk) -> bool {
71 for (timeline, protected_time_range) in &self.protected_time_ranges {
72 if let Some(time_column) = chunk.timelines().get(timeline)
73 && time_column.time_range().intersects(*protected_time_range)
74 {
75 return true;
76 }
77 }
78 false
79 }
80}
81
82impl std::fmt::Display for GarbageCollectionTarget {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 Self::DropAtLeastFraction(p) => {
86 write!(f, "DropAtLeast({:.3}%)", *p * 100.0)
87 }
88 Self::Everything => write!(f, "Everything"),
89 }
90 }
91}
92
/// Maps `(entity, timeline, component, time)` to the list of [`ChunkId`]s that should be
/// unindexed/removed at that time. Both a chunk's start time and end time get an entry.
pub type RemovableChunkIdPerTimePerComponentPerTimelinePerEntity = IntMap<
    EntityPath,
    IntMap<TimelineName, IntMap<ComponentIdentifier, HashMap<TimeInt, Vec<ChunkId>>>>,
>;
97
98impl ChunkStore {
    /// Triggers a garbage collection according to the desired `target`.
    ///
    /// Returns the deletion [`ChunkStoreEvent`]s for every chunk that was purged (empty
    /// when the changelog is disabled), alongside the store statistics that were freed
    /// (stats before minus stats after).
    pub fn gc(
        &mut self,
        options: &GarbageCollectionOptions,
    ) -> (Vec<ChunkStoreEvent>, ChunkStoreStats) {
        re_tracing::profile_function!();

        self.gc_id += 1;

        let stats_before = self.stats();

        let total_size_bytes_before = stats_before.total().total_size_bytes as f64;
        let total_num_chunks_before = stats_before.total().num_chunks;
        let total_num_rows_before = stats_before.total().num_rows;

        // Chunks holding the most recent data per index entry are exempt from collection
        // (see `find_all_protected_chunk_ids`).
        let protected_chunk_ids = self.find_all_protected_chunk_ids(options.protect_latest);

        let diffs = match options.target {
            GarbageCollectionTarget::DropAtLeastFraction(p) => {
                assert!((0.0..=1.0).contains(&p));

                let num_bytes_to_drop = total_size_bytes_before * p;
                let target_size_bytes = total_size_bytes_before - num_bytes_to_drop;

                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    total_num_chunks_before = re_format::format_uint(total_num_chunks_before),
                    total_num_rows_before = re_format::format_uint(total_num_rows_before),
                    total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
                    target_size_bytes = re_format::format_bytes(target_size_bytes),
                    drop_at_least_num_bytes = re_format::format_bytes(num_bytes_to_drop),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_chunk_ids)
            }
            GarbageCollectionTarget::Everything => {
                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    total_num_rows_before = re_format::format_uint(total_num_rows_before),
                    total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
                    "starting GC"
                );

                // An infinite byte target: keep dropping until nothing unprotected is left
                // (or the time budget runs out).
                self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_chunk_ids)
            }
        };

        let stats_after = self.stats();
        let total_size_bytes_after = stats_after.total().total_size_bytes as f64;
        let total_num_chunks_after = stats_after.total().num_chunks;
        let total_num_rows_after = stats_after.total().num_rows;

        re_log::trace!(
            kind = "gc",
            id = self.gc_id,
            %options.target,
            total_num_chunks_before = re_format::format_uint(total_num_chunks_before),
            total_num_rows_before = re_format::format_uint(total_num_rows_before),
            total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
            total_num_chunks_after = re_format::format_uint(total_num_chunks_after),
            total_num_rows_after = re_format::format_uint(total_num_rows_after),
            total_size_bytes_after = re_format::format_bytes(total_size_bytes_after),
            "GC done"
        );

        let events = if self.config.enable_changelog {
            let events: Vec<_> = diffs
                .into_iter()
                .map(|diff| ChunkStoreEvent {
                    store_id: self.id.clone(),
                    store_generation: self.generation(),
                    event_id: self
                        .event_id
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
                    diff,
                })
                .collect();
            {
                // GC can only ever delete data, never insert it.
                if cfg!(debug_assertions) {
                    let any_event_other_than_deletion = events
                        .iter()
                        .any(|e| e.kind != ChunkStoreDiffKind::Deletion);
                    assert!(!any_event_other_than_deletion);
                }

                // Notify registered subscribers of the deletions.
                Self::on_events(&events);
            }

            events
        } else {
            Vec::new()
        };

        (events, stats_before - stats_after)
    }
221
    /// Finds all chunks that should be protected from deletion because they hold the
    /// most recent data for some `(entity, timeline, component)` index entry.
    ///
    /// `target_count` is how many of the most recent chunks to protect per index entry;
    /// `0` protects nothing.
    fn find_all_protected_chunk_ids(&self, target_count: usize) -> BTreeSet<ChunkId> {
        re_tracing::profile_function!();

        if target_count == 0 {
            return Default::default();
        }

        self.temporal_chunk_ids_per_entity_per_component
            .values()
            .flat_map(|temporal_chunk_ids_per_timeline| {
                temporal_chunk_ids_per_timeline.iter().flat_map(
                    |(_timeline, temporal_chunk_ids_per_component)| {
                        temporal_chunk_ids_per_component.iter().flat_map(
                            |(_, temporal_chunk_ids_per_time)| {
                                // Consider both the chunks registered at the latest start
                                // time and the ones registered at the latest end time…
                                temporal_chunk_ids_per_time
                                    .per_start_time
                                    .last_key_value()
                                    .map(|(_, chunk_ids)| chunk_ids.iter().copied())
                                    .into_iter()
                                    .flatten()
                                    .chain(
                                        temporal_chunk_ids_per_time
                                            .per_end_time
                                            .last_key_value()
                                            .map(|(_, chunk_ids)| chunk_ids.iter().copied())
                                            .into_iter()
                                            .flatten(),
                                    )
                                    // …deduplicate them, then keep the `target_count`
                                    // largest `ChunkId`s (IDs sort by creation order).
                                    .collect::<BTreeSet<_>>()
                                    .into_iter()
                                    .rev()
                                    .take(target_count)
                            },
                        )
                    },
                )
            })
            .collect()
    }
265
    /// Drops at least `num_bytes_to_drop` worth of unprotected chunks, oldest data
    /// (smallest minimum `RowId`) first, returning the resulting deletion diffs.
    ///
    /// Bails out early if `options.time_budget` is exhausted; in that case less than the
    /// requested number of bytes may have been dropped.
    fn gc_drop_at_least_num_bytes(
        &mut self,
        options: &GarbageCollectionOptions,
        mut num_bytes_to_drop: f64,
        protected_chunk_ids: &BTreeSet<ChunkId>,
    ) -> Vec<ChunkStoreDiff> {
        re_tracing::profile_function!(re_format::format_bytes(num_bytes_to_drop));

        let mut chunk_ids_to_be_removed =
            RemovableChunkIdPerTimePerComponentPerTimelinePerEntity::default();
        let mut chunk_ids_dangling = HashSet::default();

        let start_time = Instant::now();

        {
            re_tracing::profile_scope!("mark");

            // Iterating in ascending min-`RowId` order drops the oldest data first.
            for chunk_id in self
                .chunk_ids_per_min_row_id
                .values()
                .filter(|chunk_id| !protected_chunk_ids.contains(chunk_id))
            {
                if let Some(chunk) = self.chunks_per_chunk_id.get(chunk_id) {
                    if options.is_chunk_protected(chunk) {
                        continue;
                    }

                    num_bytes_to_drop -= <Chunk as SizeBytes>::total_size_bytes(chunk) as f64;

                    // Record, per (entity, timeline, component, time), which chunks must
                    // be unindexed during the sweep phase below.
                    let entity_path = chunk.entity_path();
                    let per_timeline = chunk_ids_to_be_removed
                        .entry(entity_path.clone())
                        .or_default();
                    for (&timeline, time_column) in chunk.timelines() {
                        let per_component = per_timeline.entry(timeline).or_default();
                        for component in chunk.components_identifiers() {
                            let per_time = per_component.entry(component).or_default();

                            // A chunk is indexed at both its start and end time.
                            let time_range = time_column.time_range();
                            per_time
                                .entry(time_range.min())
                                .or_default()
                                .push(chunk.id());
                            if time_range.min() != time_range.max() {
                                per_time
                                    .entry(time_range.max())
                                    .or_default()
                                    .push(chunk.id());
                            }
                        }
                    }
                } else {
                    // Indexed but not present in the store itself: should never happen.
                    chunk_ids_dangling.insert(*chunk_id);
                }

                // NOTE(review): only a quarter of the total budget is allotted to the mark
                // phase — presumably to leave room for the sweep below, but confirm this
                // split is intentional.
                if start_time.elapsed() >= options.time_budget / 4 || num_bytes_to_drop <= 0.0 {
                    break;
                }
            }
        }

        {
            re_tracing::profile_scope!("sweep");

            // Exhaustive destructuring: adding a field to `ChunkStore` forces this code
            // to be revisited.
            let Self {
                id: _,
                config: _,
                time_type_registry: _,
                type_registry: _,
                per_column_metadata: _, chunks_per_chunk_id,
                chunk_ids_per_min_row_id,
                temporal_chunk_ids_per_entity_per_component,
                temporal_chunk_ids_per_entity,
                temporal_chunks_stats: _,
                static_chunk_ids_per_entity: _, static_chunks_stats: _, insert_id: _,
                gc_id: _,
                event_id: _,
            } = self;

            let mut diffs = Vec::new();

            // Dangling chunks indicate a bookkeeping bug; if any were found, scrub them
            // from every index so the store at least stays internally consistent.
            debug_assert!(
                chunk_ids_dangling.is_empty(),
                "detected dangling chunks -- there's a GC bug"
            );
            if !chunk_ids_dangling.is_empty() {
                re_tracing::profile_scope!("dangling");

                chunk_ids_per_min_row_id
                    .retain(|_row_id, chunk_id| !chunk_ids_dangling.contains(chunk_id));

                // Scrub the component-less temporal index.
                for temporal_chunk_ids_per_timeline in temporal_chunk_ids_per_entity.values_mut() {
                    for temporal_chunk_ids_per_time in temporal_chunk_ids_per_timeline.values_mut()
                    {
                        let ChunkIdSetPerTime {
                            max_interval_length: _,
                            per_start_time,
                            per_end_time,
                        } = temporal_chunk_ids_per_time;

                        for chunk_ids in per_start_time.values_mut() {
                            chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                        }
                        for chunk_ids in per_end_time.values_mut() {
                            chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                        }
                    }
                }

                // Scrub the per-component temporal index.
                for temporal_chunk_ids_per_component in
                    temporal_chunk_ids_per_entity_per_component.values_mut()
                {
                    for temporal_chunk_ids_per_timeline in
                        temporal_chunk_ids_per_component.values_mut()
                    {
                        for temporal_chunk_ids_per_time in
                            temporal_chunk_ids_per_timeline.values_mut()
                        {
                            let ChunkIdSetPerTime {
                                max_interval_length: _,
                                per_start_time,
                                per_end_time,
                            } = temporal_chunk_ids_per_time;

                            for chunk_ids in per_start_time.values_mut() {
                                chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                            }
                            for chunk_ids in per_end_time.values_mut() {
                                chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                            }
                        }
                    }
                }

                diffs.extend(
                    chunk_ids_dangling
                        .into_iter()
                        .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
                        .map(ChunkStoreDiff::deletion),
                );
            }

            // Actually unindex and drop everything marked above, under the remaining
            // time budget.
            if !chunk_ids_to_be_removed.is_empty() {
                diffs.extend(self.remove_chunks(
                    chunk_ids_to_be_removed,
                    Some((start_time, options.time_budget)),
                ));
            }

            diffs
        }
    }
451
452 pub(crate) fn remove_chunk(&mut self, chunk_id: ChunkId) -> Vec<ChunkStoreDiff> {
458 re_tracing::profile_function!();
459
460 let Some(chunk) = self.chunks_per_chunk_id.get(&chunk_id) else {
461 return Vec::new();
462 };
463
464 let mut chunk_ids_to_be_removed =
465 RemovableChunkIdPerTimePerComponentPerTimelinePerEntity::default();
466
467 {
468 let chunk_ids_to_be_removed = chunk_ids_to_be_removed
469 .entry(chunk.entity_path().clone())
470 .or_default();
471
472 for (timeline, time_range_per_component) in chunk.time_range_per_component() {
473 let chunk_ids_to_be_removed = chunk_ids_to_be_removed.entry(timeline).or_default();
474
475 for (component, time_range) in time_range_per_component {
476 let chunk_ids_to_be_removed =
477 chunk_ids_to_be_removed.entry(component).or_default();
478
479 chunk_ids_to_be_removed
480 .entry(time_range.min())
481 .or_default()
482 .push(chunk.id());
483 chunk_ids_to_be_removed
484 .entry(time_range.max())
485 .or_default()
486 .push(chunk.id());
487 }
488 }
489 }
490
491 self.remove_chunks(chunk_ids_to_be_removed, None)
492 }
493
494 pub(crate) fn remove_chunks(
501 &mut self,
502 chunk_ids_to_be_removed: RemovableChunkIdPerTimePerComponentPerTimelinePerEntity,
503 time_budget: Option<(Instant, Duration)>,
504 ) -> Vec<ChunkStoreDiff> {
505 re_tracing::profile_function!();
506
507 let mut chunk_ids_removed = HashSet::default();
511
512 for (entity_path, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
518 re_tracing::profile_scope!("chunk-id");
519
520 let HashMapEntry::Occupied(mut temporal_chunk_ids_per_timeline) = self
521 .temporal_chunk_ids_per_entity_per_component
522 .entry(entity_path.clone())
523 else {
524 continue;
525 };
526
527 let HashMapEntry::Occupied(mut temporal_chunk_ids_per_timeline_componentless) =
528 self.temporal_chunk_ids_per_entity.entry(entity_path)
529 else {
530 continue;
531 };
532
533 for (timeline, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
534 re_tracing::profile_scope!("timeline");
535 {
537 let HashMapEntry::Occupied(mut temporal_chunk_ids_per_time_componentless) =
538 temporal_chunk_ids_per_timeline_componentless
539 .get_mut()
540 .entry(timeline)
541 else {
542 continue;
543 };
544
545 let ChunkIdSetPerTime {
546 max_interval_length: _,
547 per_start_time,
548 per_end_time,
549 } = temporal_chunk_ids_per_time_componentless.get_mut();
550
551 for chunk_ids_to_be_removed in chunk_ids_to_be_removed.values() {
558 for (&time, chunk_ids) in chunk_ids_to_be_removed {
559 if let BTreeMapEntry::Occupied(mut chunk_id_set) =
560 per_start_time.entry(time)
561 {
562 for chunk_id in chunk_ids {
563 chunk_id_set.get_mut().remove(chunk_id);
564 }
565 if chunk_id_set.get().is_empty() {
566 chunk_id_set.remove_entry();
567 }
568 }
569
570 if let BTreeMapEntry::Occupied(mut chunk_id_set) =
571 per_end_time.entry(time)
572 {
573 for chunk_id in chunk_ids {
574 chunk_id_set.get_mut().remove(chunk_id);
575 }
576 if chunk_id_set.get().is_empty() {
577 chunk_id_set.remove_entry();
578 }
579 }
580
581 chunk_ids_removed.extend(chunk_ids);
582 }
583
584 if let Some((start_time, time_budget)) = time_budget
585 && start_time.elapsed() >= time_budget
586 {
587 break;
588 }
589 }
590
591 if per_start_time.is_empty() && per_end_time.is_empty() {
592 temporal_chunk_ids_per_time_componentless.remove_entry();
593 }
594 }
595
596 let HashMapEntry::Occupied(mut temporal_chunk_ids_per_component) =
602 temporal_chunk_ids_per_timeline.get_mut().entry(timeline)
603 else {
604 continue;
605 };
606
607 for (component_descr, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
608 let HashMapEntry::Occupied(mut temporal_chunk_ids_per_time) =
609 temporal_chunk_ids_per_component
610 .get_mut()
611 .entry(component_descr)
612 else {
613 continue;
614 };
615
616 let ChunkIdSetPerTime {
617 max_interval_length: _,
618 per_start_time,
619 per_end_time,
620 } = temporal_chunk_ids_per_time.get_mut();
621
622 for (time, chunk_ids) in chunk_ids_to_be_removed {
629 if let BTreeMapEntry::Occupied(mut chunk_id_set) =
630 per_start_time.entry(time)
631 {
632 for chunk_id in chunk_ids
633 .iter()
634 .filter(|chunk_id| chunk_ids_removed.contains(*chunk_id))
635 {
636 chunk_id_set.get_mut().remove(chunk_id);
637 }
638 if chunk_id_set.get().is_empty() {
639 chunk_id_set.remove_entry();
640 }
641 }
642
643 if let BTreeMapEntry::Occupied(mut chunk_id_set) = per_end_time.entry(time)
644 {
645 for chunk_id in chunk_ids
646 .iter()
647 .filter(|chunk_id| chunk_ids_removed.contains(*chunk_id))
648 {
649 chunk_id_set.get_mut().remove(chunk_id);
650 }
651 if chunk_id_set.get().is_empty() {
652 chunk_id_set.remove_entry();
653 }
654 }
655 }
656
657 if per_start_time.is_empty() && per_end_time.is_empty() {
658 temporal_chunk_ids_per_time.remove_entry();
659 }
660 }
661
662 if temporal_chunk_ids_per_component.get().is_empty() {
663 temporal_chunk_ids_per_component.remove_entry();
664 }
665 }
666
667 if temporal_chunk_ids_per_timeline.get().is_empty() {
668 temporal_chunk_ids_per_timeline.remove_entry();
669 }
670
671 if temporal_chunk_ids_per_timeline_componentless
672 .get()
673 .is_empty()
674 {
675 temporal_chunk_ids_per_timeline_componentless.remove_entry();
676 }
677 }
678
679 {
680 let min_row_ids_removed = chunk_ids_removed.iter().filter_map(|chunk_id| {
681 let chunk = self.chunks_per_chunk_id.get(chunk_id)?;
682 chunk.row_id_range().map(|(min, _)| min)
683 });
684 for row_id in min_row_ids_removed {
685 if self.chunk_ids_per_min_row_id.remove(&row_id).is_none() {
686 re_log::warn!(
687 %row_id,
688 "Row ID marked for removal was not found, there's bug in the Chunk Store"
689 );
690 }
691 }
692 }
693
694 {
695 re_tracing::profile_scope!("last collect");
696 chunk_ids_removed
697 .into_iter()
698 .filter_map(|chunk_id| self.chunks_per_chunk_id.remove(&chunk_id))
699 .inspect(|chunk| {
700 self.temporal_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
701 })
702 .map(ChunkStoreDiff::deletion)
703 .collect()
704 }
705 }
706}