use std::{
    collections::{btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeSet},
    time::Duration,
};

use ahash::{HashMap, HashSet};
use nohash_hasher::IntMap;
use re_byte_size::SizeBytes;
use web_time::Instant;

use re_chunk::{Chunk, ChunkId, TimelineName};
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt};
use re_types_core::ComponentName;

use crate::{
    store::ChunkIdSetPerTime, ChunkStore, ChunkStoreChunkStats, ChunkStoreDiff, ChunkStoreDiffKind,
    ChunkStoreEvent, ChunkStoreStats,
};

#[allow(unused_imports)] // `RowId` is only referenced by the docstrings below.
use crate::RowId;

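/// Which chunks the garbage collector should try to drop.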
#[derive(Debug, Clone, Copy)]
pub enum GarbageCollectionTarget {
    /// Try to drop *at least* the given fraction of the store's size.
    ///
    /// The fraction must be a float in the range [0.0 : 1.0].
    DropAtLeastFraction(f64),

    /// Drop as much data as possible, short of protected chunks.
    Everything,
}

#[derive(Debug, Clone)]
pub struct GarbageCollectionOptions {
    /// What target threshold should the GC try to meet.
    pub target: GarbageCollectionTarget,

    /// How long the GC is allowed to run for, before it stops mid-way.
    pub time_budget: Duration,

    /// How many of the most recent chunks to protect from deletion, per entity, timeline
    /// and component.
    pub protect_latest: usize,

    /// Any chunk whose time range intersects one of these ranges on the matching timeline
    /// is protected from deletion.
    pub protected_time_ranges: IntMap<TimelineName, ResolvedTimeRange>,
}

impl GarbageCollectionOptions {
    pub fn gc_everything() -> Self {
        Self {
            target: GarbageCollectionTarget::Everything,
            time_budget: std::time::Duration::MAX,
            protect_latest: 0,
            protected_time_ranges: Default::default(),
        }
    }

    /// Is the given chunk protected from deletion by [`Self::protected_time_ranges`]?
    pub fn is_chunk_protected(&self, chunk: &Chunk) -> bool {
        for (timeline, protected_time_range) in &self.protected_time_ranges {
            if let Some(time_column) = chunk.timelines().get(timeline) {
                if time_column.time_range().intersects(*protected_time_range) {
                    return true;
                }
            }
        }
        false
    }
}

impl std::fmt::Display for GarbageCollectionTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DropAtLeastFraction(p) => {
                write!(f, "DropAtLeast({:.3}%)", *p * 100.0)
            }
            Self::Everything => write!(f, "Everything"),
        }
    }
}

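/// Every chunk marked for removal, bucketed by entity path, timeline, component, and time.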
pub type RemovableChunkIdPerTimePerComponentPerTimelinePerEntity =
    IntMap<EntityPath, IntMap<TimelineName, IntMap<ComponentName, HashMap<TimeInt, Vec<ChunkId>>>>>;

impl ChunkStore {
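    /// Triggers a garbage collection according to the desired `target`.
    ///
    /// Chunks are dropped in ascending [`RowId`] order, i.e. the oldest data (as measured by
    /// insertion order) goes first. Returns the [`ChunkStoreEvent`]s describing every purged
    /// chunk, along with the difference in store statistics.
    ///
    /// A minimal usage sketch (assuming a `store: ChunkStore` is at hand):
    ///
    /// ```ignore
    /// // Try to reclaim at least a third of the store's memory, spending at most 100ms.
    /// let options = GarbageCollectionOptions {
    ///     target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
    ///     time_budget: std::time::Duration::from_millis(100),
    ///     ..GarbageCollectionOptions::gc_everything()
    /// };
    /// let (events, stats_diff) = store.gc(&options);
    /// ```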
    pub fn gc(
        &mut self,
        options: &GarbageCollectionOptions,
    ) -> (Vec<ChunkStoreEvent>, ChunkStoreStats) {
        re_tracing::profile_function!();

        self.gc_id += 1;

        let stats_before = self.stats();

        let total_size_bytes_before = stats_before.total().total_size_bytes as f64;
        let total_num_chunks_before = stats_before.total().num_chunks;
        let total_num_rows_before = stats_before.total().num_rows;

        let protected_chunk_ids = self.find_all_protected_chunk_ids(options.protect_latest);

        let diffs = match options.target {
            GarbageCollectionTarget::DropAtLeastFraction(p) => {
                assert!((0.0..=1.0).contains(&p));

                let num_bytes_to_drop = total_size_bytes_before * p;
                let target_size_bytes = total_size_bytes_before - num_bytes_to_drop;

                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    total_num_chunks_before = re_format::format_uint(total_num_chunks_before),
                    total_num_rows_before = re_format::format_uint(total_num_rows_before),
                    total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
                    target_size_bytes = re_format::format_bytes(target_size_bytes),
                    drop_at_least_num_bytes = re_format::format_bytes(num_bytes_to_drop),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_chunk_ids)
            }
            GarbageCollectionTarget::Everything => {
                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    total_num_rows_before = re_format::format_uint(total_num_rows_before),
                    total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_chunk_ids)
            }
        };

        let stats_after = self.stats();
        let total_size_bytes_after = stats_after.total().total_size_bytes as f64;
        let total_num_chunks_after = stats_after.total().num_chunks;
        let total_num_rows_after = stats_after.total().num_rows;

        re_log::trace!(
            kind = "gc",
            id = self.gc_id,
            %options.target,
            total_num_chunks_before = re_format::format_uint(total_num_chunks_before),
            total_num_rows_before = re_format::format_uint(total_num_rows_before),
            total_size_bytes_before = re_format::format_bytes(total_size_bytes_before),
            total_num_chunks_after = re_format::format_uint(total_num_chunks_after),
            total_num_rows_after = re_format::format_uint(total_num_rows_after),
            total_size_bytes_after = re_format::format_bytes(total_size_bytes_after),
            "GC done"
        );

        let events = if self.config.enable_changelog {
            let events: Vec<_> = diffs
                .into_iter()
                .map(|diff| ChunkStoreEvent {
                    store_id: self.id.clone(),
                    store_generation: self.generation(),
                    event_id: self
                        .event_id
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
                    diff,
                })
                .collect();

            {
                // In debug builds, double-check that the GC only ever produces deletion events.
                if cfg!(debug_assertions) {
                    let any_event_other_than_deletion = events
                        .iter()
                        .any(|e| e.kind != ChunkStoreDiffKind::Deletion);
                    assert!(!any_event_other_than_deletion);
                }

                Self::on_events(&events);
            }

            events
        } else {
            Vec::new()
        };

        (events, stats_before - stats_after)
    }

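    /// For each entity, timeline and component, finds the [`ChunkId`]s of the `target_count`
    /// most recent chunks, which should be protected from the next GC pass.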
    fn find_all_protected_chunk_ids(&self, target_count: usize) -> BTreeSet<ChunkId> {
        re_tracing::profile_function!();

        if target_count == 0 {
            return Default::default();
        }

        self.temporal_chunk_ids_per_entity_per_component
            .values()
            .flat_map(|temporal_chunk_ids_per_timeline| {
                temporal_chunk_ids_per_timeline.iter().flat_map(
                    |(_timeline, temporal_chunk_ids_per_component)| {
                        temporal_chunk_ids_per_component.iter().flat_map(
                            |(_, temporal_chunk_ids_per_time)| {
                                temporal_chunk_ids_per_time
                                    .per_start_time
                                    .last_key_value()
                                    .map(|(_, chunk_ids)| chunk_ids.iter().copied())
                                    .into_iter()
                                    .flatten()
                                    .chain(
                                        temporal_chunk_ids_per_time
                                            .per_end_time
                                            .last_key_value()
                                            .map(|(_, chunk_ids)| chunk_ids.iter().copied())
                                            .into_iter()
                                            .flatten(),
                                    )
                                    .collect::<BTreeSet<_>>()
                                    .into_iter()
                                    .rev()
                                    .take(target_count)
                            },
                        )
                    },
                )
            })
            .collect()
    }

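    /// The mark-and-sweep at the heart of the GC: marks unprotected chunks for removal, oldest
    /// first, until at least `num_bytes_to_drop` bytes are covered (or the time budget runs
    /// out), then sweeps them out of all indices.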
    fn gc_drop_at_least_num_bytes(
        &mut self,
        options: &GarbageCollectionOptions,
        mut num_bytes_to_drop: f64,
        protected_chunk_ids: &BTreeSet<ChunkId>,
    ) -> Vec<ChunkStoreDiff> {
        re_tracing::profile_function!(re_format::format_bytes(num_bytes_to_drop));

        let mut chunk_ids_to_be_removed =
            RemovableChunkIdPerTimePerComponentPerTimelinePerEntity::default();
        let mut chunk_ids_dangling = HashSet::default();

        let start_time = Instant::now();

        {
            re_tracing::profile_scope!("mark");

            for chunk_id in self
                .chunk_ids_per_min_row_id
                .values()
                .flatten()
                .filter(|chunk_id| !protected_chunk_ids.contains(chunk_id))
            {
                if let Some(chunk) = self.chunks_per_chunk_id.get(chunk_id) {
                    if options.is_chunk_protected(chunk) {
                        continue;
                    }

                    // NOTE: Do _not_ use `chunk.total_size_bytes()` here: the chunk sits behind
                    // an `Arc`, whose `SizeBytes` impl amortizes shared data down to 0 bytes.
                    num_bytes_to_drop -= <Chunk as SizeBytes>::total_size_bytes(chunk) as f64;

                    // Mark the chunk for removal on every (timeline, component, time) it touches.
                    let entity_path = chunk.entity_path();
                    let per_timeline = chunk_ids_to_be_removed
                        .entry(entity_path.clone())
                        .or_default();
                    for (&timeline, time_column) in chunk.timelines() {
                        let per_component = per_timeline.entry(timeline).or_default();
                        for component_name in chunk.component_names() {
                            let per_time = per_component.entry(component_name).or_default();

                            // The chunk is indexed at both its start and end times, so it must
                            // be marked at both.
                            let time_range = time_column.time_range();
                            per_time
                                .entry(time_range.min())
                                .or_default()
                                .push(chunk.id());
                            if time_range.min() != time_range.max() {
                                per_time
                                    .entry(time_range.max())
                                    .or_default()
                                    .push(chunk.id());
                            }
                        }
                    }
                } else {
                    chunk_ids_dangling.insert(*chunk_id);
                }

                // Only spend a fraction of the budget on the mark phase: the sweep below is the
                // expensive part and still has to fit in the remaining time.
                if start_time.elapsed() >= options.time_budget / 4 || num_bytes_to_drop <= 0.0 {
                    break;
                }
            }
        }

        {
            re_tracing::profile_scope!("sweep");

            let Self {
                id: _,
                info: _,
                config: _,
                time_type_registry: _,
                type_registry: _,
                per_column_metadata: _,
                chunks_per_chunk_id,
                chunk_ids_per_min_row_id,
                temporal_chunk_ids_per_entity_per_component,
                temporal_chunk_ids_per_entity,
                temporal_chunks_stats: _,
                static_chunk_ids_per_entity: _,
                static_chunks_stats: _,
                insert_id: _,
                gc_id: _,
                event_id: _,
            } = self;

            let mut diffs = Vec::new();

            // Dangling chunks should never happen: it is the GC's own job to ensure that.
            // Still, in release builds we clean them up as best we can to avoid leaking memory.
            debug_assert!(
                chunk_ids_dangling.is_empty(),
                "detected dangling chunks -- there's a GC bug"
            );
            if !chunk_ids_dangling.is_empty() {
                re_tracing::profile_scope!("dangling");

                chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
                    chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                    !chunk_ids.is_empty()
                });

                for temporal_chunk_ids_per_timeline in temporal_chunk_ids_per_entity.values_mut() {
                    for temporal_chunk_ids_per_time in temporal_chunk_ids_per_timeline.values_mut()
                    {
                        let ChunkIdSetPerTime {
                            max_interval_length: _,
                            per_start_time,
                            per_end_time,
                        } = temporal_chunk_ids_per_time;

                        // NOTE: `max_interval_length` is deliberately left untouched:
                        // recomputing it here would be costly for very little benefit.
                        for chunk_ids in per_start_time.values_mut() {
                            chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                        }
                        for chunk_ids in per_end_time.values_mut() {
                            chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                        }
                    }
                }

                for temporal_chunk_ids_per_component in
                    temporal_chunk_ids_per_entity_per_component.values_mut()
                {
                    for temporal_chunk_ids_per_timeline in
                        temporal_chunk_ids_per_component.values_mut()
                    {
                        for temporal_chunk_ids_per_time in
                            temporal_chunk_ids_per_timeline.values_mut()
                        {
                            let ChunkIdSetPerTime {
                                max_interval_length: _,
                                per_start_time,
                                per_end_time,
                            } = temporal_chunk_ids_per_time;

                            for chunk_ids in per_start_time.values_mut() {
                                chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                            }
                            for chunk_ids in per_end_time.values_mut() {
                                chunk_ids.retain(|chunk_id| !chunk_ids_dangling.contains(chunk_id));
                            }
                        }
                    }
                }

                diffs.extend(
                    chunk_ids_dangling
                        .into_iter()
                        .filter_map(|chunk_id| chunks_per_chunk_id.remove(&chunk_id))
                        .map(ChunkStoreDiff::deletion),
                );
            }

            if !chunk_ids_to_be_removed.is_empty() {
                diffs.extend(self.remove_chunks(
                    chunk_ids_to_be_removed,
                    Some((start_time, options.time_budget)),
                ));
            }

            diffs
        }
    }

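    /// Surgically removes a single chunk from all indices, and returns the corresponding
    /// [`ChunkStoreDiff`]s.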
    pub(crate) fn remove_chunk(&mut self, chunk_id: ChunkId) -> Vec<ChunkStoreDiff> {
        let Some(chunk) = self.chunks_per_chunk_id.get(&chunk_id) else {
            return Vec::new();
        };

        let mut chunk_ids_to_be_removed =
            RemovableChunkIdPerTimePerComponentPerTimelinePerEntity::default();

        {
            let chunk_ids_to_be_removed = chunk_ids_to_be_removed
                .entry(chunk.entity_path().clone())
                .or_default();

            for (timeline, time_range_per_component) in chunk.time_range_per_component() {
                let chunk_ids_to_be_removed = chunk_ids_to_be_removed.entry(timeline).or_default();

                for (component_name, per_desc) in time_range_per_component {
                    for (_component_desc, time_range) in per_desc {
                        let chunk_ids_to_be_removed =
                            chunk_ids_to_be_removed.entry(component_name).or_default();

                        chunk_ids_to_be_removed
                            .entry(time_range.min())
                            .or_default()
                            .push(chunk.id());
                        chunk_ids_to_be_removed
                            .entry(time_range.max())
                            .or_default()
                            .push(chunk.id());
                    }
                }
            }
        }

        self.remove_chunks(chunk_ids_to_be_removed, None)
    }

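    /// Removes the given chunks from all indices, and returns the corresponding
    /// [`ChunkStoreDiff`]s.
    ///
    /// Stops early if the given time budget runs out, leaving the remaining chunks in place;
    /// the indices are left consistent either way.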
    pub(crate) fn remove_chunks(
        &mut self,
        chunk_ids_to_be_removed: RemovableChunkIdPerTimePerComponentPerTimelinePerEntity,
        time_budget: Option<(Instant, Duration)>,
    ) -> Vec<ChunkStoreDiff> {
        re_tracing::profile_function!();

        // Keeps track of which chunks were actually removed from the componentless index: only
        // those may then be removed from the per-component index and from the store itself.
        let mut chunk_ids_removed = HashSet::default();

        for (entity_path, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
            // If the entity is unknown to either temporal index, there is nothing to clean up.
            let HashMapEntry::Occupied(mut temporal_chunk_ids_per_timeline) = self
                .temporal_chunk_ids_per_entity_per_component
                .entry(entity_path.clone())
            else {
                continue;
            };

            let HashMapEntry::Occupied(mut temporal_chunk_ids_per_timeline_componentless) =
                self.temporal_chunk_ids_per_entity.entry(entity_path)
            else {
                continue;
            };

            for (timeline, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
                // First pass: the componentless index.
                {
                    let HashMapEntry::Occupied(mut temporal_chunk_ids_per_time_componentless) =
                        temporal_chunk_ids_per_timeline_componentless
                            .get_mut()
                            .entry(timeline)
                    else {
                        continue;
                    };

                    let ChunkIdSetPerTime {
                        max_interval_length: _,
                        per_start_time,
                        per_end_time,
                    } = temporal_chunk_ids_per_time_componentless.get_mut();

                    // This is the only loop that checks the time budget: it decides which chunks
                    // actually get removed during this call.
                    for chunk_ids_to_be_removed in chunk_ids_to_be_removed.values() {
                        for (&time, chunk_ids) in chunk_ids_to_be_removed {
                            if let BTreeMapEntry::Occupied(mut chunk_id_set) =
                                per_start_time.entry(time)
                            {
                                for chunk_id in chunk_ids {
                                    chunk_id_set.get_mut().remove(chunk_id);
                                }
                                if chunk_id_set.get().is_empty() {
                                    chunk_id_set.remove_entry();
                                }
                            }

                            if let BTreeMapEntry::Occupied(mut chunk_id_set) =
                                per_end_time.entry(time)
                            {
                                for chunk_id in chunk_ids {
                                    chunk_id_set.get_mut().remove(chunk_id);
                                }
                                if chunk_id_set.get().is_empty() {
                                    chunk_id_set.remove_entry();
                                }
                            }

                            chunk_ids_removed.extend(chunk_ids);
                        }

                        if let Some((start_time, time_budget)) = time_budget {
                            if start_time.elapsed() >= time_budget {
                                break;
                            }
                        }
                    }

                    if per_start_time.is_empty() && per_end_time.is_empty() {
                        temporal_chunk_ids_per_time_componentless.remove_entry();
                    }
                }

                let HashMapEntry::Occupied(mut temporal_chunk_ids_per_component) =
                    temporal_chunk_ids_per_timeline.get_mut().entry(timeline)
                else {
                    continue;
                };

                for (component_name, chunk_ids_to_be_removed) in chunk_ids_to_be_removed {
                    let HashMapEntry::Occupied(mut temporal_chunk_ids_per_time) =
                        temporal_chunk_ids_per_component
                            .get_mut()
                            .entry(component_name)
                    else {
                        continue;
                    };

                    let ChunkIdSetPerTime {
                        max_interval_length: _,
                        per_start_time,
                        per_end_time,
                    } = temporal_chunk_ids_per_time.get_mut();

                    // Second pass: the per-component index. Only remove the chunk ids that were
                    // actually removed above, so that both indices stay consistent even if the
                    // time budget ran out mid-way.
                    for (time, chunk_ids) in chunk_ids_to_be_removed {
                        if let BTreeMapEntry::Occupied(mut chunk_id_set) =
                            per_start_time.entry(time)
                        {
                            for chunk_id in chunk_ids
                                .iter()
                                .filter(|chunk_id| chunk_ids_removed.contains(*chunk_id))
                            {
                                chunk_id_set.get_mut().remove(chunk_id);
                            }
                            if chunk_id_set.get().is_empty() {
                                chunk_id_set.remove_entry();
                            }
                        }

                        if let BTreeMapEntry::Occupied(mut chunk_id_set) = per_end_time.entry(time)
                        {
                            for chunk_id in chunk_ids
                                .iter()
                                .filter(|chunk_id| chunk_ids_removed.contains(*chunk_id))
                            {
                                chunk_id_set.get_mut().remove(chunk_id);
                            }
                            if chunk_id_set.get().is_empty() {
                                chunk_id_set.remove_entry();
                            }
                        }
                    }

                    if per_start_time.is_empty() && per_end_time.is_empty() {
                        temporal_chunk_ids_per_time.remove_entry();
                    }
                }

                if temporal_chunk_ids_per_component.get().is_empty() {
                    temporal_chunk_ids_per_component.remove_entry();
                }
            }

            if temporal_chunk_ids_per_timeline.get().is_empty() {
                temporal_chunk_ids_per_timeline.remove_entry();
            }

            if temporal_chunk_ids_per_timeline_componentless
                .get()
                .is_empty()
            {
                temporal_chunk_ids_per_timeline_componentless.remove_entry();
            }
        }

        self.chunk_ids_per_min_row_id.retain(|_row_id, chunk_ids| {
            chunk_ids.retain(|chunk_id| !chunk_ids_removed.contains(chunk_id));
            !chunk_ids.is_empty()
        });

        chunk_ids_removed
            .into_iter()
            .filter_map(|chunk_id| self.chunks_per_chunk_id.remove(&chunk_id))
            .inspect(|chunk| {
                self.temporal_chunks_stats -= ChunkStoreChunkStats::from_chunk(chunk);
            })
            .map(ChunkStoreDiff::deletion)
            .collect()
    }
}