// re_chunk_store/store.rs
1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::sync::atomic::AtomicU64;
4
5use ahash::{HashMap, HashSet};
6use arrow::datatypes::DataType as ArrowDataType;
7use itertools::Itertools as _;
8use nohash_hasher::IntMap;
9use parking_lot::RwLock;
10use re_log::debug_assert;
11
12use re_chunk::{Chunk, ChunkId, ComponentIdentifier, RowId, TimelineName};
13use re_log_types::{EntityPath, StoreId, TimeInt, TimeType};
14use re_types_core::{ComponentDescriptor, ComponentType};
15
16use crate::{ChunkDirectLineage, ChunkStoreChunkStats, ChunkStoreError, ChunkStoreResult};
17
18// ---
19
/// Configuration & tuning knobs for a [`ChunkStore`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkStoreConfig {
    /// If `true` (the default), the store will emit events when its contents are modified in
    /// any way (insertion, GC), that can be subscribed to.
    ///
    /// Leaving this disabled can lead to major performance improvements on the ingestion path
    /// in some workloads, provided that the subscribers aren't needed (e.g. headless mode).
    pub enable_changelog: bool,

    /// What is the threshold, in bytes, after which a [`Chunk`] cannot be compacted any further?
    ///
    /// This is a multi-dimensional trade-off:
    /// * Larger chunks lead to less fixed overhead introduced by metadata, indices and such. Good.
    /// * Larger chunks lead to slower query execution on some unhappy paths. Bad.
    /// * Larger chunks lead to slower and slower compaction as chunks grow larger. Bad.
    /// * Larger chunks lead to coarser garbage collection. Good or bad depending on use case.
    /// * Larger chunks lead to less precision in e.g. the time panel. Bad.
    ///
    /// The default is `12 * 8 * 4096` bytes (~384KiB) — see the comments on [`Self::DEFAULT`]
    /// for the rationale.
    ///
    /// See also [`Self::COMPACTION_DISABLED`].
    pub chunk_max_bytes: u64,

    /// What is the threshold, in rows, after which a [`Chunk`] cannot be compacted any further?
    ///
    /// This specifically applies to time-sorted chunks.
    /// See also [`ChunkStoreConfig::chunk_max_rows_if_unsorted`].
    ///
    /// This is a multi-dimensional trade-off:
    /// * Larger chunks lead to less fixed overhead introduced by metadata, indices and such. Good.
    /// * Larger chunks lead to slower query execution on some unhappy paths. Bad.
    /// * Larger chunks lead to slower and slower compaction as chunks grow larger. Bad.
    /// * Larger chunks lead to coarser garbage collection. Good or bad depending on use case.
    /// * Larger chunks lead to less precision in e.g. the time panel. Bad.
    ///
    /// Empirical testing shows that the space overhead gains rapidly diminish beyond ~1000 rows;
    /// the default for this threshold is 4096 rows (see [`Self::DEFAULT`]).
    pub chunk_max_rows: u64,

    /// What is the threshold, in rows, after which a [`Chunk`] cannot be compacted any further?
    ///
    /// This specifically applies to _non_ time-sorted chunks.
    /// See also [`ChunkStoreConfig::chunk_max_rows`].
    ///
    /// This is a multi-dimensional trade-off:
    /// * Larger chunks lead to less fixed overhead introduced by metadata, indices and such. Good.
    /// * Larger chunks lead to slower query execution on some unhappy paths. Bad.
    /// * Larger chunks lead to slower and slower compaction as chunks grow larger. Bad.
    /// * Larger chunks lead to coarser garbage collection. Good or bad depending on use case.
    /// * Larger chunks lead to less precision in e.g. the time panel. Bad.
    ///
    /// Empirical testing shows that the space overhead gains rapidly diminish beyond ~1000 rows,
    /// hence the default of 1024 rows for this threshold (see [`Self::DEFAULT`]).
    pub chunk_max_rows_if_unsorted: u64,
    //
    // TODO(cmc): It could make sense to have time-range-based thresholds in here, since the time
    // range covered by a chunk has direct effects on A) the complexity of backward walks and
    // B) in downstream subscribers (e.g. the precision of the time panel).
    //
    // In practice this is highly recording-dependent, and would require either to make it
    // user-configurable per-recording, or use heuristics to compute it on the fly.
    //
    // The added complexity just isn't worth it at the moment.
    // Maybe at some point.
}
90
91impl Default for ChunkStoreConfig {
92    #[inline]
93    fn default() -> Self {
94        Self::DEFAULT
95    }
96}
97
impl re_byte_size::SizeBytes for ChunkStoreConfig {
    fn heap_size_bytes(&self) -> u64 {
        // The config is a flat struct of a `bool` and three `u64`s: nothing lives on the heap.
        0
    }

    #[inline]
    fn is_pod() -> bool {
        // Plain-old-data: every field is `Copy` with no indirection.
        true
    }
}
108
impl ChunkStoreConfig {
    /// Default configuration, applicable to most use cases, according to empirical testing.
    pub const DEFAULT: Self = Self {
        enable_changelog: true,

        // This gives us 96 bytes per row (assuming a default limit of 4096 rows), which is enough to
        // fit a couple scalar columns, a RowId column, a handful of timeline columns, all the
        // necessary offsets, etc.
        //
        // A few megabytes turned out to be way too costly to concatenate in real-time in the
        // Viewer (see <https://github.com/rerun-io/rerun/issues/7222>).
        chunk_max_bytes: 12 * 8 * 4096,

        // Empirical testing shows that 4096 is the threshold after which we really start to get
        // diminishing returns space and compute wise.
        chunk_max_rows: 4096,

        chunk_max_rows_if_unsorted: 1024,
    };

    /// [`Self::DEFAULT`], but with compaction entirely disabled.
    pub const COMPACTION_DISABLED: Self = Self {
        chunk_max_bytes: 0,
        chunk_max_rows: 0,
        chunk_max_rows_if_unsorted: 0,
        ..Self::DEFAULT
    };

    /// [`Self::DEFAULT`], but with changelog disabled.
    pub const CHANGELOG_DISABLED: Self = Self {
        enable_changelog: false,
        ..Self::DEFAULT
    };

    /// All features disabled.
    pub const ALL_DISABLED: Self = Self {
        enable_changelog: false,
        chunk_max_bytes: 0,
        chunk_max_rows: 0,
        chunk_max_rows_if_unsorted: 0,
    };

    /// Environment variable to configure [`Self::enable_changelog`].
    pub const ENV_STORE_ENABLE_CHANGELOG: &'static str = "RERUN_STORE_ENABLE_CHANGELOG";

    /// Environment variable to configure [`Self::chunk_max_bytes`].
    pub const ENV_CHUNK_MAX_BYTES: &'static str = "RERUN_CHUNK_MAX_BYTES";

    /// Environment variable to configure [`Self::chunk_max_rows`].
    pub const ENV_CHUNK_MAX_ROWS: &'static str = "RERUN_CHUNK_MAX_ROWS";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    //
    // NOTE: Shared with the same env-var on the batcher side, for consistency.
    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    /// Creates a new `ChunkStoreConfig` using the default values, optionally overridden
    /// through the environment.
    ///
    /// See [`Self::apply_env`].
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Self::apply_env`].
    #[inline]
    pub fn from_env() -> ChunkStoreResult<Self> {
        Self::default().apply_env()
    }

    /// Returns a copy of `self`, overriding existing fields with values from the environment if
    /// they are present.
    ///
    /// Variables that are unset leave the corresponding field untouched.
    ///
    /// See [`Self::ENV_STORE_ENABLE_CHANGELOG`], [`Self::ENV_CHUNK_MAX_BYTES`], [`Self::ENV_CHUNK_MAX_ROWS`]
    /// and [`Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED`].
    ///
    /// # Errors
    ///
    /// Returns a [`ChunkStoreError::ParseConfig`] if any of these variables is set but its value
    /// cannot be parsed into the target field's type.
    pub fn apply_env(&self) -> ChunkStoreResult<Self> {
        let mut new = self.clone();

        if let Ok(s) = std::env::var(Self::ENV_STORE_ENABLE_CHANGELOG) {
            new.enable_changelog = s.parse().map_err(|err| ChunkStoreError::ParseConfig {
                name: Self::ENV_STORE_ENABLE_CHANGELOG,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_BYTES) {
            new.chunk_max_bytes = s.parse().map_err(|err| ChunkStoreError::ParseConfig {
                name: Self::ENV_CHUNK_MAX_BYTES,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS) {
            new.chunk_max_rows = s.parse().map_err(|err| ChunkStoreError::ParseConfig {
                name: Self::ENV_CHUNK_MAX_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkStoreError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}
218
#[test]
fn chunk_store_config() {
    // Detect breaking changes in our environment variables.

    let overrides = [
        ("RERUN_STORE_ENABLE_CHANGELOG", "false"),
        ("RERUN_CHUNK_MAX_BYTES", "42"),
        ("RERUN_CHUNK_MAX_ROWS", "666"),
        ("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "999"),
    ];

    // SAFETY: it's a test
    #[expect(unsafe_code)]
    unsafe {
        for (name, value) in overrides {
            std::env::set_var(name, value);
        }
    };

    let config = ChunkStoreConfig::from_env().unwrap();

    let expected = ChunkStoreConfig {
        enable_changelog: false,
        chunk_max_bytes: 42,
        chunk_max_rows: 666,
        chunk_max_rows_if_unsorted: 999,
    };

    assert_eq!(expected, config);
}
243
244// ---
245
/// An ordered set of [`ChunkId`]s.
pub type ChunkIdSet = BTreeSet<ChunkId>;
247
/// [`ChunkId`]s double-indexed by their (context-dependent) start and end times, plus bookkeeping
/// to bound backward walks over them.
#[derive(Debug, Default, Clone)]
pub struct ChunkIdSetPerTime {
    /// Keeps track of the longest interval being currently stored in the two maps below.
    ///
    /// This is used to bound the backwards linear walk when looking for overlapping chunks in
    /// latest-at queries.
    ///
    /// This is purely additive: this value is never decremented for any reason, whether it's GC,
    /// chunk splitting, or whatever else.
    ///
    /// See [`ChunkStore::latest_at`] implementation comments for more details.
    pub(crate) max_interval_length: u64,

    /// *Both physical & virtual* [`ChunkId`]s organized by their _most specific_ start time.
    ///
    /// What "most specific" means depends on the context in which the [`ChunkIdSetPerTime`]
    /// was instantiated, e.g.:
    /// * For an `(entity, timeline, component)` index, that would be the first timestamp at which this
    ///   [`Chunk`] contains data for this particular component on this particular timeline (see
    ///   [`Chunk::time_range_per_component`]).
    /// * For an `(entity, timeline)` index, that would be the first timestamp at which this [`Chunk`]
    ///   contains data for any component on this particular timeline (see [`re_chunk::TimeColumn::time_range`]).
    ///
    /// This index includes virtual/offloaded chunks, and therefore is purely additive: garbage collection
    /// will never remove values from this set.
    pub(crate) per_start_time: BTreeMap<TimeInt, ChunkIdSet>,

    /// *Both physical & virtual* [`ChunkId`]s organized by their _most specific_ end time.
    ///
    /// What "most specific" means depends on the context in which the [`ChunkIdSetPerTime`]
    /// was instantiated, e.g.:
    /// * For an `(entity, timeline, component)` index, that would be the last timestamp at which this
    ///   [`Chunk`] contains data for this particular component on this particular timeline (see
    ///   [`Chunk::time_range_per_component`]).
    /// * For an `(entity, timeline)` index, that would be the last timestamp at which this [`Chunk`]
    ///   contains data for any component on this particular timeline (see [`re_chunk::TimeColumn::time_range`]).
    ///
    /// This index includes virtual/offloaded chunks, and therefore is purely additive: garbage collection
    /// will never remove values from this set.
    pub(crate) per_end_time: BTreeMap<TimeInt, ChunkIdSet>,
}
289
290impl re_byte_size::SizeBytes for ChunkIdSetPerTime {
291    fn heap_size_bytes(&self) -> u64 {
292        let Self {
293            max_interval_length,
294            per_start_time,
295            per_end_time,
296        } = self;
297
298        max_interval_length.heap_size_bytes()
299            + per_start_time.heap_size_bytes()
300            + per_end_time.heap_size_bytes()
301    }
302}
303
/// Time-indexed [`ChunkId`]s, per component.
pub type ChunkIdSetPerTimePerComponent = IntMap<ComponentIdentifier, ChunkIdSetPerTime>;

/// [`ChunkIdSetPerTimePerComponent`], further indexed by timeline.
pub type ChunkIdSetPerTimePerComponentPerTimeline =
    IntMap<TimelineName, ChunkIdSetPerTimePerComponent>;

/// [`ChunkIdSetPerTimePerComponentPerTimeline`], further indexed by entity path.
pub type ChunkIdSetPerTimePerComponentPerTimelinePerEntity =
    IntMap<EntityPath, ChunkIdSetPerTimePerComponentPerTimeline>;

/// A single [`ChunkId`] per component (e.g. for static data, where one chunk shadows all others).
pub type ChunkIdPerComponent = IntMap<ComponentIdentifier, ChunkId>;

/// [`ChunkIdPerComponent`], further indexed by entity path.
pub type ChunkIdPerComponentPerEntity = IntMap<EntityPath, ChunkIdPerComponent>;

/// Time-indexed [`ChunkId`]s, per timeline (no component-level index).
pub type ChunkIdSetPerTimePerTimeline = IntMap<TimelineName, ChunkIdSetPerTime>;

/// [`ChunkIdSetPerTimePerTimeline`], further indexed by entity path.
pub type ChunkIdSetPerTimePerTimelinePerEntity = IntMap<EntityPath, ChunkIdSetPerTimePerTimeline>;
319
320// ---
321
/// Metadata attached to a component column.
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
    /// Whether this column represents static data.
    pub is_static: bool,

    /// Whether this column represents a `Clear`-related component.
    ///
    /// `Clear`: [`re_types_core::archetypes::Clear`]
    pub is_tombstone: bool,

    /// Whether this column contains either no data or only contains null and/or empty values (`[]`).
    pub is_semantically_empty: bool,
}

/// Internal state that needs to be maintained in order to compute [`ColumnMetadata`].
#[derive(Debug, Clone)]
pub struct ColumnMetadataState {
    /// Whether this column contains either no data or only contains null and/or empty values (`[]`).
    ///
    /// This is purely additive: once false, it will always be false. Even in case of garbage
    /// collection.
    pub is_semantically_empty: bool,
}
345
346impl re_byte_size::SizeBytes for ColumnMetadataState {
347    fn heap_size_bytes(&self) -> u64 {
348        let Self {
349            is_semantically_empty,
350        } = self;
351
352        is_semantically_empty.heap_size_bytes()
353    }
354}
355
/// Incremented on each edit.
///
/// Two generations compare equal iff the store has seen neither insertions nor GCs in between.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ChunkStoreGeneration {
    /// Monotonically increasing counter of insertions.
    insert_id: u64,

    /// Monotonically increasing counter of garbage collections.
    gc_id: u64,
}
362
/// A ref-counted, inner-mutable handle to a [`ChunkStore`].
///
/// Cheap to clone: cloning only bumps the [`Arc`]'s refcount.
///
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
/// * [`ChunkStoreHandle::read_arc`]
/// * [`ChunkStoreHandle::write_arc`]
#[derive(Clone)]
pub struct ChunkStoreHandle(Arc<parking_lot::RwLock<ChunkStore>>);
372
373impl std::fmt::Display for ChunkStoreHandle {
374    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375        f.write_fmt(format_args!("{}", self.0.read()))
376    }
377}
378
379impl ChunkStoreHandle {
380    #[inline]
381    pub fn new(store: ChunkStore) -> Self {
382        Self(Arc::new(parking_lot::RwLock::new(store)))
383    }
384
385    #[inline]
386    pub fn into_inner(self) -> Arc<parking_lot::RwLock<ChunkStore>> {
387        self.0
388    }
389}
390
impl ChunkStoreHandle {
    /// Acquires a read lock, blocking until available.
    ///
    /// Uses `read_recursive`: per parking_lot's documented semantics, this allows re-entrant
    /// reads from a thread that already holds a read lock, even while a writer is queued.
    #[inline]
    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
        self.0.read_recursive()
    }

    /// Attempts to acquire a (recursive) read lock without blocking; `None` if unavailable.
    #[inline]
    pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, ChunkStore>> {
        self.0.try_read_recursive()
    }

    /// Acquires the write lock, blocking until available.
    #[inline]
    pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
        self.0.write()
    }

    /// Attempts to acquire the write lock without blocking; `None` if unavailable.
    #[inline]
    pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, ChunkStore>> {
        self.0.try_write()
    }

    /// Like [`Self::read`], but the returned guard is `'static` (it keeps the `Arc` alive).
    #[inline]
    pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
        parking_lot::RwLock::read_arc_recursive(&self.0)
    }

    /// Like [`Self::try_read`], but the returned guard is `'static` (it keeps the `Arc` alive).
    #[inline]
    pub fn try_read_arc(
        &self,
    ) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore>> {
        parking_lot::RwLock::try_read_recursive_arc(&self.0)
    }

    /// Like [`Self::write`], but the returned guard is `'static` (it keeps the `Arc` alive).
    #[inline]
    pub fn write_arc(
        &self,
    ) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
        parking_lot::RwLock::write_arc(&self.0)
    }

    /// Like [`Self::try_write`], but the returned guard is `'static` (it keeps the `Arc` alive).
    #[inline]
    pub fn try_write_arc(
        &self,
    ) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore>> {
        parking_lot::RwLock::try_write_arc(&self.0)
    }
}
438
/// This keeps track of all missing virtual [`ChunkId`]s and all
/// used physical [`ChunkId`]s.
#[derive(Clone, Debug, Default)]
pub struct QueriedChunkIdTracker {
    /// Physical chunks that were used to answer queries.
    pub used_physical: HashSet<ChunkId>,

    /// Missing virtual chunks.
    ///
    /// Chunks are considered missing when they are required to compute the results of a query, but cannot be
    /// found in local memory. This set is automatically populated anytime that happens.
    ///
    /// Note, these are NOT necessarily _root_ chunks.
    /// Use [`ChunkStore::find_root_chunks`] to get those.
    //
    // TODO(cmc): Once lineage tracking is in place, make sure that this only reports missing
    // chunks using their root-level IDs, so downstream consumers don't have to redundantly build
    // their own tracking. And document it so.
    pub missing_virtual: HashSet<ChunkId>,
}
459
460impl re_byte_size::SizeBytes for QueriedChunkIdTracker {
461    fn heap_size_bytes(&self) -> u64 {
462        let Self {
463            used_physical,
464            missing_virtual,
465        } = self;
466
467        used_physical.heap_size_bytes() + missing_virtual.heap_size_bytes()
468    }
469}
470
/// A complete chunk store: covers all timelines, all entities, everything.
///
/// The chunk store _always_ works at the chunk level, whether it is for write & read queries or
/// garbage collection. It is completely oblivious to individual rows.
///
/// Use the `Display` implementation for a detailed view of the internals.
#[derive(Debug)]
pub struct ChunkStore {
    pub(crate) id: StoreId,

    /// The configuration of the chunk store (e.g. compaction settings).
    pub(crate) config: ChunkStoreConfig,

    /// Keeps track of the _latest_ datatype for each time column.
    ///
    /// This index is purely additive: it is never affected by garbage collection in any way.
    ///
    /// See also [`Self::time_column_type`].
    pub(crate) time_type_registry: IntMap<TimelineName, TimeType>,

    // TODO(grtlr): Can we slim this map down by getting rid of `ColumnIdentifier`-level here?
    pub(crate) per_column_metadata: IntMap<
        EntityPath,
        IntMap<ComponentIdentifier, (ComponentDescriptor, ColumnMetadataState, ArrowDataType)>,
    >,

    /// All the *physical* chunks currently loaded in the store, mapped by their respective IDs.
    ///
    /// Physical chunks are chunks that are actively loaded into the store's volatile memory.
    ///
    /// During garbage collection, physical chunks are offloaded from memory and become virtual
    /// chunks instead. At the same time, their IDs are removed from this set, which is how we
    /// distinguish virtual from physical chunks.
    ///
    /// Virtual chunks are still indexed by the store, but querying for them will not yield any data,
    /// just hints that some data is missing and must first be re-inserted by the caller.
    pub(crate) physical_chunks_per_chunk_id: BTreeMap<ChunkId, Arc<Chunk>>,

    /// All *physical* [`ChunkId`]s currently in the store, indexed by the smallest [`RowId`] in
    /// each of them.
    ///
    /// This is effectively all chunks in global data order. Used for garbage collection.
    ///
    /// During garbage collection, physical chunks are offloaded from memory and become virtual
    /// chunks instead. At the same time, their IDs are removed from this set, which is how we
    /// distinguish virtual from physical chunks.
    pub(crate) physical_chunk_ids_per_min_row_id: BTreeMap<RowId, ChunkId>,

    /// Keeps track of where each individual chunk, both virtual & physical, came from.
    ///
    /// Due to compaction, a chunk's lineage often forms a tree rather than a straight line.
    /// The lineage tree always ends in one of two ways:
    /// * A reference to volatile memory, from which the chunk came from, and that cannot ever be
    ///   reached again.
    /// * A reference to an RRD manifest, from which the chunk was virtually loaded from, and where
    ///   it can still be reached, provided that the associated Redap server still exists.
    ///
    /// This is purely additive: never garbage collected.
    pub(crate) chunks_lineage: HashMap<ChunkId, ChunkDirectLineage>,

    /// Anytime a chunk gets split during insertion, this is recorded here.
    ///
    /// The key is the ID of the source chunk, before splitting, which never made it into the store.
    /// The values are the IDs of the resulting split chunks, which were actually inserted.
    ///
    /// Splitting cannot be recursive, and therefore there is never any requirement to traverse
    /// this datastructure recursively.
    ///
    /// So why is this useful? We use this data on the write path in order to detect when a chunk that
    /// was previously inserted, and split into smaller chunks, is being inserted *again*, e.g. because
    /// it had been offloaded due to memory pressure and is now making a comeback.
    /// What might happen in these sorts of scenarios, is that some of the resulting splits were
    /// garbage collected away, but not all of them, and now we end up with tiny overlaps all over
    /// the store which, while they don't impact semantics in any way, are annoying for at least 2 reasons:
    /// * performance of the query engine
    /// * hard to reason about for downstream consumers building secondary datastructures (e.g. video cache)
    ///
    /// `HashMap<OriginalChunkId, SplitChunkIds>`
    pub(crate) dangling_splits: HashMap<ChunkId, Vec<ChunkId>>,

    /// All chunks that were split on-ingestion.
    ///
    /// This is like [`Self::dangling_splits`], but is only ever added to.
    ///
    /// This is only used for sanity checks.
    pub(crate) split_on_ingest: HashSet<ChunkId>,

    /// Anytime a chunk gets compacted with another during insertion, this is recorded here.
    ///
    /// The key can be either one of two things:
    /// * The ID of an already stored physical chunk, that was elected for compaction.
    /// * The ID of the chunk being inserted, before compaction, which never made it into the store.
    ///
    /// The value is the ID of the resulting compacted chunk, which was actually inserted.
    ///
    /// Compaction is a recursive process: you should probably traverse this datastructure *recursively*.
    ///
    /// So why is this useful? We use this data on the write path in order to detect when a chunk that
    /// was previously inserted, and (potentially recursively) compacted with another chunk, is being
    /// inserted *again*, e.g. because it had been offloaded due to memory pressure and is now making a comeback.
    /// When that happens, the data for that chunk would effectively be duplicated across the chunk and
    /// the pre-existing compacted data.
    /// While that doesn't impact semantics in any way, it's still annoying for at least 2 reasons:
    /// * performance of the query engine
    /// * hard to reason about for downstream consumers building secondary datastructures (e.g. video cache)
    ///
    /// This is purely additive: never garbage collected.
    ///
    /// `HashMap<OriginalChunkId, CompactedChunkId>`
    pub(crate) leaky_compactions: HashMap<ChunkId, ChunkId>,

    /// All *physical & virtual* temporal [`ChunkId`]s for all entities on all timelines, further
    /// indexed by [`ComponentIdentifier`].
    ///
    /// This index is purely additive: it is never affected by garbage collection in any way.
    /// This implies that the chunk IDs present in this set might be either physical/loaded or
    /// virtual/offloaded.
    /// When leveraging this index, make sure you understand whether you expect loaded chunks,
    /// unloaded chunks, or both. Leverage [`Self::physical_chunks_per_chunk_id`] to know which is which.
    ///
    /// See also:
    /// * [`Self::temporal_chunk_ids_per_entity`].
    /// * [`Self::static_chunk_ids_per_entity`].
    pub(crate) temporal_chunk_ids_per_entity_per_component:
        ChunkIdSetPerTimePerComponentPerTimelinePerEntity,

    /// All *physical & virtual* temporal [`ChunkId`]s for all entities on all timelines, without the
    /// [`ComponentType`] index.
    ///
    /// This index is purely additive: it is never affected by garbage collection in any way.
    /// This implies that the chunk IDs present in this set might be either physical/loaded or
    /// virtual/offloaded.
    /// When leveraging this index, make sure you understand whether you expect loaded chunks,
    /// unloaded chunks, or both. Leverage [`Self::physical_chunks_per_chunk_id`] to know which is which.
    ///
    /// See also:
    /// * [`Self::temporal_chunk_ids_per_entity_per_component`].
    /// * [`Self::static_chunk_ids_per_entity`].
    pub(crate) temporal_chunk_ids_per_entity: ChunkIdSetPerTimePerTimelinePerEntity,

    /// Accumulated size statistics for all *physical* temporal [`Chunk`]s currently present in the store.
    ///
    /// This is too costly to be computed from scratch every frame, and therefore materialized here.
    ///
    /// *This exclusively covers physical/loaded chunks*. During GC, these statistics are decremented
    /// as you'd expect.
    pub(crate) temporal_physical_chunks_stats: ChunkStoreChunkStats,

    /// Static data. Never garbage collected.
    ///
    /// Static data unconditionally shadows temporal data at query time.
    ///
    /// Existing temporal data will not be removed. Events won't be fired.
    pub(crate) static_chunk_ids_per_entity: ChunkIdPerComponentPerEntity,

    /// Accumulated size statistics for all *physical* static [`Chunk`]s currently present in the store.
    ///
    /// This is too costly to be computed from scratch every frame, and is therefore materialized here.
    pub(crate) static_chunks_stats: ChunkStoreChunkStats,

    /// Calling [`ChunkStore::take_tracked_chunk_ids`] will atomically return the contents of this
    /// struct as well as clearing it.
    pub(crate) queried_chunk_id_tracker: RwLock<QueriedChunkIdTracker>,

    /// Monotonically increasing ID for insertions.
    pub(crate) insert_id: u64,

    /// Monotonically increasing ID for GCs.
    pub(crate) gc_id: u64,

    /// Monotonically increasing ID for store events.
    pub(crate) event_id: AtomicU64,
}
644
impl Drop for ChunkStore {
    fn drop(&mut self) {
        // First and foremost, notify per-store subscribers that an entire store was just dropped,
        // and therefore they can just drop entire chunks of their own state.
        Self::drop_per_store_subscribers(&self.id());

        if self.config.enable_changelog {
            // Then, if the changelog is enabled, trigger a full GC: this will notify all remaining
            // subscribers of all the chunks that were dropped by dropping the store itself.
            //
            // The GC's return value is deliberately discarded: the store is going away regardless.
            _ = self.gc(&crate::GarbageCollectionOptions::gc_everything());
        }
    }
}
658
// Manual `Clone`: a derive is not possible because some fields (the `RwLock`-guarded query
// tracker, the `AtomicU64` event counter) are not `Clone`; those are reset instead of copied.
impl Clone for ChunkStore {
    #[inline]
    fn clone(&self) -> Self {
        // Profiled: this deep-clones every index in the store.
        re_tracing::profile_function!();
        Self {
            id: self.id.clone(),
            config: self.config.clone(),
            time_type_registry: self.time_type_registry.clone(),
            per_column_metadata: self.per_column_metadata.clone(),
            physical_chunks_per_chunk_id: self.physical_chunks_per_chunk_id.clone(),
            chunks_lineage: self.chunks_lineage.clone(),
            dangling_splits: self.dangling_splits.clone(),
            split_on_ingest: self.split_on_ingest.clone(),
            leaky_compactions: self.leaky_compactions.clone(),
            physical_chunk_ids_per_min_row_id: self.physical_chunk_ids_per_min_row_id.clone(),
            temporal_chunk_ids_per_entity_per_component: self
                .temporal_chunk_ids_per_entity_per_component
                .clone(),
            temporal_chunk_ids_per_entity: self.temporal_chunk_ids_per_entity.clone(),
            temporal_physical_chunks_stats: self.temporal_physical_chunks_stats,
            static_chunk_ids_per_entity: self.static_chunk_ids_per_entity.clone(),
            static_chunks_stats: self.static_chunks_stats,
            // The fields below restart from scratch in the clone.
            // NOTE(review): confirm that resetting the insert/gc/event counters (rather than
            // copying their values) is intentional for all callers of `clone`.
            queried_chunk_id_tracker: Default::default(),
            insert_id: Default::default(),
            gc_id: Default::default(),
            event_id: Default::default(),
        }
    }
}
688
impl std::fmt::Display for ChunkStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: a new field must be explicitly rendered or ignored here.
        let Self {
            id,
            config,
            time_type_registry: _,
            per_column_metadata: _,
            physical_chunks_per_chunk_id: chunks_per_chunk_id,
            physical_chunk_ids_per_min_row_id: chunk_ids_per_min_row_id,
            chunks_lineage,
            dangling_splits: _,
            split_on_ingest: _,
            leaky_compactions: _,
            temporal_chunk_ids_per_entity_per_component: _,
            temporal_chunk_ids_per_entity: _,
            temporal_physical_chunks_stats,
            static_chunk_ids_per_entity: _,
            static_chunks_stats,
            queried_chunk_id_tracker: _,
            insert_id: _,
            gc_id: _,
            event_id: _,
        } = self;

        f.write_str("ChunkStore {\n")?;

        f.write_str(&indent::indent_all_by(4, format!("id: {id:?}\n")))?;
        f.write_str(&indent::indent_all_by(4, format!("config: {config:?}\n")))?;

        // Combined static + temporal physical stats.
        f.write_str(&indent::indent_all_by(4, "stats: {\n"))?;
        f.write_str(&indent::indent_all_by(
            8,
            format!("{}", *static_chunks_stats + *temporal_physical_chunks_stats),
        ))?;
        f.write_str(&indent::indent_all_by(4, "}\n"))?;

        // Physical (loaded) chunks, in global data order (by smallest RowId).
        f.write_str(&indent::indent_all_by(4, "physical chunks: [\n"))?;
        for chunk_id in chunk_ids_per_min_row_id.values() {
            if let Some(chunk) = chunks_per_chunk_id.get(chunk_id) {
                f.write_str(&indent::indent_all_by(
                    8,
                    format!("{}\n", self.format_lineage(chunk_id)),
                ))?;

                // Honor an explicit format width (`{:width$}`), shrunk by the indentation.
                if let Some(width) = f.width() {
                    let chunk_width = width.saturating_sub(8);
                    f.write_str(&indent::indent_all_by(8, format!("{chunk:chunk_width$}\n")))?;
                } else {
                    f.write_str(&indent::indent_all_by(8, format!("{chunk}\n")))?;
                }
            } else {
                // Indexed by min-RowId but not loaded: should not happen for physical chunks.
                f.write_str(&indent::indent_all_by(8, "<not_found>\n"))?;
            }
        }
        f.write_str(&indent::indent_all_by(4, "]\n"))?;

        // Virtual chunks: known to the lineage index but not physically loaded.
        f.write_str(&indent::indent_all_by(4, "virtual chunks: [\n"))?;
        for chunk_id in chunks_lineage.keys().sorted() {
            if chunks_per_chunk_id.contains_key(chunk_id) {
                continue;
            }

            f.write_str(&indent::indent_all_by(
                8,
                format!("{}\n", self.format_lineage(chunk_id)),
            ))?;
        }
        f.write_str(&indent::indent_all_by(4, "]\n"))?;

        f.write_str("}")?;

        Ok(())
    }
}
763
764// ---
765
766impl ChunkStore {
767    /// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
768    ///
769    /// See also:
770    /// * [`ChunkStore::new`]
771    /// * [`ChunkStore::from_rrd_filepath`]
772    #[inline]
773    pub fn new(id: StoreId, config: ChunkStoreConfig) -> Self {
774        Self {
775            id,
776            config,
777            time_type_registry: Default::default(),
778            per_column_metadata: Default::default(),
779            physical_chunk_ids_per_min_row_id: Default::default(),
780            chunks_lineage: Default::default(),
781            dangling_splits: Default::default(),
782            split_on_ingest: Default::default(),
783            leaky_compactions: Default::default(),
784            physical_chunks_per_chunk_id: Default::default(),
785            temporal_chunk_ids_per_entity_per_component: Default::default(),
786            temporal_chunk_ids_per_entity: Default::default(),
787            temporal_physical_chunks_stats: Default::default(),
788            static_chunk_ids_per_entity: Default::default(),
789            static_chunks_stats: Default::default(),
790            queried_chunk_id_tracker: Default::default(),
791            insert_id: 0,
792            gc_id: 0,
793            event_id: AtomicU64::new(0),
794        }
795    }
796
797    /// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
798    ///
799    /// Pre-wraps the result in a [`ChunkStoreHandle`].
800    ///
801    /// See also:
802    /// * [`ChunkStore::from_rrd_filepath`]
803    #[inline]
804    pub fn new_handle(id: StoreId, config: ChunkStoreConfig) -> ChunkStoreHandle {
805        ChunkStoreHandle::new(Self::new(id, config))
806    }
807
808    #[inline]
809    pub fn id(&self) -> StoreId {
810        self.id.clone()
811    }
812
813    /// Return the current [`ChunkStoreGeneration`]. This can be used to determine whether the
814    /// database has been modified since the last time it was queried.
815    #[inline]
816    pub fn generation(&self) -> ChunkStoreGeneration {
817        ChunkStoreGeneration {
818            insert_id: self.insert_id,
819            gc_id: self.gc_id,
820        }
821    }
822
823    /// See [`ChunkStoreConfig`] for more information about configuration.
824    #[inline]
825    pub fn config(&self) -> &ChunkStoreConfig {
826        &self.config
827    }
828
829    /// Iterate over all *physical* chunks in the store, in ascending [`ChunkId`] order.
830    #[inline]
831    pub fn iter_physical_chunks(&self) -> impl Iterator<Item = &Arc<Chunk>> + '_ {
832        self.physical_chunks_per_chunk_id.values()
833    }
834
835    /// Get a *physical* chunk based on its ID.
836    #[inline]
837    pub fn physical_chunk(&self, physical_chunk_id: &ChunkId) -> Option<&Arc<Chunk>> {
838        self.physical_chunks_per_chunk_id.get(physical_chunk_id)
839    }
840
841    /// Get a *physical* chunk based on its ID and track the chunk as either
842    /// used or missing, to signal that it should be kept or fetched.
843    #[track_caller]
844    pub fn use_physical_chunk_or_report_missing(&self, id: &ChunkId) -> Option<&Arc<Chunk>> {
845        debug_assert!(
846            !self.split_on_ingest.contains(id),
847            "Asked for a physical chunk, but this chunk was split on ingestion and was never physical: {id}"
848        );
849
850        let chunk = self.physical_chunk(id);
851
852        if chunk.is_some() {
853            self.report_used_physical_chunk_id(*id);
854        } else {
855            self.report_missing_virtual_chunk_id(*id);
856        }
857
858        chunk
859    }
860
861    /// Get the number of *physical* chunks in the store.
862    #[inline]
863    pub fn num_physical_chunks(&self) -> usize {
864        self.physical_chunks_per_chunk_id.len()
865    }
866
867    /// All the currently loaded chunks
868    pub fn physical_chunks(&self) -> impl Iterator<Item = &Arc<Chunk>> + '_ {
869        self.physical_chunks_per_chunk_id.values()
870    }
871
872    /// Lookup the _latest_ [`TimeType`] used by a specific [`TimelineName`].
873    #[inline]
874    pub fn time_column_type(&self, timeline_name: &TimelineName) -> Option<TimeType> {
875        self.time_type_registry.get(timeline_name).copied()
876    }
877
878    /// Lookup the [`ColumnMetadata`] for a specific [`EntityPath`] and [`re_types_core::Component`].
879    pub fn lookup_column_metadata(
880        &self,
881        entity_path: &EntityPath,
882        component: ComponentIdentifier,
883    ) -> Option<ColumnMetadata> {
884        let ColumnMetadataState {
885            is_semantically_empty,
886        } = self
887            .per_column_metadata
888            .get(entity_path)
889            .and_then(|per_identifier| per_identifier.get(&component))
890            .map(|(_, metadata_state, _)| metadata_state)?;
891
892        let is_static = self
893            .static_chunk_ids_per_entity
894            .get(entity_path)
895            .is_some_and(|per_component| per_component.get(&component).is_some());
896
897        use re_types_core::Archetype as _;
898        let is_tombstone = re_types_core::archetypes::Clear::all_components()
899            .iter()
900            .any(|descr| descr.component == component);
901
902        Some(ColumnMetadata {
903            is_static,
904            is_tombstone,
905            is_semantically_empty: *is_semantically_empty,
906        })
907    }
908
909    /// Get the [`ComponentType`] and [`ArrowDataType`] for a specific [`EntityPath`] and [`ComponentIdentifier`].
910    pub fn lookup_component_type(
911        &self,
912        entity_path: &EntityPath,
913        component: ComponentIdentifier,
914    ) -> Option<(Option<ComponentType>, ArrowDataType)> {
915        let (component_descr, _, datatype) = self
916            .per_column_metadata
917            .get(entity_path)
918            .and_then(|per_identifier| per_identifier.get(&component))?;
919        Some((component_descr.component_type, datatype.clone()))
920    }
921
922    /// Checks whether any column in the store with the given [`ComponentType`] has a datatype
923    /// that differs from `expected_datatype`.
924    ///
925    /// This iterates over all entities, so it should not be called in a hot path.
926    pub fn has_mismatched_datatype_for_component_type(
927        &self,
928        component_type: &ComponentType,
929        expected_datatype: &ArrowDataType,
930    ) -> Option<&ArrowDataType> {
931        for per_component in self.per_column_metadata.values() {
932            for (descr, _, datatype) in per_component.values() {
933                if descr.component_type.as_ref() == Some(component_type)
934                    && datatype != expected_datatype
935                {
936                    return Some(datatype);
937                }
938            }
939        }
940        None
941    }
942
943    /// Returns and iterator over [`ChunkId`]s that were detected as
944    /// used or missing since the last time since method was called.
945    ///
946    /// Chunks are considered missing when they are required to compute the results of a query, but cannot be
947    /// found in local memory.
948    ///
949    /// Calling this method is destructive: the internal set is cleared on every call, and will grow back as
950    /// new queries are run.
951    /// Callers are expected to call this once per frame in order to know which chunks were missing during
952    /// the previous frame.
953    ///
954    /// The returned [`ChunkId`]s can live anywhere within the lineage tree, and therefore might
955    /// not be usable for downstream consumers that did not track even compaction/split-off events.
956    /// Use [`Self::find_root_chunks`] to find the original chunks that those IDs descended from.
957    pub fn take_tracked_chunk_ids(&self) -> QueriedChunkIdTracker {
958        std::mem::take(&mut self.queried_chunk_id_tracker.write())
959    }
960
961    /// See [`Self::take_tracked_chunk_ids`] for more details.
962    pub fn tracked_chunk_ids(&self) -> QueriedChunkIdTracker {
963        self.queried_chunk_id_tracker.read().clone()
964    }
965
966    /// Signal that the chunk was used and should not be evicted by gc.
967    pub fn report_used_physical_chunk_id(&self, chunk_id: ChunkId) {
968        debug_assert!(self.physical_chunk(&chunk_id).is_some());
969
970        self.queried_chunk_id_tracker
971            .write()
972            .used_physical
973            .insert(chunk_id);
974    }
975
976    /// Signal that a chunk is missing and should be fetched when possible.
977    #[track_caller]
978    pub fn report_missing_virtual_chunk_id(&self, chunk_id: ChunkId) {
979        debug_assert!(
980            self.chunks_lineage.contains_key(&chunk_id),
981            "A chunk was reported missing, with no known lineage: {chunk_id}"
982        );
983        if self.split_on_ingest.contains(&chunk_id) {
984            if cfg!(debug_assertions) {
985                re_log::warn_once!(
986                    "Tried to report a chunk missing that was the source of a split (manual)"
987                );
988            }
989            re_log::debug_once!(
990                "Tried to report a chunk missing that was the source of a split: {chunk_id} (manual)"
991            );
992        }
993
994        self.queried_chunk_id_tracker
995            .write()
996            .missing_virtual
997            .insert(chunk_id);
998    }
999
1000    /// How many missing chunk IDs are currently registered?
1001    ///
1002    /// See also [`ChunkStore::take_tracked_chunk_ids`].
1003    pub fn num_missing_chunk_ids(&self) -> usize {
1004        self.queried_chunk_id_tracker.read().missing_virtual.len()
1005    }
1006}
1007
1008// ---
1009
1010impl ChunkStore {
1011    /// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
1012    ///
1013    /// The stores will be prefilled with the data at the specified path.
1014    ///
1015    /// See also:
1016    /// * [`ChunkStore::new`]
1017    #[cfg(not(target_arch = "wasm32"))]
1018    pub fn from_rrd_filepath(
1019        store_config: &ChunkStoreConfig,
1020        path_to_rrd: impl AsRef<std::path::Path>,
1021    ) -> anyhow::Result<BTreeMap<StoreId, Self>> {
1022        let path_to_rrd = path_to_rrd.as_ref();
1023
1024        re_tracing::profile_function!(path_to_rrd.to_string_lossy());
1025
1026        use anyhow::Context as _;
1027
1028        let mut stores = BTreeMap::new();
1029
1030        let rrd_file = std::fs::File::open(path_to_rrd)
1031            .with_context(|| format!("couldn't open {path_to_rrd:?}"))?;
1032
1033        let decoder = re_log_encoding::Decoder::decode_eager(std::io::BufReader::new(rrd_file))
1034            .with_context(|| format!("couldn't decode {path_to_rrd:?}"))?;
1035
1036        // TODO(cmc): offload the decoding to a background thread.
1037        for res in decoder {
1038            let msg = res.with_context(|| format!("couldn't decode message {path_to_rrd:?}"))?;
1039            match msg {
1040                re_log_types::LogMsg::SetStoreInfo(info) => {
1041                    stores.entry(info.info.store_id.clone()).or_insert_with(|| {
1042                        Self::new(info.info.store_id.clone(), store_config.clone())
1043                    });
1044                }
1045
1046                re_log_types::LogMsg::ArrowMsg(store_id, msg) => {
1047                    let Some(store) = stores.get_mut(&store_id) else {
1048                        anyhow::bail!("unknown store ID: {store_id:?}");
1049                    };
1050
1051                    let chunk = Chunk::from_arrow_msg(&msg)
1052                        .with_context(|| format!("couldn't decode chunk {path_to_rrd:?}"))?;
1053
1054                    store
1055                        .insert_chunk(&Arc::new(chunk))
1056                        .with_context(|| format!("couldn't insert chunk {path_to_rrd:?}"))?;
1057                }
1058
1059                re_log_types::LogMsg::BlueprintActivationCommand(_) => {}
1060            }
1061        }
1062
1063        Ok(stores)
1064    }
1065
1066    /// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
1067    ///
1068    /// The stores will be prefilled with the data in the given `log_msgs`.
1069    ///
1070    /// See also:
1071    /// * [`ChunkStore::new`]
1072    pub fn from_log_msgs(
1073        store_config: &ChunkStoreConfig,
1074        log_msgs: impl IntoIterator<Item = re_log_types::LogMsg>,
1075    ) -> anyhow::Result<BTreeMap<StoreId, Self>> {
1076        re_tracing::profile_function!();
1077
1078        use anyhow::Context as _;
1079
1080        let mut stores = BTreeMap::new();
1081
1082        // TODO(cmc): offload the decoding to a background thread.
1083        let log_msgs = log_msgs.into_iter();
1084        for msg in log_msgs {
1085            match msg {
1086                re_log_types::LogMsg::SetStoreInfo(info) => {
1087                    stores.entry(info.info.store_id.clone()).or_insert_with(|| {
1088                        Self::new(info.info.store_id.clone(), store_config.clone())
1089                    });
1090                }
1091
1092                re_log_types::LogMsg::ArrowMsg(store_id, msg) => {
1093                    let Some(store) = stores.get_mut(&store_id) else {
1094                        anyhow::bail!("unknown store ID: {store_id:?}");
1095                    };
1096
1097                    let chunk = Chunk::from_arrow_msg(&msg)
1098                        .with_context(|| "couldn't decode chunk".to_owned())?;
1099
1100                    store
1101                        .insert_chunk(&Arc::new(chunk))
1102                        .with_context(|| "couldn't insert chunk".to_owned())?;
1103                }
1104
1105                re_log_types::LogMsg::BlueprintActivationCommand(_) => {}
1106            }
1107        }
1108
1109        Ok(stores)
1110    }
1111
1112    /// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
1113    ///
1114    /// Wraps the results in [`ChunkStoreHandle`]s.
1115    ///
1116    /// The stores will be prefilled with the data at the specified path.
1117    ///
1118    /// See also:
1119    /// * [`ChunkStore::new_handle`]
1120    #[cfg(not(target_arch = "wasm32"))]
1121    pub fn handle_from_rrd_filepath(
1122        store_config: &ChunkStoreConfig,
1123        path_to_rrd: impl AsRef<std::path::Path>,
1124    ) -> anyhow::Result<BTreeMap<StoreId, ChunkStoreHandle>> {
1125        Ok(Self::from_rrd_filepath(store_config, path_to_rrd)?
1126            .into_iter()
1127            .map(|(store_id, store)| (store_id, ChunkStoreHandle::new(store)))
1128            .collect())
1129    }
1130}