re_chunk_store/store.rs
1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::sync::atomic::AtomicU64;
4
5use ahash::{HashMap, HashSet};
6use arrow::datatypes::DataType as ArrowDataType;
7use itertools::Itertools as _;
8use nohash_hasher::IntMap;
9use parking_lot::RwLock;
10use re_log::debug_assert;
11
12use re_chunk::{Chunk, ChunkId, ComponentIdentifier, RowId, TimelineName};
13use re_log_types::{EntityPath, StoreId, TimeInt, TimeType};
14use re_types_core::{ComponentDescriptor, ComponentType};
15
16use crate::{ChunkDirectLineage, ChunkStoreChunkStats, ChunkStoreError, ChunkStoreResult};
17
18// ---
19
/// Configuration of a [`ChunkStore`]: changelog emission and compaction thresholds.
///
/// See [`Self::DEFAULT`] for the default values and the rationale behind them, and
/// [`Self::apply_env`] for the environment variables that can override each field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkStoreConfig {
    /// If `true` (the default), the store will emit events when its contents are modified in
    /// any way (insertion, GC), that can be subscribed to.
    ///
    /// Leaving this disabled can lead to major performance improvements on the ingestion path
    /// in some workloads, provided that the subscribers aren't needed (e.g. headless mode).
    pub enable_changelog: bool,

    /// What is the threshold, in bytes, after which a [`Chunk`] cannot be compacted any further?
    ///
    /// This is a multi-dimensional trade-off:
    /// * Larger chunks lead to less fixed overhead introduced by metadata, indices and such. Good.
    /// * Larger chunks lead to slower query execution on some unhappy paths. Bad.
    /// * Larger chunks lead to slower and slower compaction as chunks grow larger. Bad.
    /// * Larger chunks lead to coarser garbage collection. Good or bad depending on use case.
    /// * Larger chunks lead to less precision in e.g. the time panel. Bad.
    ///
    /// The default is `12 * 8 * 4096` bytes (384KiB), i.e. 96 bytes per row at the default
    /// limit of 4096 rows. See [`Self::DEFAULT`] for the rationale.
    pub chunk_max_bytes: u64,

    /// What is the threshold, in rows, after which a [`Chunk`] cannot be compacted any further?
    ///
    /// This specifically applies to time-sorted chunks.
    /// See also [`ChunkStoreConfig::chunk_max_rows_if_unsorted`].
    ///
    /// This is a multi-dimensional trade-off:
    /// * Larger chunks lead to less fixed overhead introduced by metadata, indices and such. Good.
    /// * Larger chunks lead to slower query execution on some unhappy paths. Bad.
    /// * Larger chunks lead to slower and slower compaction as chunks grow larger. Bad.
    /// * Larger chunks lead to coarser garbage collection. Good or bad depending on use case.
    /// * Larger chunks lead to less precision in e.g. the time panel. Bad.
    ///
    /// The default is 4096 rows, the threshold after which empirical testing shows diminishing
    /// returns space and compute wise. See [`Self::DEFAULT`].
    pub chunk_max_rows: u64,

    /// What is the threshold, in rows, after which a [`Chunk`] cannot be compacted any further?
    ///
    /// This specifically applies to _non_ time-sorted chunks.
    /// See also [`ChunkStoreConfig::chunk_max_rows`].
    ///
    /// This is a multi-dimensional trade-off:
    /// * Larger chunks lead to less fixed overhead introduced by metadata, indices and such. Good.
    /// * Larger chunks lead to slower query execution on some unhappy paths. Bad.
    /// * Larger chunks lead to slower and slower compaction as chunks grow larger. Bad.
    /// * Larger chunks lead to coarser garbage collection. Good or bad depending on use case.
    /// * Larger chunks lead to less precision in e.g. the time panel. Bad.
    ///
    /// The default is 1024 rows. See [`Self::DEFAULT`].
    pub chunk_max_rows_if_unsorted: u64,
    //
    // TODO(cmc): It could make sense to have time-range-based thresholds in here, since the time
    // range covered by a chunk has direct effects on A) the complexity of backward walks and
    // B) in downstream subscribers (e.g. the precision of the time panel).
    //
    // In practice this is highly recording-dependent, and would require either to make it
    // user-configurable per-recording, or use heuristics to compute it on the fly.
    //
    // The added complexity just isn't worth it at the moment.
    // Maybe at some point.
}
90
impl Default for ChunkStoreConfig {
    /// Returns [`ChunkStoreConfig::DEFAULT`].
    #[inline]
    fn default() -> Self {
        Self::DEFAULT
    }
}
97
impl re_byte_size::SizeBytes for ChunkStoreConfig {
    /// Always `0`: the configuration only holds one `bool` and three `u64` fields, none of
    /// which own heap memory.
    fn heap_size_bytes(&self) -> u64 {
        0
    }

    /// Reports this type as plain-old-data to `re_byte_size`.
    #[inline]
    fn is_pod() -> bool {
        true
    }
}
108
109impl ChunkStoreConfig {
110 /// Default configuration, applicable to most use cases, according to empirical testing.
111 pub const DEFAULT: Self = Self {
112 enable_changelog: true,
113
114 // This gives us 96 bytes per row (assuming a default limit of 4096 rows), which is enough to
115 // fit a couple scalar columns, a RowId column, a handful of timeline columns, all the
116 // necessary offsets, etc.
117 //
118 // A few megabytes turned out to be way too costly to concatenate in real-time in the
119 // Viewer (see <https://github.com/rerun-io/rerun/issues/7222>).
120 chunk_max_bytes: 12 * 8 * 4096,
121
122 // Empirical testing shows that 4096 is the threshold after which we really start to get
123 // dimishing returns space and compute wise.
124 chunk_max_rows: 4096,
125
126 chunk_max_rows_if_unsorted: 1024,
127 };
128
129 /// [`Self::DEFAULT`], but with compaction entirely disabled.
130 pub const COMPACTION_DISABLED: Self = Self {
131 chunk_max_bytes: 0,
132 chunk_max_rows: 0,
133 chunk_max_rows_if_unsorted: 0,
134 ..Self::DEFAULT
135 };
136
137 /// [`Self::DEFAULT`], but with changelog disabled.
138 pub const CHANGELOG_DISABLED: Self = Self {
139 enable_changelog: false,
140 ..Self::DEFAULT
141 };
142
143 /// All features disabled.
144 pub const ALL_DISABLED: Self = Self {
145 enable_changelog: false,
146 chunk_max_bytes: 0,
147 chunk_max_rows: 0,
148 chunk_max_rows_if_unsorted: 0,
149 };
150
151 /// Environment variable to configure [`Self::enable_changelog`].
152 pub const ENV_STORE_ENABLE_CHANGELOG: &'static str = "RERUN_STORE_ENABLE_CHANGELOG";
153
154 /// Environment variable to configure [`Self::chunk_max_bytes`].
155 pub const ENV_CHUNK_MAX_BYTES: &'static str = "RERUN_CHUNK_MAX_BYTES";
156
157 /// Environment variable to configure [`Self::chunk_max_rows`].
158 pub const ENV_CHUNK_MAX_ROWS: &'static str = "RERUN_CHUNK_MAX_ROWS";
159
160 /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
161 //
162 // NOTE: Shared with the same env-var on the batcher side, for consistency.
163 pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";
164
165 /// Creates a new `ChunkStoreConfig` using the default values, optionally overridden
166 /// through the environment.
167 ///
168 /// See [`Self::apply_env`].
169 #[inline]
170 pub fn from_env() -> ChunkStoreResult<Self> {
171 Self::default().apply_env()
172 }
173
174 /// Returns a copy of `self`, overriding existing fields with values from the environment if
175 /// they are present.
176 ///
177 /// See [`Self::ENV_STORE_ENABLE_CHANGELOG`], [`Self::ENV_CHUNK_MAX_BYTES`], [`Self::ENV_CHUNK_MAX_ROWS`]
178 /// and [`Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED`].
179 pub fn apply_env(&self) -> ChunkStoreResult<Self> {
180 let mut new = self.clone();
181
182 if let Ok(s) = std::env::var(Self::ENV_STORE_ENABLE_CHANGELOG) {
183 new.enable_changelog = s.parse().map_err(|err| ChunkStoreError::ParseConfig {
184 name: Self::ENV_STORE_ENABLE_CHANGELOG,
185 value: s.clone(),
186 err: Box::new(err),
187 })?;
188 }
189
190 if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_BYTES) {
191 new.chunk_max_bytes = s.parse().map_err(|err| ChunkStoreError::ParseConfig {
192 name: Self::ENV_CHUNK_MAX_BYTES,
193 value: s.clone(),
194 err: Box::new(err),
195 })?;
196 }
197
198 if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS) {
199 new.chunk_max_rows = s.parse().map_err(|err| ChunkStoreError::ParseConfig {
200 name: Self::ENV_CHUNK_MAX_ROWS,
201 value: s.clone(),
202 err: Box::new(err),
203 })?;
204 }
205
206 if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
207 new.chunk_max_rows_if_unsorted =
208 s.parse().map_err(|err| ChunkStoreError::ParseConfig {
209 name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
210 value: s.clone(),
211 err: Box::new(err),
212 })?;
213 }
214
215 Ok(new)
216 }
217}
218
/// Guards against accidental renames of the configuration environment variables.
///
/// The names are spelled out as literals on purpose (rather than going through the
/// `ChunkStoreConfig::ENV_*` constants), so that changing a constant makes this test fail.
#[test]
fn chunk_store_config() {
    // Detect breaking changes in our environment variables.
    let overrides = [
        ("RERUN_STORE_ENABLE_CHANGELOG", "false"),
        ("RERUN_CHUNK_MAX_BYTES", "42"),
        ("RERUN_CHUNK_MAX_ROWS", "666"),
        ("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "999"),
    ];

    // SAFETY: it's a test
    #[expect(unsafe_code)]
    unsafe {
        for (name, value) in overrides {
            std::env::set_var(name, value);
        }
    };

    let config = ChunkStoreConfig::from_env().unwrap();

    assert_eq!(
        ChunkStoreConfig {
            enable_changelog: false,
            chunk_max_bytes: 42,
            chunk_max_rows: 666,
            chunk_max_rows_if_unsorted: 999,
        },
        config
    );
}
243
244// ---
245
/// An ordered set of [`ChunkId`]s.
pub type ChunkIdSet = BTreeSet<ChunkId>;
247
/// A time-indexed set of [`ChunkId`]s: every chunk is registered both under its start time and
/// under its end time, alongside the longest interval seen so far.
#[derive(Debug, Default, Clone)]
pub struct ChunkIdSetPerTime {
    /// Keeps track of the longest interval being currently stored in the two maps below.
    ///
    /// This is used to bound the backwards linear walk when looking for overlapping chunks in
    /// latest-at queries.
    ///
    /// This is purely additive: this value is never decremented for any reason, whether it's GC,
    /// chunk splitting, or whatever else.
    ///
    /// See [`ChunkStore::latest_at`] implementation comments for more details.
    pub(crate) max_interval_length: u64,

    /// *Both physical & virtual* [`ChunkId`]s organized by their _most specific_ start time.
    ///
    /// What "most specific" means depends on the context in which the [`ChunkIdSetPerTime`]
    /// was instantiated, e.g.:
    /// * For an `(entity, timeline, component)` index, that would be the first timestamp at which this
    ///   [`Chunk`] contains data for this particular component on this particular timeline (see
    ///   [`Chunk::time_range_per_component`]).
    /// * For an `(entity, timeline)` index, that would be the first timestamp at which this [`Chunk`]
    ///   contains data for any component on this particular timeline (see [`re_chunk::TimeColumn::time_range`]).
    ///
    /// This index includes virtual/offloaded chunks, and therefore is purely additive: garbage collection
    /// will never remove values from this set.
    pub(crate) per_start_time: BTreeMap<TimeInt, ChunkIdSet>,

    /// *Both physical & virtual* [`ChunkId`]s organized by their _most specific_ end time.
    ///
    /// What "most specific" means depends on the context in which the [`ChunkIdSetPerTime`]
    /// was instantiated, e.g.:
    /// * For an `(entity, timeline, component)` index, that would be the last timestamp at which this
    ///   [`Chunk`] contains data for this particular component on this particular timeline (see
    ///   [`Chunk::time_range_per_component`]).
    /// * For an `(entity, timeline)` index, that would be the last timestamp at which this [`Chunk`]
    ///   contains data for any component on this particular timeline (see [`re_chunk::TimeColumn::time_range`]).
    ///
    /// This index includes virtual/offloaded chunks, and therefore is purely additive: garbage collection
    /// will never remove values from this set.
    pub(crate) per_end_time: BTreeMap<TimeInt, ChunkIdSet>,
}
289
290impl re_byte_size::SizeBytes for ChunkIdSetPerTime {
291 fn heap_size_bytes(&self) -> u64 {
292 let Self {
293 max_interval_length,
294 per_start_time,
295 per_end_time,
296 } = self;
297
298 max_interval_length.heap_size_bytes()
299 + per_start_time.heap_size_bytes()
300 + per_end_time.heap_size_bytes()
301 }
302}
303
/// One [`ChunkIdSetPerTime`] per [`ComponentIdentifier`].
pub type ChunkIdSetPerTimePerComponent = IntMap<ComponentIdentifier, ChunkIdSetPerTime>;

/// One [`ChunkIdSetPerTimePerComponent`] per [`TimelineName`].
pub type ChunkIdSetPerTimePerComponentPerTimeline =
    IntMap<TimelineName, ChunkIdSetPerTimePerComponent>;

/// One [`ChunkIdSetPerTimePerComponentPerTimeline`] per [`EntityPath`].
pub type ChunkIdSetPerTimePerComponentPerTimelinePerEntity =
    IntMap<EntityPath, ChunkIdSetPerTimePerComponentPerTimeline>;

/// A single [`ChunkId`] per [`ComponentIdentifier`].
pub type ChunkIdPerComponent = IntMap<ComponentIdentifier, ChunkId>;

/// One [`ChunkIdPerComponent`] per [`EntityPath`].
pub type ChunkIdPerComponentPerEntity = IntMap<EntityPath, ChunkIdPerComponent>;

/// One [`ChunkIdSetPerTime`] per [`TimelineName`], without any per-component level.
pub type ChunkIdSetPerTimePerTimeline = IntMap<TimelineName, ChunkIdSetPerTime>;

/// One [`ChunkIdSetPerTimePerTimeline`] per [`EntityPath`].
pub type ChunkIdSetPerTimePerTimelinePerEntity = IntMap<EntityPath, ChunkIdSetPerTimePerTimeline>;
319
320// ---
321
/// Boolean properties describing a single column.
///
/// See also [`ColumnMetadataState`], the internal state maintained in order to compute this.
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
    /// Whether this column represents static data.
    pub is_static: bool,

    /// Whether this column represents a `Clear`-related component.
    ///
    /// `Clear`: [`re_types_core::archetypes::Clear`]
    pub is_tombstone: bool,

    /// Whether this column contains either no data or only contains null and/or empty values (`[]`).
    pub is_semantically_empty: bool,
}
335
/// Internal state that needs to be maintained in order to compute [`ColumnMetadata`].
///
/// One instance is stored per `(entity, component)` pair in the store's `per_column_metadata`
/// index.
#[derive(Debug, Clone)]
pub struct ColumnMetadataState {
    /// Whether this column contains either no data or only contains null and/or empty values (`[]`).
    ///
    /// This is purely additive: once false, it will always be false. Even in case of garbage
    /// collection.
    pub is_semantically_empty: bool,
}
345
346impl re_byte_size::SizeBytes for ColumnMetadataState {
347 fn heap_size_bytes(&self) -> u64 {
348 let Self {
349 is_semantically_empty,
350 } = self;
351
352 is_semantically_empty.heap_size_bytes()
353 }
354}
355
/// Incremented on each edit.
///
/// This is a snapshot of the store's insertion and GC counters, as returned by
/// [`ChunkStore::generation`]: two generations compare equal if neither counter has changed.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ChunkStoreGeneration {
    /// Value of the store's insertion counter when the generation was captured.
    insert_id: u64,

    /// Value of the store's GC counter when the generation was captured.
    gc_id: u64,
}
362
/// A ref-counted, inner-mutable handle to a [`ChunkStore`].
///
/// Cheap to clone.
///
/// The read accessors of this handle acquire the underlying lock _recursively_ (see
/// [`ChunkStoreHandle::read`]).
///
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
/// * [`ChunkStoreHandle::read_arc`]
/// * [`ChunkStoreHandle::write_arc`]
#[derive(Clone)]
pub struct ChunkStoreHandle(Arc<parking_lot::RwLock<ChunkStore>>);
372
373impl std::fmt::Display for ChunkStoreHandle {
374 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375 f.write_fmt(format_args!("{}", self.0.read()))
376 }
377}
378
379impl ChunkStoreHandle {
380 #[inline]
381 pub fn new(store: ChunkStore) -> Self {
382 Self(Arc::new(parking_lot::RwLock::new(store)))
383 }
384
385 #[inline]
386 pub fn into_inner(self) -> Arc<parking_lot::RwLock<ChunkStore>> {
387 self.0
388 }
389}
390
391impl ChunkStoreHandle {
392 #[inline]
393 pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
394 self.0.read_recursive()
395 }
396
397 #[inline]
398 pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, ChunkStore>> {
399 self.0.try_read_recursive()
400 }
401
402 #[inline]
403 pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
404 self.0.write()
405 }
406
407 #[inline]
408 pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, ChunkStore>> {
409 self.0.try_write()
410 }
411
412 #[inline]
413 pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
414 parking_lot::RwLock::read_arc_recursive(&self.0)
415 }
416
417 #[inline]
418 pub fn try_read_arc(
419 &self,
420 ) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore>> {
421 parking_lot::RwLock::try_read_recursive_arc(&self.0)
422 }
423
424 #[inline]
425 pub fn write_arc(
426 &self,
427 ) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
428 parking_lot::RwLock::write_arc(&self.0)
429 }
430
431 #[inline]
432 pub fn try_write_arc(
433 &self,
434 ) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore>> {
435 parking_lot::RwLock::try_write_arc(&self.0)
436 }
437}
438
/// This keeps track of all missing virtual [`ChunkId`]s and all
/// used physical [`ChunkId`]s.
///
/// The store keeps one instance of this behind a lock, see
/// [`ChunkStore::take_tracked_chunk_ids`].
#[derive(Clone, Debug, Default)]
pub struct QueriedChunkIdTracker {
    /// Used physical chunks.
    pub used_physical: HashSet<ChunkId>,

    /// Missing virtual chunks.
    ///
    /// Chunks are considered missing when they are required to compute the results of a query, but cannot be
    /// found in local memory. This set is automatically populated anytime that happens.
    ///
    /// Note, these are NOT necessarily _root_ chunks.
    /// Use [`ChunkStore::find_root_chunks`] to get those.
    //
    // TODO(cmc): Once lineage tracking is in place, make sure that this only reports missing
    // chunks using their root-level IDs, so downstream consumers don't have to redundantly build
    // their own tracking. And document it so.
    pub missing_virtual: HashSet<ChunkId>,
}
459
460impl re_byte_size::SizeBytes for QueriedChunkIdTracker {
461 fn heap_size_bytes(&self) -> u64 {
462 let Self {
463 used_physical,
464 missing_virtual,
465 } = self;
466
467 used_physical.heap_size_bytes() + missing_virtual.heap_size_bytes()
468 }
469}
470
/// A complete chunk store: covers all timelines, all entities, everything.
///
/// The chunk store _always_ works at the chunk level, whether it is for write & read queries or
/// garbage collection. It is completely oblivious to individual rows.
///
/// Use the `Display` implementation for a detailed view of the internals.
#[derive(Debug)]
pub struct ChunkStore {
    /// The ID of this store.
    pub(crate) id: StoreId,

    /// The configuration of the chunk store (e.g. compaction settings).
    pub(crate) config: ChunkStoreConfig,

    /// Keeps track of the _latest_ datatype for each time column.
    ///
    /// This index is purely additive: it is never affected by garbage collection in any way.
    ///
    /// See also [`Self::time_column_type`].
    pub(crate) time_type_registry: IntMap<TimelineName, TimeType>,

    /// Per-entity, per-component column information: a [`ComponentDescriptor`], the
    /// [`ColumnMetadataState`] used to compute [`ColumnMetadata`], and an Arrow datatype
    /// (presumably that of the component column).
    //
    // TODO(grtlr): Can we slim this map down by getting rid of `ColumnIdentifier`-level here?
    pub(crate) per_column_metadata: IntMap<
        EntityPath,
        IntMap<ComponentIdentifier, (ComponentDescriptor, ColumnMetadataState, ArrowDataType)>,
    >,

    /// All the *physical* chunks currently loaded in the store, mapped by their respective IDs.
    ///
    /// Physical chunks are chunks that are actively loaded into the store's volatile memory.
    ///
    /// During garbage collection, physical chunks are offloaded from memory and become virtual
    /// chunks instead. At the same time, their IDs are removed from this set, which is how we
    /// distinguish virtual from physical chunks.
    ///
    /// Virtual chunks are still indexed by the store, but querying for them will not yield any data,
    /// just hints that some data is missing and must first be re-inserted by the caller.
    pub(crate) physical_chunks_per_chunk_id: BTreeMap<ChunkId, Arc<Chunk>>,

    /// All *physical* [`ChunkId`]s currently in the store, indexed by the smallest [`RowId`] in
    /// each of them.
    ///
    /// This is effectively all chunks in global data order. Used for garbage collection.
    ///
    /// During garbage collection, physical chunks are offloaded from memory and become virtual
    /// chunks instead. At the same time, their IDs are removed from this set, which is how we
    /// distinguish virtual from physical chunks.
    pub(crate) physical_chunk_ids_per_min_row_id: BTreeMap<RowId, ChunkId>,

    /// Keeps track of where each individual chunk, both virtual & physical, came from.
    ///
    /// Due to compaction, a chunk's lineage often forms a tree rather than a straight line.
    /// The lineage tree always ends in one of two ways:
    /// * A reference to volatile memory, from which the chunk came from, and that cannot ever be
    ///   reached again.
    /// * A reference to an RRD manifest, from which the chunk was virtually loaded from, and where
    ///   it can still be reached, provided that the associated Redap server still exists.
    ///
    /// This is purely additive: never garbage collected.
    pub(crate) chunks_lineage: HashMap<ChunkId, ChunkDirectLineage>,

    /// Anytime a chunk gets split during insertion, this is recorded here.
    ///
    /// The key is the ID of the source chunk, before splitting, which never made it into the store.
    /// The values are the IDs of the resulting split chunks, which were actually inserted.
    ///
    /// Splitting cannot be recursive, and therefore there is never any requirement to traverse
    /// this datastructure recursively.
    ///
    /// So why is this useful? We use this data on the write path in order to detect when a chunk that
    /// was previously inserted, and split into smaller chunks, is being inserted *again*, e.g. because
    /// it had been offloaded due to memory pressure and is now making a comeback.
    /// What might happen in these sort of scenarios, is that some of the resulting splits were
    /// garbage collected away, but not all of them, and now we end up with tiny overlaps all over
    /// the store which, while they don't impact semantics in any way, are annoying for at least 2 reasons:
    /// * performance of the query engine
    /// * hard to reason about for downstream consumers building secondary datastructures (e.g. video cache)
    ///
    /// `HashMap<OriginalChunkId, SplitChunkIds>`
    pub(crate) dangling_splits: HashMap<ChunkId, Vec<ChunkId>>,

    /// All chunks that were split on-ingestion.
    ///
    /// This is like [`Self::dangling_splits`], but is only ever added to.
    ///
    /// This is only used for sanity checks.
    pub(crate) split_on_ingest: HashSet<ChunkId>,

    /// Anytime a chunk gets compacted with another during insertion, this is recorded here.
    ///
    /// The key can be either one of two things:
    /// * The ID of an already stored physical chunk, that was elected for compaction.
    /// * The ID of the chunk being inserted, before compaction, which never made it into the store.
    ///
    /// The value is the ID of the resulting compacted chunk, which was actually inserted.
    ///
    /// Compaction is a recursive process: you should probably traverse this datastructure *recursively*.
    ///
    /// So why is this useful? We use this data on the write path in order to detect when a chunk that
    /// was previously inserted, and (potentially recursively) compacted with another chunk, is being
    /// inserted *again*, e.g. because it had been offloaded due to memory pressure and is now making a comeback.
    /// When that happens, the data for that chunk would effectively be duplicated across the chunk and
    /// the pre-existing compacted data.
    /// While that doesn't impact semantics in any way, it's still annoying for at least 2 reasons:
    /// * performance of the query engine
    /// * hard to reason about for downstream consumers building secondary datastructures (e.g. video cache)
    ///
    /// This is purely additive: never garbage collected.
    ///
    /// `HashMap<OriginalChunkId, CompactedChunkId>`
    pub(crate) leaky_compactions: HashMap<ChunkId, ChunkId>,

    /// All *physical & virtual* temporal [`ChunkId`]s for all entities on all timelines, further
    /// indexed by [`ComponentIdentifier`].
    ///
    /// This index is purely additive: it is never affected by garbage collection in any way.
    /// This implies that the chunk IDs present in this set might be either physical/loaded or
    /// virtual/offloaded.
    /// When leveraging this index, make sure you understand whether you expect loaded chunks,
    /// unloaded chunks, or both. Leverage [`Self::physical_chunks_per_chunk_id`] to know which is which.
    ///
    /// See also:
    /// * [`Self::temporal_chunk_ids_per_entity`].
    /// * [`Self::static_chunk_ids_per_entity`].
    pub(crate) temporal_chunk_ids_per_entity_per_component:
        ChunkIdSetPerTimePerComponentPerTimelinePerEntity,

    /// All *physical & virtual* temporal [`ChunkId`]s for all entities on all timelines, without the
    /// [`ComponentType`] index.
    ///
    /// This index is purely additive: it is never affected by garbage collection in any way.
    /// This implies that the chunk IDs present in this set might be either physical/loaded or
    /// virtual/offloaded.
    /// When leveraging this index, make sure you understand whether you expect loaded chunks,
    /// unloaded chunks, or both. Leverage [`Self::physical_chunks_per_chunk_id`] to know which is which.
    ///
    /// See also:
    /// * [`Self::temporal_chunk_ids_per_entity_per_component`].
    /// * [`Self::static_chunk_ids_per_entity`].
    //
    // NOTE(review): the per-component index above is keyed by `ComponentIdentifier`, not
    // `ComponentType` -- the doc above presumably means the former; confirm before changing it.
    pub(crate) temporal_chunk_ids_per_entity: ChunkIdSetPerTimePerTimelinePerEntity,

    /// Accumulated size statistics for all *physical* temporal [`Chunk`]s currently present in the store.
    ///
    /// This is too costly to be computed from scratch every frame, and therefore materialized here.
    ///
    /// *This exclusively covers physical/loaded chunks*. During GC, these statistics are decremented
    /// as you'd expect.
    pub(crate) temporal_physical_chunks_stats: ChunkStoreChunkStats,

    /// Static data. Never garbage collected.
    ///
    /// Static data unconditionally shadows temporal data at query time.
    ///
    /// Existing temporal data will not be removed. Events won't be fired.
    pub(crate) static_chunk_ids_per_entity: ChunkIdPerComponentPerEntity,

    /// Accumulated size statistics for all *physical* static [`Chunk`]s currently present in the store.
    ///
    /// This is too costly to be computed from scratch every frame, and is therefore materialized here.
    pub(crate) static_chunks_stats: ChunkStoreChunkStats,

    /// Calling [`ChunkStore::take_tracked_chunk_ids`] will atomically return the contents of this
    /// struct as well as clearing it.
    pub(crate) queried_chunk_id_tracker: RwLock<QueriedChunkIdTracker>,

    /// Monotonically increasing ID for insertions.
    pub(crate) insert_id: u64,

    /// Monotonically increasing ID for GCs.
    pub(crate) gc_id: u64,

    /// Monotonically increasing ID for store events.
    pub(crate) event_id: AtomicU64,
}
644
645impl Drop for ChunkStore {
646 fn drop(&mut self) {
647 // First and foremost, notify per-store subscribers that an entire store was just dropped,
648 // and therefore they can just drop entire chunks of their own state.
649 Self::drop_per_store_subscribers(&self.id());
650
651 if self.config.enable_changelog {
652 // Then, if the changelog is enabled, trigger a full GC: this will notify all remaining
653 // subscribers of all the chunks that were dropped by dropping the store itself.
654 _ = self.gc(&crate::GarbageCollectionOptions::gc_everything());
655 }
656 }
657}
658
659impl Clone for ChunkStore {
660 #[inline]
661 fn clone(&self) -> Self {
662 re_tracing::profile_function!();
663 Self {
664 id: self.id.clone(),
665 config: self.config.clone(),
666 time_type_registry: self.time_type_registry.clone(),
667 per_column_metadata: self.per_column_metadata.clone(),
668 physical_chunks_per_chunk_id: self.physical_chunks_per_chunk_id.clone(),
669 chunks_lineage: self.chunks_lineage.clone(),
670 dangling_splits: self.dangling_splits.clone(),
671 split_on_ingest: self.split_on_ingest.clone(),
672 leaky_compactions: self.leaky_compactions.clone(),
673 physical_chunk_ids_per_min_row_id: self.physical_chunk_ids_per_min_row_id.clone(),
674 temporal_chunk_ids_per_entity_per_component: self
675 .temporal_chunk_ids_per_entity_per_component
676 .clone(),
677 temporal_chunk_ids_per_entity: self.temporal_chunk_ids_per_entity.clone(),
678 temporal_physical_chunks_stats: self.temporal_physical_chunks_stats,
679 static_chunk_ids_per_entity: self.static_chunk_ids_per_entity.clone(),
680 static_chunks_stats: self.static_chunks_stats,
681 queried_chunk_id_tracker: Default::default(),
682 insert_id: Default::default(),
683 gc_id: Default::default(),
684 event_id: Default::default(),
685 }
686 }
687}
688
689impl std::fmt::Display for ChunkStore {
690 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
691 let Self {
692 id,
693 config,
694 time_type_registry: _,
695 per_column_metadata: _,
696 physical_chunks_per_chunk_id: chunks_per_chunk_id,
697 physical_chunk_ids_per_min_row_id: chunk_ids_per_min_row_id,
698 chunks_lineage,
699 dangling_splits: _,
700 split_on_ingest: _,
701 leaky_compactions: _,
702 temporal_chunk_ids_per_entity_per_component: _,
703 temporal_chunk_ids_per_entity: _,
704 temporal_physical_chunks_stats,
705 static_chunk_ids_per_entity: _,
706 static_chunks_stats,
707 queried_chunk_id_tracker: _,
708 insert_id: _,
709 gc_id: _,
710 event_id: _,
711 } = self;
712
713 f.write_str("ChunkStore {\n")?;
714
715 f.write_str(&indent::indent_all_by(4, format!("id: {id:?}\n")))?;
716 f.write_str(&indent::indent_all_by(4, format!("config: {config:?}\n")))?;
717
718 f.write_str(&indent::indent_all_by(4, "stats: {\n"))?;
719 f.write_str(&indent::indent_all_by(
720 8,
721 format!("{}", *static_chunks_stats + *temporal_physical_chunks_stats),
722 ))?;
723 f.write_str(&indent::indent_all_by(4, "}\n"))?;
724
725 f.write_str(&indent::indent_all_by(4, "physical chunks: [\n"))?;
726 for chunk_id in chunk_ids_per_min_row_id.values() {
727 if let Some(chunk) = chunks_per_chunk_id.get(chunk_id) {
728 f.write_str(&indent::indent_all_by(
729 8,
730 format!("{}\n", self.format_lineage(chunk_id)),
731 ))?;
732
733 if let Some(width) = f.width() {
734 let chunk_width = width.saturating_sub(8);
735 f.write_str(&indent::indent_all_by(8, format!("{chunk:chunk_width$}\n")))?;
736 } else {
737 f.write_str(&indent::indent_all_by(8, format!("{chunk}\n")))?;
738 }
739 } else {
740 f.write_str(&indent::indent_all_by(8, "<not_found>\n"))?;
741 }
742 }
743 f.write_str(&indent::indent_all_by(4, "]\n"))?;
744
745 f.write_str(&indent::indent_all_by(4, "virtual chunks: [\n"))?;
746 for chunk_id in chunks_lineage.keys().sorted() {
747 if chunks_per_chunk_id.contains_key(chunk_id) {
748 continue;
749 }
750
751 f.write_str(&indent::indent_all_by(
752 8,
753 format!("{}\n", self.format_lineage(chunk_id)),
754 ))?;
755 }
756 f.write_str(&indent::indent_all_by(4, "]\n"))?;
757
758 f.write_str("}")?;
759
760 Ok(())
761 }
762}
763
764// ---
765
766impl ChunkStore {
    /// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
    ///
    /// All indices and statistics start out empty, and all counters start at `0`.
    ///
    /// See also:
    /// * [`ChunkStore::new_handle`]
    /// * [`ChunkStore::from_rrd_filepath`]
    #[inline]
    pub fn new(id: StoreId, config: ChunkStoreConfig) -> Self {
        Self {
            id,
            config,
            time_type_registry: Default::default(),
            per_column_metadata: Default::default(),
            physical_chunk_ids_per_min_row_id: Default::default(),
            chunks_lineage: Default::default(),
            dangling_splits: Default::default(),
            split_on_ingest: Default::default(),
            leaky_compactions: Default::default(),
            physical_chunks_per_chunk_id: Default::default(),
            temporal_chunk_ids_per_entity_per_component: Default::default(),
            temporal_chunk_ids_per_entity: Default::default(),
            temporal_physical_chunks_stats: Default::default(),
            static_chunk_ids_per_entity: Default::default(),
            static_chunks_stats: Default::default(),
            queried_chunk_id_tracker: Default::default(),
            insert_id: 0,
            gc_id: 0,
            event_id: AtomicU64::new(0),
        }
    }
796
797 /// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
798 ///
799 /// Pre-wraps the result in a [`ChunkStoreHandle`].
800 ///
801 /// See also:
802 /// * [`ChunkStore::from_rrd_filepath`]
803 #[inline]
804 pub fn new_handle(id: StoreId, config: ChunkStoreConfig) -> ChunkStoreHandle {
805 ChunkStoreHandle::new(Self::new(id, config))
806 }
807
808 #[inline]
809 pub fn id(&self) -> StoreId {
810 self.id.clone()
811 }
812
813 /// Return the current [`ChunkStoreGeneration`]. This can be used to determine whether the
814 /// database has been modified since the last time it was queried.
815 #[inline]
816 pub fn generation(&self) -> ChunkStoreGeneration {
817 ChunkStoreGeneration {
818 insert_id: self.insert_id,
819 gc_id: self.gc_id,
820 }
821 }
822
823 /// See [`ChunkStoreConfig`] for more information about configuration.
824 #[inline]
825 pub fn config(&self) -> &ChunkStoreConfig {
826 &self.config
827 }
828
829 /// Iterate over all *physical* chunks in the store, in ascending [`ChunkId`] order.
830 #[inline]
831 pub fn iter_physical_chunks(&self) -> impl Iterator<Item = &Arc<Chunk>> + '_ {
832 self.physical_chunks_per_chunk_id.values()
833 }
834
835 /// Get a *physical* chunk based on its ID.
836 #[inline]
837 pub fn physical_chunk(&self, physical_chunk_id: &ChunkId) -> Option<&Arc<Chunk>> {
838 self.physical_chunks_per_chunk_id.get(physical_chunk_id)
839 }
840
841 /// Get a *physical* chunk based on its ID and track the chunk as either
842 /// used or missing, to signal that it should be kept or fetched.
843 #[track_caller]
844 pub fn use_physical_chunk_or_report_missing(&self, id: &ChunkId) -> Option<&Arc<Chunk>> {
845 debug_assert!(
846 !self.split_on_ingest.contains(id),
847 "Asked for a physical chunk, but this chunk was split on ingestion and was never physical: {id}"
848 );
849
850 let chunk = self.physical_chunk(id);
851
852 if chunk.is_some() {
853 self.report_used_physical_chunk_id(*id);
854 } else {
855 self.report_missing_virtual_chunk_id(*id);
856 }
857
858 chunk
859 }
860
861 /// Get the number of *physical* chunks in the store.
862 #[inline]
863 pub fn num_physical_chunks(&self) -> usize {
864 self.physical_chunks_per_chunk_id.len()
865 }
866
867 /// All the currently loaded chunks
868 pub fn physical_chunks(&self) -> impl Iterator<Item = &Arc<Chunk>> + '_ {
869 self.physical_chunks_per_chunk_id.values()
870 }
871
872 /// Lookup the _latest_ [`TimeType`] used by a specific [`TimelineName`].
873 #[inline]
874 pub fn time_column_type(&self, timeline_name: &TimelineName) -> Option<TimeType> {
875 self.time_type_registry.get(timeline_name).copied()
876 }
877
878 /// Lookup the [`ColumnMetadata`] for a specific [`EntityPath`] and [`re_types_core::Component`].
879 pub fn lookup_column_metadata(
880 &self,
881 entity_path: &EntityPath,
882 component: ComponentIdentifier,
883 ) -> Option<ColumnMetadata> {
884 let ColumnMetadataState {
885 is_semantically_empty,
886 } = self
887 .per_column_metadata
888 .get(entity_path)
889 .and_then(|per_identifier| per_identifier.get(&component))
890 .map(|(_, metadata_state, _)| metadata_state)?;
891
892 let is_static = self
893 .static_chunk_ids_per_entity
894 .get(entity_path)
895 .is_some_and(|per_component| per_component.get(&component).is_some());
896
897 use re_types_core::Archetype as _;
898 let is_tombstone = re_types_core::archetypes::Clear::all_components()
899 .iter()
900 .any(|descr| descr.component == component);
901
902 Some(ColumnMetadata {
903 is_static,
904 is_tombstone,
905 is_semantically_empty: *is_semantically_empty,
906 })
907 }
908
909 /// Get the [`ComponentType`] and [`ArrowDataType`] for a specific [`EntityPath`] and [`ComponentIdentifier`].
910 pub fn lookup_component_type(
911 &self,
912 entity_path: &EntityPath,
913 component: ComponentIdentifier,
914 ) -> Option<(Option<ComponentType>, ArrowDataType)> {
915 let (component_descr, _, datatype) = self
916 .per_column_metadata
917 .get(entity_path)
918 .and_then(|per_identifier| per_identifier.get(&component))?;
919 Some((component_descr.component_type, datatype.clone()))
920 }
921
922 /// Checks whether any column in the store with the given [`ComponentType`] has a datatype
923 /// that differs from `expected_datatype`.
924 ///
925 /// This iterates over all entities, so it should not be called in a hot path.
926 pub fn has_mismatched_datatype_for_component_type(
927 &self,
928 component_type: &ComponentType,
929 expected_datatype: &ArrowDataType,
930 ) -> Option<&ArrowDataType> {
931 for per_component in self.per_column_metadata.values() {
932 for (descr, _, datatype) in per_component.values() {
933 if descr.component_type.as_ref() == Some(component_type)
934 && datatype != expected_datatype
935 {
936 return Some(datatype);
937 }
938 }
939 }
940 None
941 }
942
943 /// Returns and iterator over [`ChunkId`]s that were detected as
944 /// used or missing since the last time since method was called.
945 ///
946 /// Chunks are considered missing when they are required to compute the results of a query, but cannot be
947 /// found in local memory.
948 ///
949 /// Calling this method is destructive: the internal set is cleared on every call, and will grow back as
950 /// new queries are run.
951 /// Callers are expected to call this once per frame in order to know which chunks were missing during
952 /// the previous frame.
953 ///
954 /// The returned [`ChunkId`]s can live anywhere within the lineage tree, and therefore might
955 /// not be usable for downstream consumers that did not track even compaction/split-off events.
956 /// Use [`Self::find_root_chunks`] to find the original chunks that those IDs descended from.
957 pub fn take_tracked_chunk_ids(&self) -> QueriedChunkIdTracker {
958 std::mem::take(&mut self.queried_chunk_id_tracker.write())
959 }
960
961 /// See [`Self::take_tracked_chunk_ids`] for more details.
962 pub fn tracked_chunk_ids(&self) -> QueriedChunkIdTracker {
963 self.queried_chunk_id_tracker.read().clone()
964 }
965
966 /// Signal that the chunk was used and should not be evicted by gc.
967 pub fn report_used_physical_chunk_id(&self, chunk_id: ChunkId) {
968 debug_assert!(self.physical_chunk(&chunk_id).is_some());
969
970 self.queried_chunk_id_tracker
971 .write()
972 .used_physical
973 .insert(chunk_id);
974 }
975
976 /// Signal that a chunk is missing and should be fetched when possible.
977 #[track_caller]
978 pub fn report_missing_virtual_chunk_id(&self, chunk_id: ChunkId) {
979 debug_assert!(
980 self.chunks_lineage.contains_key(&chunk_id),
981 "A chunk was reported missing, with no known lineage: {chunk_id}"
982 );
983 if self.split_on_ingest.contains(&chunk_id) {
984 if cfg!(debug_assertions) {
985 re_log::warn_once!(
986 "Tried to report a chunk missing that was the source of a split (manual)"
987 );
988 }
989 re_log::debug_once!(
990 "Tried to report a chunk missing that was the source of a split: {chunk_id} (manual)"
991 );
992 }
993
994 self.queried_chunk_id_tracker
995 .write()
996 .missing_virtual
997 .insert(chunk_id);
998 }
999
1000 /// How many missing chunk IDs are currently registered?
1001 ///
1002 /// See also [`ChunkStore::take_tracked_chunk_ids`].
1003 pub fn num_missing_chunk_ids(&self) -> usize {
1004 self.queried_chunk_id_tracker.read().missing_virtual.len()
1005 }
1006}
1007
1008// ---
1009
1010impl ChunkStore {
1011 /// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
1012 ///
1013 /// The stores will be prefilled with the data at the specified path.
1014 ///
1015 /// See also:
1016 /// * [`ChunkStore::new`]
1017 #[cfg(not(target_arch = "wasm32"))]
1018 pub fn from_rrd_filepath(
1019 store_config: &ChunkStoreConfig,
1020 path_to_rrd: impl AsRef<std::path::Path>,
1021 ) -> anyhow::Result<BTreeMap<StoreId, Self>> {
1022 let path_to_rrd = path_to_rrd.as_ref();
1023
1024 re_tracing::profile_function!(path_to_rrd.to_string_lossy());
1025
1026 use anyhow::Context as _;
1027
1028 let mut stores = BTreeMap::new();
1029
1030 let rrd_file = std::fs::File::open(path_to_rrd)
1031 .with_context(|| format!("couldn't open {path_to_rrd:?}"))?;
1032
1033 let decoder = re_log_encoding::Decoder::decode_eager(std::io::BufReader::new(rrd_file))
1034 .with_context(|| format!("couldn't decode {path_to_rrd:?}"))?;
1035
1036 // TODO(cmc): offload the decoding to a background thread.
1037 for res in decoder {
1038 let msg = res.with_context(|| format!("couldn't decode message {path_to_rrd:?}"))?;
1039 match msg {
1040 re_log_types::LogMsg::SetStoreInfo(info) => {
1041 stores.entry(info.info.store_id.clone()).or_insert_with(|| {
1042 Self::new(info.info.store_id.clone(), store_config.clone())
1043 });
1044 }
1045
1046 re_log_types::LogMsg::ArrowMsg(store_id, msg) => {
1047 let Some(store) = stores.get_mut(&store_id) else {
1048 anyhow::bail!("unknown store ID: {store_id:?}");
1049 };
1050
1051 let chunk = Chunk::from_arrow_msg(&msg)
1052 .with_context(|| format!("couldn't decode chunk {path_to_rrd:?}"))?;
1053
1054 store
1055 .insert_chunk(&Arc::new(chunk))
1056 .with_context(|| format!("couldn't insert chunk {path_to_rrd:?}"))?;
1057 }
1058
1059 re_log_types::LogMsg::BlueprintActivationCommand(_) => {}
1060 }
1061 }
1062
1063 Ok(stores)
1064 }
1065
1066 /// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
1067 ///
1068 /// The stores will be prefilled with the data in the given `log_msgs`.
1069 ///
1070 /// See also:
1071 /// * [`ChunkStore::new`]
1072 pub fn from_log_msgs(
1073 store_config: &ChunkStoreConfig,
1074 log_msgs: impl IntoIterator<Item = re_log_types::LogMsg>,
1075 ) -> anyhow::Result<BTreeMap<StoreId, Self>> {
1076 re_tracing::profile_function!();
1077
1078 use anyhow::Context as _;
1079
1080 let mut stores = BTreeMap::new();
1081
1082 // TODO(cmc): offload the decoding to a background thread.
1083 let log_msgs = log_msgs.into_iter();
1084 for msg in log_msgs {
1085 match msg {
1086 re_log_types::LogMsg::SetStoreInfo(info) => {
1087 stores.entry(info.info.store_id.clone()).or_insert_with(|| {
1088 Self::new(info.info.store_id.clone(), store_config.clone())
1089 });
1090 }
1091
1092 re_log_types::LogMsg::ArrowMsg(store_id, msg) => {
1093 let Some(store) = stores.get_mut(&store_id) else {
1094 anyhow::bail!("unknown store ID: {store_id:?}");
1095 };
1096
1097 let chunk = Chunk::from_arrow_msg(&msg)
1098 .with_context(|| "couldn't decode chunk".to_owned())?;
1099
1100 store
1101 .insert_chunk(&Arc::new(chunk))
1102 .with_context(|| "couldn't insert chunk".to_owned())?;
1103 }
1104
1105 re_log_types::LogMsg::BlueprintActivationCommand(_) => {}
1106 }
1107 }
1108
1109 Ok(stores)
1110 }
1111
1112 /// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
1113 ///
1114 /// Wraps the results in [`ChunkStoreHandle`]s.
1115 ///
1116 /// The stores will be prefilled with the data at the specified path.
1117 ///
1118 /// See also:
1119 /// * [`ChunkStore::new_handle`]
1120 #[cfg(not(target_arch = "wasm32"))]
1121 pub fn handle_from_rrd_filepath(
1122 store_config: &ChunkStoreConfig,
1123 path_to_rrd: impl AsRef<std::path::Path>,
1124 ) -> anyhow::Result<BTreeMap<StoreId, ChunkStoreHandle>> {
1125 Ok(Self::from_rrd_filepath(store_config, path_to_rrd)?
1126 .into_iter()
1127 .map(|(store_id, store)| (store_id, ChunkStoreHandle::new(store)))
1128 .collect())
1129 }
1130}