1use std::{collections::BTreeMap, time::Duration};
2
3use ahash::{HashMap, HashSet};
4use web_time::Instant;
5
6use re_log_types::{
7 DataCell, EntityPath, EntityPathHash, ResolvedTimeRange, RowId, TimeInt, TimePoint, Timeline,
8 VecDequeRemovalExt as _,
9};
10use re_types_core::{ComponentName, SizeBytes as _};
11
12use crate::{
13 store::{IndexedBucketInner, IndexedTable},
14 DataStore, DataStoreStats, StoreDiff, StoreDiffKind, StoreEvent,
15};
16
/// Which rows the garbage collector should aim to remove from the store.
#[derive(Debug, Clone, Copy)]
pub enum GarbageCollectionTarget {
    /// Try to drop _at least_ the given fraction of the store's total size.
    ///
    /// The fraction must be within `0.0..=1.0` (asserted in [`DataStore::gc`]).
    DropAtLeastFraction(f64),

    /// Drop every single row that isn't protected.
    Everything,
}
29
/// Configures the behavior of a single garbage-collection pass.
#[derive(Debug, Clone)]
pub struct GarbageCollectionOptions {
    /// What is the GC trying to achieve?
    pub target: GarbageCollectionTarget,

    /// How long the GC is allowed to run before giving up, even if `target` hasn't
    /// been reached yet.
    // NOTE(review): the budget is only checked between batches, not per row — see
    // `gc_drop_at_least_num_bytes`, so this is a soft limit.
    pub time_budget: Duration,

    /// How many of the latest rows to keep, per component-column, during the pass
    /// (`0` disables protection entirely — see `find_all_protected_rows`).
    pub protect_latest: usize,

    /// Whether tables that end up with no data at all should be removed afterwards.
    pub purge_empty_tables: bool,

    /// Components that are exempt from `protect_latest` protection.
    pub dont_protect_components: HashSet<ComponentName>,

    /// Timelines that are exempt from `protect_latest` protection.
    pub dont_protect_timelines: HashSet<Timeline>,

    /// Whether to attempt dropping entire buckets at once (see
    /// `IndexedTable::try_drop_bucket`) instead of row-by-row only.
    pub enable_batching: bool,
}
63
64impl GarbageCollectionOptions {
65 pub fn gc_everything() -> Self {
66 Self {
67 target: GarbageCollectionTarget::Everything,
68 time_budget: std::time::Duration::MAX,
69 protect_latest: 0,
70 purge_empty_tables: true,
71 dont_protect_components: Default::default(),
72 dont_protect_timelines: Default::default(),
73 enable_batching: false,
74 }
75 }
76}
77
78impl std::fmt::Display for GarbageCollectionTarget {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 Self::DropAtLeastFraction(p) => {
82 write!(f, "DropAtLeast({:.3}%)", *p * 100.0)
83 }
84 Self::Everything => write!(f, "Everything"),
85 }
86 }
87}
88
impl DataStore {
    /// Triggers a garbage collection according to the desired `options.target`.
    ///
    /// Returns the resulting deletion events (one per dropped row) alongside the
    /// stats delta (`stats_before - stats_after`).
    ///
    /// Rows selected by `options.protect_latest` survive the pass; if
    /// `options.purge_empty_tables` is set, tables left with no data are removed
    /// afterwards and their remaining bookkeeping rows are folded into the diffs.
    pub fn gc(&mut self, options: &GarbageCollectionOptions) -> (Vec<StoreEvent>, DataStoreStats) {
        re_tracing::profile_function!();

        self.gc_id += 1;

        let stats_before = DataStoreStats::from_store(self);

        let (initial_num_rows, initial_num_bytes) = stats_before.total_rows_and_bytes();

        // Rows that must survive this pass, no matter what.
        let protected_rows = self.find_all_protected_rows(
            options.protect_latest,
            &options.dont_protect_components,
            &options.dont_protect_timelines,
        );

        let mut diffs = match options.target {
            GarbageCollectionTarget::DropAtLeastFraction(p) => {
                assert!((0.0..=1.0).contains(&p));

                let num_bytes_to_drop = initial_num_bytes * p;
                let target_num_bytes = initial_num_bytes - num_bytes_to_drop;

                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    initial_num_rows = re_format::format_uint(initial_num_rows),
                    initial_num_bytes = re_format::format_bytes(initial_num_bytes),
                    target_num_bytes = re_format::format_bytes(target_num_bytes),
                    drop_at_least_num_bytes = re_format::format_bytes(num_bytes_to_drop),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_rows)
            }
            GarbageCollectionTarget::Everything => {
                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    initial_num_rows = re_format::format_uint(initial_num_rows),
                    initial_num_bytes = re_format::format_bytes(initial_num_bytes),
                    "starting GC"
                );

                // An infinite byte budget: drop everything that isn't protected.
                self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_rows)
            }
        };

        if options.purge_empty_tables {
            diffs.extend(self.purge_empty_tables());
        }

        #[cfg(debug_assertions)]
        #[allow(clippy::unwrap_used)]
        self.sanity_check().unwrap();

        let stats_after = DataStoreStats::from_store(self);
        let (new_num_rows, new_num_bytes) = stats_after.total_rows_and_bytes();

        re_log::trace!(
            kind = "gc",
            id = self.gc_id,
            %options.target,
            initial_num_rows = re_format::format_uint(initial_num_rows),
            initial_num_bytes = re_format::format_bytes(initial_num_bytes),
            new_num_rows = re_format::format_uint(new_num_rows),
            new_num_bytes = re_format::format_bytes(new_num_bytes),
            "GC done"
        );

        let stats_diff = stats_before - stats_after;

        // Wrap every deletion diff into a store event, tagged with a fresh,
        // monotonically increasing event id.
        let events: Vec<_> = diffs
            .into_iter()
            .map(|diff| StoreEvent {
                store_id: self.id.clone(),
                store_generation: self.generation(),
                event_id: self
                    .event_id
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
                diff,
            })
            .collect();

        {
            // A GC pass can only ever emit deletion events.
            if cfg!(debug_assertions) {
                let any_event_other_than_deletion =
                    events.iter().any(|e| e.kind != StoreDiffKind::Deletion);
                assert!(!any_event_other_than_deletion);
            }

            Self::on_events(&events);
        }

        (events, stats_diff)
    }

    /// Drops rows in `RowId` order (i.e. roughly insertion order) until at least
    /// `num_bytes_to_drop` bytes have been reclaimed, the time budget runs out, or
    /// there is nothing left to drop.
    ///
    /// Rows are accumulated into fixed-size batches before being dropped, so that the
    /// (optional) whole-bucket fast path in `drop_batch` gets a chance to kick in.
    ///
    /// Returns the raw deletion diffs.
    fn gc_drop_at_least_num_bytes(
        &mut self,
        options: &GarbageCollectionOptions,
        mut num_bytes_to_drop: f64,
        protected_rows: &HashSet<RowId>,
    ) -> Vec<StoreDiff> {
        re_tracing::profile_function!();

        let mut diffs = Vec::new();

        // Batch roughly two buckets' worth of rows at a time, clamped to sane bounds.
        let batch_size = (self.config.indexed_bucket_num_rows as usize).saturating_mul(2);
        let batch_size = batch_size.clamp(64, 4096);

        let mut batch: Vec<(TimePoint, (EntityPathHash, RowId))> = Vec::with_capacity(batch_size);
        // Whether the current batch skipped over at least one protected row; if so, the
        // whole-bucket fast path must be disabled for that batch (see `drop_batch`).
        let mut batch_is_protected = false;

        // Split-borrow `self` so the registry can be iterated while tables are mutated.
        let Self {
            metadata_registry,
            tables,
            ..
        } = self;

        let now = Instant::now();
        for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry {
            if protected_rows.contains(&row_id) {
                batch_is_protected = true;
                continue;
            }

            batch.push((timepoint.clone(), (*entity_path_hash, row_id)));
            if batch.len() < batch_size {
                continue;
            }

            let dropped = Self::drop_batch(
                options,
                tables,
                &mut num_bytes_to_drop,
                &batch,
                batch_is_protected,
            );

            // Account for the metadata-registry bytes freed by each dropped row.
            // NOTE: the rows themselves are removed from the registry only at the very
            // end — we cannot mutate it while iterating over it.
            for dropped in dropped {
                let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes()
                    + dropped.timepoint().total_size_bytes()
                    + dropped.entity_path.hash().total_size_bytes();
                metadata_registry.heap_size_bytes = metadata_registry
                    .heap_size_bytes
                    .checked_sub(metadata_dropped_size_bytes)
                    .unwrap_or_else(|| {
                        re_log::debug!(
                            entity_path = %dropped.entity_path,
                            current = metadata_registry.heap_size_bytes,
                            removed = metadata_dropped_size_bytes,
                            "book keeping underflowed"
                        );
                        u64::MIN
                    });
                num_bytes_to_drop -= metadata_dropped_size_bytes as f64;

                diffs.push(dropped);
            }

            // The time budget is only checked once per full batch, to avoid hammering
            // the clock on every row.
            if now.elapsed() >= options.time_budget || num_bytes_to_drop <= 0.0 {
                break;
            }

            batch.clear();
            batch_is_protected = false;
        }

        // Flush the final, possibly partial batch.
        {
            let dropped = Self::drop_batch(
                options,
                tables,
                &mut num_bytes_to_drop,
                &batch,
                batch_is_protected,
            );

            // Same registry bookkeeping as in the loop above.
            for dropped in dropped {
                let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes()
                    + dropped.timepoint().total_size_bytes()
                    + dropped.entity_path.hash().total_size_bytes();
                metadata_registry.heap_size_bytes = metadata_registry
                    .heap_size_bytes
                    .checked_sub(metadata_dropped_size_bytes)
                    .unwrap_or_else(|| {
                        re_log::debug!(
                            entity_path = %dropped.entity_path,
                            current = metadata_registry.heap_size_bytes,
                            removed = metadata_dropped_size_bytes,
                            "book keeping underflowed"
                        );
                        u64::MIN
                    });
                num_bytes_to_drop -= metadata_dropped_size_bytes as f64;

                diffs.push(dropped);
            }
        }

        // Now that iteration is over, actually remove the dropped rows from the registry.
        for diff in &diffs {
            metadata_registry.remove(&diff.row_id);
        }

        diffs
    }

    /// Drops one batch of rows from `tables`, decrementing `num_bytes_to_drop` as it goes.
    ///
    /// If batching is enabled and the batch contains no protected rows, first tries to
    /// drop entire buckets whose rows are all dominated by the batch's max `RowId`;
    /// any remaining budget is then spent dropping the batch row-by-row.
    #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
    fn drop_batch(
        options: &GarbageCollectionOptions,
        tables: &mut BTreeMap<(EntityPathHash, Timeline), IndexedTable>,
        num_bytes_to_drop: &mut f64,
        batch: &[(TimePoint, (EntityPathHash, RowId))],
        batch_is_protected: bool,
    ) -> Vec<StoreDiff> {
        let &GarbageCollectionOptions {
            enable_batching, ..
        } = options;

        let mut diffs = Vec::new();

        // The registry is sorted by `RowId`, so the last entry of the batch is its max.
        let max_row_id = batch.last().map(|(_, (_, row_id))| *row_id);

        // Fast path: drop whole buckets at once. Only sound when no row in the batch is
        // protected, since a bucket drop cannot skip individual rows.
        if enable_batching && max_row_id.is_some() && !batch_is_protected {
            let max_row_id = max_row_id.unwrap_or(RowId::ZERO);

            // The same row can appear in several tables (one per timeline); merge the
            // per-table diffs for a given row into a single diff.
            let mut batch_removed: HashMap<RowId, StoreDiff> = HashMap::default();
            let mut cur_entity_path_hash = None;

            for ((entity_path_hash, _), table) in &mut *tables {
                let (removed, num_bytes_removed) = table.try_drop_bucket(max_row_id);

                *num_bytes_to_drop -= num_bytes_removed as f64;

                // Tables are keyed by (entity, timeline), so all tables for one entity
                // are adjacent: flush the merged diffs whenever the entity changes.
                if cur_entity_path_hash != Some(*entity_path_hash) {
                    diffs.extend(batch_removed.drain().map(|(_, diff)| diff));

                    cur_entity_path_hash = Some(*entity_path_hash);
                }

                for mut removed in removed {
                    batch_removed
                        .entry(removed.row_id)
                        .and_modify(|diff| {
                            diff.times.extend(std::mem::take(&mut removed.times));
                        })
                        .or_insert(removed);
                }
            }

            diffs.extend(batch_removed.drain().map(|(_, diff)| diff));
        }

        if *num_bytes_to_drop <= 0.0 {
            return diffs;
        }

        // Slow path: drop each row of the batch individually, one timeline at a time.
        for (timepoint, (entity_path_hash, row_id)) in batch {
            // A row spans several tables; merge everything into a single diff per row.
            let mut diff: Option<StoreDiff> = None;

            for (&timeline, &time) in timepoint {
                if let Some(table) = tables.get_mut(&(*entity_path_hash, timeline)) {
                    let (removed, num_bytes_removed) = table.try_drop_row(*row_id, time);
                    if let Some(inner) = diff.as_mut() {
                        if let Some(removed) = removed {
                            inner.times.extend(removed.times);
                        }
                    } else {
                        diff = removed;
                    }
                    *num_bytes_to_drop -= num_bytes_removed as f64;
                }
            }

            diffs.extend(diff);

            if *num_bytes_to_drop <= 0.0 {
                break;
            }
        }

        diffs
    }

    /// For each (entity, timeline, component), finds the `target_count` most recent
    /// `RowId`s that hold data for that component, and returns the union of all of them.
    ///
    /// These rows must not be GC'd, so that latest-at queries keep working.
    /// Components/timelines listed in the `dont_protect_*` sets are exempt.
    fn find_all_protected_rows(
        &mut self,
        target_count: usize,
        dont_protect_components: &HashSet<ComponentName>,
        dont_protect_timelines: &HashSet<Timeline>,
    ) -> HashSet<RowId> {
        re_tracing::profile_function!();

        if target_count == 0 {
            return Default::default();
        }

        // The scan below walks buckets back-to-front and relies on sorted order.
        self.sort_indices_if_needed();

        let mut protected_rows: HashSet<RowId> = Default::default();

        for ((_, timeline), table) in &self.tables {
            if dont_protect_timelines.contains(timeline) {
                continue;
            }
            // For each component in this table, how many more rows we still need to find.
            let mut components_to_find: HashMap<ComponentName, usize> = table
                .all_components
                .iter()
                .filter(|c| !dont_protect_components.contains(*c))
                .map(|c| (*c, target_count))
                .collect();

            // Walk buckets newest-to-oldest so we hit the most recent rows first.
            for bucket in table.buckets.values().rev() {
                for (component, count) in &mut components_to_find {
                    if *count == 0 {
                        continue;
                    }
                    let inner = bucket.inner.read();
                    if let Some(column) = inner.columns.get(component) {
                        // Newest rows first; only rows whose cell is non-null count.
                        for row in column
                            .iter()
                            .enumerate()
                            .rev()
                            .filter_map(|(row_index, cell)| {
                                cell.as_ref().and_then(|_| inner.col_row_id.get(row_index))
                            })
                            .take(*count)
                        {
                            *count -= 1;
                            protected_rows.insert(*row);
                        }
                    }
                }
            }
        }

        protected_rows
    }

    /// Removes every table that no longer contains any non-empty cell, and returns
    /// deletion diffs for all the rows that were still sitting in those tables.
    fn purge_empty_tables(&mut self) -> impl Iterator<Item = StoreDiff> {
        re_tracing::profile_function!();

        // Keyed by `RowId` so that rows spanning several buckets merge into one diff.
        let mut diffs: BTreeMap<RowId, StoreDiff> = BTreeMap::default();

        self.tables.retain(|_, table| {
            // Keep the table if any cell anywhere still holds at least one instance.
            for bucket in table.buckets.values() {
                let inner = bucket.inner.read();
                for column in inner.columns.values() {
                    if column
                        .iter()
                        .any(|cell| cell.as_ref().map_or(false, |cell| cell.num_instances() > 0))
                    {
                        return true;
                    }
                }
            }

            let entity_path = table.entity_path.clone();

            // The table is effectively empty: turn every remaining row into a diff,
            // taking the cells out of the columns as we go, then drop the table.
            for bucket in table.buckets.values() {
                let mut inner = bucket.inner.write();

                for i in 0..inner.col_row_id.len() {
                    let row_id = inner.col_row_id[i];
                    let time = inner.col_time[i];

                    let diff = diffs
                        .entry(row_id)
                        .or_insert_with(|| StoreDiff::deletion(row_id, entity_path.clone()));

                    diff.times
                        .push((bucket.timeline, TimeInt::new_temporal(time)));

                    for column in &mut inner.columns.values_mut() {
                        let cell = column[i].take();
                        if let Some(cell) = cell {
                            diff.insert(cell);
                        }
                    }
                }
            }

            false
        });

        diffs.into_values()
    }
}
553
554impl IndexedTable {
555 fn try_drop_bucket(&mut self, max_row_id: RowId) -> (Vec<StoreDiff>, u64) {
557 re_tracing::profile_function!();
558
559 let entity_path = self.entity_path.clone();
560 let timeline = self.timeline;
561
562 let mut diffs: Vec<StoreDiff> = Vec::new();
563 let mut dropped_num_bytes = 0u64;
564 let mut dropped_num_rows = 0u64;
565
566 let mut dropped_bucket_times = HashSet::default();
567
568 for (bucket_time, bucket) in &self.buckets {
571 let inner = &mut *bucket.inner.write();
572
573 if inner.col_time.is_empty() || max_row_id < inner.max_row_id {
574 continue;
575 }
576
577 let IndexedBucketInner {
578 mut col_time,
579 mut col_row_id,
580 mut columns,
581 size_bytes,
582 ..
583 } = std::mem::take(inner);
584
585 dropped_bucket_times.insert(*bucket_time);
586
587 while let Some(row_id) = col_row_id.pop_front() {
588 let mut diff = StoreDiff::deletion(row_id, entity_path.clone());
589
590 if let Some(time) = col_time.pop_front() {
591 diff.times.push((timeline, TimeInt::new_temporal(time)));
592 }
593
594 for (component_name, column) in &mut columns {
595 if let Some(cell) = column.pop_front().flatten() {
596 diff.cells.insert(*component_name, cell);
597 }
598 }
599
600 diffs.push(diff);
601 }
602
603 dropped_num_bytes += size_bytes;
604 dropped_num_rows += col_time.len() as u64;
605 }
606
607 self.buckets
608 .retain(|bucket_time, _| !dropped_bucket_times.contains(bucket_time));
609
610 self.uphold_indexing_invariants();
611
612 self.buckets_num_rows -= dropped_num_rows;
613 self.buckets_size_bytes -= dropped_num_bytes;
614
615 (diffs, dropped_num_bytes)
616 }
617
618 fn try_drop_row(&mut self, row_id: RowId, time: TimeInt) -> (Option<StoreDiff>, u64) {
623 re_tracing::profile_function!();
624
625 let entity_path = self.entity_path.clone();
626 let timeline = self.timeline;
627
628 let table_has_more_than_one_bucket = self.buckets.len() > 1;
629
630 let (bucket_key, bucket) = self.find_bucket_mut(time);
631 let bucket_num_bytes = bucket.total_size_bytes();
632
633 let (diff, mut dropped_num_bytes) = {
634 let inner = &mut *bucket.inner.write();
635 inner.try_drop_row(row_id, timeline, &entity_path, time)
636 };
637
638 if table_has_more_than_one_bucket && bucket.num_rows() == 0 {
641 debug_assert!(
644 dropped_num_bytes <= bucket_num_bytes,
645 "Bucket contained more bytes than it thought"
646 );
647 dropped_num_bytes = bucket_num_bytes;
648 self.buckets.remove(&bucket_key);
649
650 self.uphold_indexing_invariants();
651 }
652
653 self.buckets_size_bytes -= dropped_num_bytes;
654 self.buckets_num_rows -= (dropped_num_bytes > 0) as u64;
655
656 (diff, dropped_num_bytes)
657 }
658}
659
impl IndexedBucketInner {
    /// Tries to drop the given `row_id`, expected at the given `time`, from this bucket.
    ///
    /// Returns the deletion diff (if the row was found and carried any cells) and the
    /// number of bytes actually reclaimed. `size_bytes` is decremented accordingly.
    fn try_drop_row(
        &mut self,
        row_id: RowId,
        timeline: Timeline,
        entity_path: &EntityPath,
        time: TimeInt,
    ) -> (Option<StoreDiff>, u64) {
        // The search below relies on the bucket being time-sorted.
        self.sort();

        let Self {
            is_sorted,
            time_range,
            col_time,
            col_insert_id,
            col_row_id,
            max_row_id,
            columns,
            size_bytes,
        } = self;

        let mut diff: Option<StoreDiff> = None;
        let mut dropped_num_bytes = 0u64;

        // Binary-search for the first row at `time`, then scan the (possibly several)
        // rows sharing that timestamp, looking for the one with our `row_id`.
        let mut row_index = col_time.partition_point(|&time2| time2 < time.as_i64());
        while col_time.get(row_index) == Some(&time.as_i64()) {
            if col_row_id[row_index] != row_id {
                row_index += 1;
                continue;
            }

            // Update the time-range bookkeeping *before* removing the row, while the
            // neighboring entries are still in place.
            if col_time.len() == 1 {
                // We're removing the only row: the bucket's range becomes empty.
                *time_range = ResolvedTimeRange::EMPTY;
            } else {
                // `swap_remove` moves the last element into `row_index`, which breaks
                // sorting unless we're removing the first or last row.
                *is_sorted = row_index == 0 || row_index.saturating_add(1) == col_row_id.len();

                if row_index == 0 {
                    // Removing the first row: the second row holds the new minimum.
                    time_range.set_min(col_time[1]);
                }
                if row_index + 1 == col_time.len() {
                    // Removing the last row: the penultimate row holds the new maximum.
                    time_range.set_max(col_time[row_index - 1]);
                }
            }

            let Some(removed_row_id) = col_row_id.swap_remove(row_index) else {
                continue;
            };
            debug_assert_eq!(row_id, removed_row_id);
            dropped_num_bytes += removed_row_id.total_size_bytes();

            if let Some(row_time) = col_time.swap_remove(row_index) {
                dropped_num_bytes += row_time.total_size_bytes();
            }

            // `col_insert_id` is only populated when insert-id tracking is enabled.
            if !col_insert_id.is_empty() {
                if let Some(insert_id) = col_insert_id.swap_remove(row_index) {
                    dropped_num_bytes += insert_id.total_size_bytes();
                }
            }

            for column in columns.values_mut() {
                let cell = column.0.swap_remove(row_index).flatten();

                // Account for the slot's size whether or not a cell was present.
                dropped_num_bytes += cell.total_size_bytes();

                if let Some(cell) = cell {
                    // Lazily create the diff on the first non-null cell; subsequent
                    // cells for this row are merged into it.
                    if let Some(inner) = diff.as_mut() {
                        inner.insert(cell);
                    } else {
                        let mut d = StoreDiff::deletion(removed_row_id, entity_path.clone());
                        d.at_timestamp(timeline, time);
                        d.insert(cell);
                        diff = Some(d);
                    }
                }
            }

            // If we just removed the bucket-wide max row-id, recompute it by scanning
            // what's left.
            if *max_row_id == removed_row_id {
                *max_row_id = col_row_id.iter().max().copied().unwrap_or(RowId::ZERO);
            }

            // At most one row matches a given `RowId`: we're done.
            break;
        }

        *size_bytes -= dropped_num_bytes;

        (diff, dropped_num_bytes)
    }
}
768
769impl StoreDiff {
772 fn insert(&mut self, cell: DataCell) {
773 self.cells.insert(cell.component_name(), cell);
774 }
775}