// re_data_store/store_gc.rs

1use std::{collections::BTreeMap, time::Duration};
2
3use ahash::{HashMap, HashSet};
4use web_time::Instant;
5
6use re_log_types::{
7    DataCell, EntityPath, EntityPathHash, ResolvedTimeRange, RowId, TimeInt, TimePoint, Timeline,
8    VecDequeRemovalExt as _,
9};
10use re_types_core::{ComponentName, SizeBytes as _};
11
12use crate::{
13    store::{IndexedBucketInner, IndexedTable},
14    DataStore, DataStoreStats, StoreDiff, StoreDiffKind, StoreEvent,
15};
16
17// ---
18
/// How much data the garbage collector should attempt to drop.
#[derive(Debug, Clone, Copy)]
pub enum GarbageCollectionTarget {
    /// Try to drop _at least_ the given fraction of the store's total size.
    ///
    /// The fraction must be a float in the range [0.0 : 1.0].
    DropAtLeastFraction(f64),

    /// GC everything that isn't protected.
    Everything,
}
29
/// Tuning knobs for a single garbage-collection run (see [`DataStore::gc`]).
#[derive(Debug, Clone)]
pub struct GarbageCollectionOptions {
    /// What target threshold should the GC try to meet.
    pub target: GarbageCollectionTarget,

    /// How long the garbage collection is allowed to run for.
    ///
    /// Trades off latency for throughput:
    /// - A smaller `time_budget` will clear less data in a shorter amount of time, allowing for a
    ///   more responsive UI at the cost of more GC overhead and more frequent runs.
    /// - A larger `time_budget` will clear more data in a longer amount of time, increasing the
    ///   chance of UI freeze frames but decreasing GC overhead and running less often.
    ///
    /// The default is an unbounded time budget (i.e. throughput only).
    pub time_budget: Duration,

    /// How many component revisions to preserve on each timeline.
    ///
    /// `0` disables latest-at protection entirely.
    pub protect_latest: usize,

    /// Whether to purge tables that no longer contain any data.
    pub purge_empty_tables: bool,

    /// Components which should not be protected from GC when using `protect_latest`.
    pub dont_protect_components: HashSet<ComponentName>,

    /// Timelines which should not be protected from GC when using `protect_latest`.
    pub dont_protect_timelines: HashSet<Timeline>,

    /// Whether to enable batched bucket drops.
    ///
    /// Disabled by default as it is currently slower in most cases (somehow).
    pub enable_batching: bool,
}
63
64impl GarbageCollectionOptions {
65    pub fn gc_everything() -> Self {
66        Self {
67            target: GarbageCollectionTarget::Everything,
68            time_budget: std::time::Duration::MAX,
69            protect_latest: 0,
70            purge_empty_tables: true,
71            dont_protect_components: Default::default(),
72            dont_protect_timelines: Default::default(),
73            enable_batching: false,
74        }
75    }
76}
77
78impl std::fmt::Display for GarbageCollectionTarget {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            Self::DropAtLeastFraction(p) => {
82                write!(f, "DropAtLeast({:.3}%)", *p * 100.0)
83            }
84            Self::Everything => write!(f, "Everything"),
85        }
86    }
87}
88
impl DataStore {
    /// Triggers a garbage collection according to the desired `target`.
    ///
    /// Garbage collection's performance is bounded by the number of buckets in each table (for
    /// each `RowId`, we have to find the corresponding bucket, which is roughly `O(log(n))`) as
    /// well as the number of rows in each of those buckets (for each `RowId`, we have to sort the
    /// corresponding bucket (roughly `O(n*log(n))`) and then find the corresponding row (roughly
    /// `O(log(n))`.
    /// The size of the data itself has no impact on performance.
    ///
    /// Returns the list of `RowId`s that were purged from the store.
    ///
    /// ## Semantics
    ///
    /// Garbage collection works on a row-level basis and is driven by [`RowId`] order,
    /// i.e. the order defined by the clients' wall-clocks, allowing it to drop data across
    /// the different timelines in a fair, deterministic manner.
    /// Similarly, out-of-order data is supported out of the box.
    ///
    /// The garbage collector doesn't deallocate data in and of itself: all it does is drop the
    /// store's internal references to that data (the `DataCell`s), which will be deallocated once
    /// their reference count reaches 0.
    ///
    /// ## Limitations
    ///
    /// The garbage collector has limited support for latest-at semantics. The configuration option:
    /// [`GarbageCollectionOptions::protect_latest`] will protect the N latest values of each
    /// component on each timeline. The only practical guarantee this gives is that a latest-at query
    /// with a value of max-int will be unchanged. However, latest-at queries from other arbitrary
    /// points in time may provide different results pre- and post- GC.
    pub fn gc(&mut self, options: &GarbageCollectionOptions) -> (Vec<StoreEvent>, DataStoreStats) {
        re_tracing::profile_function!();

        self.gc_id += 1;

        let stats_before = DataStoreStats::from_store(self);

        let (initial_num_rows, initial_num_bytes) = stats_before.total_rows_and_bytes();

        // Rows protected here survive the entire run, regardless of the target.
        let protected_rows = self.find_all_protected_rows(
            options.protect_latest,
            &options.dont_protect_components,
            &options.dont_protect_timelines,
        );

        let mut diffs = match options.target {
            GarbageCollectionTarget::DropAtLeastFraction(p) => {
                assert!((0.0..=1.0).contains(&p));

                // NOTE: `initial_num_bytes` is fractional here; the byte target is best-effort.
                let num_bytes_to_drop = initial_num_bytes * p;
                let target_num_bytes = initial_num_bytes - num_bytes_to_drop;

                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    initial_num_rows = re_format::format_uint(initial_num_rows),
                    initial_num_bytes = re_format::format_bytes(initial_num_bytes),
                    target_num_bytes = re_format::format_bytes(target_num_bytes),
                    drop_at_least_num_bytes = re_format::format_bytes(num_bytes_to_drop),
                    "starting GC"
                );

                self.gc_drop_at_least_num_bytes(options, num_bytes_to_drop, &protected_rows)
            }
            GarbageCollectionTarget::Everything => {
                re_log::trace!(
                    kind = "gc",
                    id = self.gc_id,
                    %options.target,
                    initial_num_rows = re_format::format_uint(initial_num_rows),
                    initial_num_bytes = re_format::format_bytes(initial_num_bytes),
                    "starting GC"
                );

                // An infinite byte target means the loop below only stops on protection
                // or the time budget.
                self.gc_drop_at_least_num_bytes(options, f64::INFINITY, &protected_rows)
            }
        };

        if options.purge_empty_tables {
            diffs.extend(self.purge_empty_tables());
        }

        #[cfg(debug_assertions)]
        #[allow(clippy::unwrap_used)]
        self.sanity_check().unwrap();

        // NOTE: only temporal data and row metadata get purged!
        let stats_after = DataStoreStats::from_store(self);
        let (new_num_rows, new_num_bytes) = stats_after.total_rows_and_bytes();

        re_log::trace!(
            kind = "gc",
            id = self.gc_id,
            %options.target,
            initial_num_rows = re_format::format_uint(initial_num_rows),
            initial_num_bytes = re_format::format_bytes(initial_num_bytes),
            new_num_rows = re_format::format_uint(new_num_rows),
            new_num_bytes = re_format::format_bytes(new_num_bytes),
            "GC done"
        );

        let stats_diff = stats_before - stats_after;

        // Wrap every diff into a store event with a freshly allocated event id.
        let events: Vec<_> = diffs
            .into_iter()
            .map(|diff| StoreEvent {
                store_id: self.id.clone(),
                store_generation: self.generation(),
                event_id: self
                    .event_id
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
                diff,
            })
            .collect();

        {
            // A GC must only ever emit deletions.
            if cfg!(debug_assertions) {
                let any_event_other_than_deletion =
                    events.iter().any(|e| e.kind != StoreDiffKind::Deletion);
                assert!(!any_event_other_than_deletion);
            }

            Self::on_events(&events);
        }

        (events, stats_diff)
    }

    /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
    ///
    /// Returns the deletion diffs for everything that was actually dropped; the caller is
    /// responsible for turning those into events.
    fn gc_drop_at_least_num_bytes(
        &mut self,
        options: &GarbageCollectionOptions,
        mut num_bytes_to_drop: f64,
        protected_rows: &HashSet<RowId>,
    ) -> Vec<StoreDiff> {
        re_tracing::profile_function!();

        let mut diffs = Vec::new();

        // The algorithm is straightforward:
        // 1. Accumulate a bunch of `RowId`s in ascending order, starting from the beginning of time.
        // 2. Check if any `RowId` in the batch is protected, in which case the entire batch is
        //    considered protected and cannot be dropped all at once.
        // 3. Send the batch to `drop_batch` to handle the actual deletion.
        // 4. Removed the dropped rows from the metadata registry.

        // Batch size is derived from the bucket size so a batch roughly covers two buckets,
        // clamped to keep it sane for extreme configurations.
        let batch_size = (self.config.indexed_bucket_num_rows as usize).saturating_mul(2);
        let batch_size = batch_size.clamp(64, 4096);

        let mut batch: Vec<(TimePoint, (EntityPathHash, RowId))> = Vec::with_capacity(batch_size);
        let mut batch_is_protected = false;

        // Destructure `self` so we can borrow `metadata_registry` and `tables` independently.
        let Self {
            metadata_registry,
            tables,
            ..
        } = self;

        let now = Instant::now();
        // NOTE: iteration is in ascending `RowId` order, i.e. oldest data first.
        for (&row_id, (timepoint, entity_path_hash)) in &metadata_registry.registry {
            if protected_rows.contains(&row_id) {
                // One protected row taints the whole current batch (see step 2 above).
                batch_is_protected = true;
                continue;
            }

            batch.push((timepoint.clone(), (*entity_path_hash, row_id)));
            if batch.len() < batch_size {
                continue;
            }

            let dropped = Self::drop_batch(
                options,
                tables,
                &mut num_bytes_to_drop,
                &batch,
                batch_is_protected,
            );

            // Only decrement the metadata size trackers if we're actually certain that we'll drop
            // that RowId in the end.
            for dropped in dropped {
                let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes()
                    + dropped.timepoint().total_size_bytes()
                    + dropped.entity_path.hash().total_size_bytes();
                metadata_registry.heap_size_bytes = metadata_registry
                    .heap_size_bytes
                    .checked_sub(metadata_dropped_size_bytes)
                    .unwrap_or_else(|| {
                        // Book-keeping drift: log it and saturate at zero rather than panic.
                        re_log::debug!(
                            entity_path = %dropped.entity_path,
                            current = metadata_registry.heap_size_bytes,
                            removed = metadata_dropped_size_bytes,
                            "book keeping underflowed"
                        );
                        u64::MIN
                    });
                num_bytes_to_drop -= metadata_dropped_size_bytes as f64;

                diffs.push(dropped);
            }

            // Time budget and byte target are only checked at batch boundaries, so a single
            // batch can overrun the budget by up to one batch's worth of work.
            if now.elapsed() >= options.time_budget || num_bytes_to_drop <= 0.0 {
                break;
            }

            batch.clear();
            batch_is_protected = false;
        }

        // Handle leftovers.
        //
        // NOTE(review): if the loop above broke on the time budget, `batch` was not cleared and
        // this re-submits already-processed rows; `drop_batch` finds nothing left to drop for
        // them, so this is redundant work rather than a correctness issue — confirm.
        {
            let dropped = Self::drop_batch(
                options,
                tables,
                &mut num_bytes_to_drop,
                &batch,
                batch_is_protected,
            );

            // Only decrement the metadata size trackers if we're actually certain that we'll drop
            // that RowId in the end.
            for dropped in dropped {
                let metadata_dropped_size_bytes = dropped.row_id.total_size_bytes()
                    + dropped.timepoint().total_size_bytes()
                    + dropped.entity_path.hash().total_size_bytes();
                metadata_registry.heap_size_bytes = metadata_registry
                    .heap_size_bytes
                    .checked_sub(metadata_dropped_size_bytes)
                    .unwrap_or_else(|| {
                        re_log::debug!(
                            entity_path = %dropped.entity_path,
                            current = metadata_registry.heap_size_bytes,
                            removed = metadata_dropped_size_bytes,
                            "book keeping underflowed"
                        );
                        u64::MIN
                    });
                num_bytes_to_drop -= metadata_dropped_size_bytes as f64;

                diffs.push(dropped);
            }
        }

        // Purge the removed rows from the metadata_registry.
        // This is safe because the entire GC process is driven by RowId-order.
        for diff in &diffs {
            metadata_registry.remove(&diff.row_id);
        }

        diffs
    }

    /// Drops one batch of rows from `tables`, decrementing `num_bytes_to_drop` as it goes.
    ///
    /// Returns the deletion diffs for everything that was actually dropped.
    #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
    fn drop_batch(
        options: &GarbageCollectionOptions,
        tables: &mut BTreeMap<(EntityPathHash, Timeline), IndexedTable>,
        num_bytes_to_drop: &mut f64,
        batch: &[(TimePoint, (EntityPathHash, RowId))],
        batch_is_protected: bool,
    ) -> Vec<StoreDiff> {
        let &GarbageCollectionOptions {
            enable_batching, ..
        } = options;

        let mut diffs = Vec::new();

        // The algorithm is straightforward:
        // 1. If the batch isn't protected, find and drop all buckets that are guaranteed to
        //    contain only rows older than the ones in the batch.
        // 2. Check how many bytes were dropped; continue if we haven't met our objective.
        // 3. Fallback to deletion of individual rows.
        // 4. Check how many bytes were dropped; continue if we haven't met our objective.

        // NOTE: The batch is already sorted by definition since it's extracted from the registry's btreemap.
        let max_row_id = batch.last().map(|(_, (_, row_id))| *row_id);

        if enable_batching && max_row_id.is_some() && !batch_is_protected {
            // NOTE: unwrap cannot fail but just a precaution in case this code moves around…
            let max_row_id = max_row_id.unwrap_or(RowId::ZERO);

            // Diffs for the same `RowId` coming from different timelines get merged here
            // before being flushed into `diffs`.
            let mut batch_removed: HashMap<RowId, StoreDiff> = HashMap::default();
            let mut cur_entity_path_hash = None;

            // NOTE: We _must_  go through all tables no matter what, since the batch might contain
            // any number of distinct entities.
            for ((entity_path_hash, _), table) in &mut *tables {
                let (removed, num_bytes_removed) = table.try_drop_bucket(max_row_id);

                *num_bytes_to_drop -= num_bytes_removed as f64;

                // Flush the merge buffer whenever we move on to a different entity, since
                // `RowId`s cannot be shared across entities.
                if cur_entity_path_hash != Some(*entity_path_hash) {
                    diffs.extend(batch_removed.drain().map(|(_, diff)| diff));

                    cur_entity_path_hash = Some(*entity_path_hash);
                }

                for mut removed in removed {
                    batch_removed
                        .entry(removed.row_id)
                        .and_modify(|diff| {
                            diff.times.extend(std::mem::take(&mut removed.times));
                        })
                        .or_insert(removed);
                }
            }

            diffs.extend(batch_removed.drain().map(|(_, diff)| diff));
        }

        if *num_bytes_to_drop <= 0.0 {
            return diffs;
        }

        // Fallback: row-by-row deletion (step 3 above).
        for (timepoint, (entity_path_hash, row_id)) in batch {
            let mut diff: Option<StoreDiff> = None;

            // find all tables that could possibly contain this `RowId`
            for (&timeline, &time) in timepoint {
                if let Some(table) = tables.get_mut(&(*entity_path_hash, timeline)) {
                    let (removed, num_bytes_removed) = table.try_drop_row(*row_id, time);
                    // Merge per-timeline removals for the same row into a single diff.
                    if let Some(inner) = diff.as_mut() {
                        if let Some(removed) = removed {
                            inner.times.extend(removed.times);
                        }
                    } else {
                        diff = removed;
                    }
                    *num_bytes_to_drop -= num_bytes_removed as f64;
                }
            }

            diffs.extend(diff);

            if *num_bytes_to_drop <= 0.0 {
                break;
            }
        }

        diffs
    }

    /// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s.
    //
    // TODO(jleibs): More complex functionality might required expanding this to also
    // *ignore* specific entities, components, timelines, etc. for this protection.
    //
    // TODO(jleibs): `RowId`s should never overlap between entities. Creating a single large
    // HashSet might actually be sub-optimal here. Consider switching to a map of
    // `EntityPath` -> `HashSet<RowId>`.
    // Update: this is true-er than ever before now that RowIds are truly unique!
    fn find_all_protected_rows(
        &mut self,
        target_count: usize,
        dont_protect_components: &HashSet<ComponentName>,
        dont_protect_timelines: &HashSet<Timeline>,
    ) -> HashSet<RowId> {
        re_tracing::profile_function!();

        if target_count == 0 {
            return Default::default();
        }

        // We need to sort to be able to determine latest-at.
        self.sort_indices_if_needed();

        let mut protected_rows: HashSet<RowId> = Default::default();

        // Find all protected rows in regular indexed tables
        for ((_, timeline), table) in &self.tables {
            if dont_protect_timelines.contains(timeline) {
                continue;
            }
            // For each component, track how many more rows we still need to protect.
            let mut components_to_find: HashMap<ComponentName, usize> = table
                .all_components
                .iter()
                .filter(|c| !dont_protect_components.contains(*c))
                .map(|c| (*c, target_count))
                .collect();

            // Walk buckets newest-first so the first hits are the latest values.
            for bucket in table.buckets.values().rev() {
                for (component, count) in &mut components_to_find {
                    if *count == 0 {
                        continue;
                    }
                    let inner = bucket.inner.read();
                    // TODO(jleibs): If the entire column for a component is empty, we should
                    // make sure the column is dropped so we don't have to iterate over a
                    // bunch of Nones.
                    if let Some(column) = inner.columns.get(component) {
                        // Scan rows newest-first, keeping only rows that actually hold a cell
                        // for this component.
                        for row in column
                            .iter()
                            .enumerate()
                            .rev()
                            .filter_map(|(row_index, cell)| {
                                cell.as_ref().and_then(|_| inner.col_row_id.get(row_index))
                            })
                            .take(*count)
                        {
                            *count -= 1;
                            protected_rows.insert(*row);
                        }
                    }
                }
            }
        }

        protected_rows
    }

    /// Remove any tables which contain only components which are empty.
    // TODO(jleibs): We could optimize this further by also erasing empty columns.
    fn purge_empty_tables(&mut self) -> impl Iterator<Item = StoreDiff> {
        re_tracing::profile_function!();

        // Keyed by `RowId` so diffs for a row spanning several timelines are merged.
        let mut diffs: BTreeMap<RowId, StoreDiff> = BTreeMap::default();

        self.tables.retain(|_, table| {
            // If any bucket has a non-empty component in any column, we keep it…
            for bucket in table.buckets.values() {
                let inner = bucket.inner.read();
                for column in inner.columns.values() {
                    if column
                        .iter()
                        .any(|cell| cell.as_ref().map_or(false, |cell| cell.num_instances() > 0))
                    {
                        return true;
                    }
                }
            }

            // …otherwise we can drop it.

            let entity_path = table.entity_path.clone();

            for bucket in table.buckets.values() {
                let mut inner = bucket.inner.write();

                for i in 0..inner.col_row_id.len() {
                    let row_id = inner.col_row_id[i];
                    let time = inner.col_time[i];

                    let diff = diffs
                        .entry(row_id)
                        .or_insert_with(|| StoreDiff::deletion(row_id, entity_path.clone()));

                    diff.times
                        .push((bucket.timeline, TimeInt::new_temporal(time)));

                    // Move the (empty) cells out of the column and into the diff.
                    for column in &mut inner.columns.values_mut() {
                        let cell = column[i].take();
                        if let Some(cell) = cell {
                            diff.insert(cell);
                        }
                    }
                }
            }

            false
        });

        diffs.into_values()
    }
}
553
554impl IndexedTable {
555    /// Try to drop an entire bucket at once if it doesn't contain any `RowId` greater than `max_row_id`.
556    fn try_drop_bucket(&mut self, max_row_id: RowId) -> (Vec<StoreDiff>, u64) {
557        re_tracing::profile_function!();
558
559        let entity_path = self.entity_path.clone();
560        let timeline = self.timeline;
561
562        let mut diffs: Vec<StoreDiff> = Vec::new();
563        let mut dropped_num_bytes = 0u64;
564        let mut dropped_num_rows = 0u64;
565
566        let mut dropped_bucket_times = HashSet::default();
567
568        // TODO(cmc): scaling linearly with the number of buckets could be improved, although this
569        // is quite fast in practice because of the early check.
570        for (bucket_time, bucket) in &self.buckets {
571            let inner = &mut *bucket.inner.write();
572
573            if inner.col_time.is_empty() || max_row_id < inner.max_row_id {
574                continue;
575            }
576
577            let IndexedBucketInner {
578                mut col_time,
579                mut col_row_id,
580                mut columns,
581                size_bytes,
582                ..
583            } = std::mem::take(inner);
584
585            dropped_bucket_times.insert(*bucket_time);
586
587            while let Some(row_id) = col_row_id.pop_front() {
588                let mut diff = StoreDiff::deletion(row_id, entity_path.clone());
589
590                if let Some(time) = col_time.pop_front() {
591                    diff.times.push((timeline, TimeInt::new_temporal(time)));
592                }
593
594                for (component_name, column) in &mut columns {
595                    if let Some(cell) = column.pop_front().flatten() {
596                        diff.cells.insert(*component_name, cell);
597                    }
598                }
599
600                diffs.push(diff);
601            }
602
603            dropped_num_bytes += size_bytes;
604            dropped_num_rows += col_time.len() as u64;
605        }
606
607        self.buckets
608            .retain(|bucket_time, _| !dropped_bucket_times.contains(bucket_time));
609
610        self.uphold_indexing_invariants();
611
612        self.buckets_num_rows -= dropped_num_rows;
613        self.buckets_size_bytes -= dropped_num_bytes;
614
615        (diffs, dropped_num_bytes)
616    }
617
618    /// Tries to drop the given `row_id` from the table, which is expected to be found at the
619    /// specified `time`.
620    ///
621    /// Returns how many bytes were actually dropped, or zero if the row wasn't found.
622    fn try_drop_row(&mut self, row_id: RowId, time: TimeInt) -> (Option<StoreDiff>, u64) {
623        re_tracing::profile_function!();
624
625        let entity_path = self.entity_path.clone();
626        let timeline = self.timeline;
627
628        let table_has_more_than_one_bucket = self.buckets.len() > 1;
629
630        let (bucket_key, bucket) = self.find_bucket_mut(time);
631        let bucket_num_bytes = bucket.total_size_bytes();
632
633        let (diff, mut dropped_num_bytes) = {
634            let inner = &mut *bucket.inner.write();
635            inner.try_drop_row(row_id, timeline, &entity_path, time)
636        };
637
638        // NOTE: We always need to keep at least one bucket alive, otherwise we have
639        // nowhere to write to.
640        if table_has_more_than_one_bucket && bucket.num_rows() == 0 {
641            // NOTE: We're dropping the bucket itself in this case, rather than just its
642            // contents.
643            debug_assert!(
644                dropped_num_bytes <= bucket_num_bytes,
645                "Bucket contained more bytes than it thought"
646            );
647            dropped_num_bytes = bucket_num_bytes;
648            self.buckets.remove(&bucket_key);
649
650            self.uphold_indexing_invariants();
651        }
652
653        self.buckets_size_bytes -= dropped_num_bytes;
654        self.buckets_num_rows -= (dropped_num_bytes > 0) as u64;
655
656        (diff, dropped_num_bytes)
657    }
658}
659
impl IndexedBucketInner {
    /// Tries to drop the given `row_id` from the table, which is expected to be found at the
    /// specified `time`.
    ///
    /// Returns how many bytes were actually dropped, or zero if the row wasn't found.
    fn try_drop_row(
        &mut self,
        row_id: RowId,
        timeline: Timeline,
        entity_path: &EntityPath,
        time: TimeInt,
    ) -> (Option<StoreDiff>, u64) {
        // Sorting is required so that `partition_point` below can binary-search by time.
        self.sort();

        let Self {
            is_sorted,
            time_range,
            col_time,
            col_insert_id,
            col_row_id,
            max_row_id,
            columns,
            size_bytes,
        } = self;

        let mut diff: Option<StoreDiff> = None;
        let mut dropped_num_bytes = 0u64;

        // Jump to the first row at `time`, then scan forward through the rows sharing that
        // exact timestamp until we find the matching `RowId`.
        let mut row_index = col_time.partition_point(|&time2| time2 < time.as_i64());
        while col_time.get(row_index) == Some(&time.as_i64()) {
            if col_row_id[row_index] != row_id {
                row_index += 1;
                continue;
            }

            // Update the time_range min/max:
            if col_time.len() == 1 {
                // We removed the last row
                *time_range = ResolvedTimeRange::EMPTY;
            } else {
                // NOTE(review): `swap_remove` below comes from the project-local
                // `VecDequeRemovalExt`; this flag assumes removal only preserves sort order
                // when the first or last element is removed — confirm against that trait.
                *is_sorted = row_index == 0 || row_index.saturating_add(1) == col_row_id.len();

                // We have at least two rows, so we can safely [index] here:
                if row_index == 0 {
                    // We removed the first row, so the second row holds the new min
                    time_range.set_min(col_time[1]);
                }
                if row_index + 1 == col_time.len() {
                    // We removed the last row, so the penultimate row holds the new max
                    time_range.set_max(col_time[row_index - 1]);
                }
            }

            // col_row_id
            let Some(removed_row_id) = col_row_id.swap_remove(row_index) else {
                continue;
            };
            debug_assert_eq!(row_id, removed_row_id);
            dropped_num_bytes += removed_row_id.total_size_bytes();

            // col_time
            if let Some(row_time) = col_time.swap_remove(row_index) {
                dropped_num_bytes += row_time.total_size_bytes();
            }

            // col_insert_id (if present)
            if !col_insert_id.is_empty() {
                if let Some(insert_id) = col_insert_id.swap_remove(row_index) {
                    dropped_num_bytes += insert_id.total_size_bytes();
                }
            }

            // each data column
            for column in columns.values_mut() {
                let cell = column.0.swap_remove(row_index).flatten();

                // TODO(#1809): once datatype deduplication is in, we should really not count
                // autogenerated keys as part of the memory stats (same on write path).
                dropped_num_bytes += cell.total_size_bytes();

                if let Some(cell) = cell {
                    // Lazily create the deletion diff on the first non-empty cell.
                    if let Some(inner) = diff.as_mut() {
                        inner.insert(cell);
                    } else {
                        let mut d = StoreDiff::deletion(removed_row_id, entity_path.clone());
                        d.at_timestamp(timeline, time);
                        d.insert(cell);
                        diff = Some(d);
                    }
                }
            }

            if *max_row_id == removed_row_id {
                // NOTE: We _have_ to fullscan here: the bucket is sorted by `(Time, RowId)`, there
                // could very well be a greater lurking in a lesser entry.
                *max_row_id = col_row_id.iter().max().copied().unwrap_or(RowId::ZERO);
            }

            // NOTE: A single `RowId` cannot possibly have more than one datapoint for
            // a single timeline.
            break;
        }

        *size_bytes -= dropped_num_bytes;

        (diff, dropped_num_bytes)
    }
}
768
769// ---
770
771impl StoreDiff {
772    fn insert(&mut self, cell: DataCell) {
773        self.cells.insert(cell.component_name(), cell);
774    }
775}