re_entity_db/
entity_db.rs

1use std::fmt::{Debug, Formatter};
2use std::sync::Arc;
3
4use nohash_hasher::IntMap;
5use re_chunk::{
6    Chunk, ChunkBuilder, ChunkId, ChunkResult, ComponentIdentifier, LatestAtQuery, RowId, TimeInt,
7    TimePoint, Timeline, TimelineName,
8};
9use re_chunk_store::{
10    ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreDiffKind, ChunkStoreEvent,
11    ChunkStoreHandle, ChunkStoreSubscriber as _, GarbageCollectionOptions, GarbageCollectionTarget,
12};
13use re_log_channel::LogSource;
14use re_log_encoding::RrdManifest;
15use re_log_types::{
16    AbsoluteTimeRange, AbsoluteTimeRangeF, ApplicationId, EntityPath, EntityPathHash, LogMsg,
17    RecordingId, SetStoreInfo, StoreId, StoreInfo, StoreKind, TimeType,
18};
19use re_query::{
20    QueryCache, QueryCacheHandle, StorageEngine, StorageEngineArcReadGuard, StorageEngineReadGuard,
21};
22
23use crate::ingestion_statistics::IngestionStatistics;
24use crate::rrd_manifest_index::RrdManifestIndex;
25use crate::{Error, TimeHistogramPerTimeline};
26
27// ----------------------------------------------------------------------------
28
29/// See [`GarbageCollectionOptions::time_budget`].
30pub const DEFAULT_GC_TIME_BUDGET: std::time::Duration = std::time::Duration::from_micros(3500); // empirical
31
// ----------------------------------------------------------------------------
33
/// What class of [`EntityDb`] is this?
///
/// The class is used to semantically group recordings in the UI (e.g. in the recording panel) and
/// to determine how to source the default blueprint. For example, `DatasetSegment` dbs might have
/// their default blueprint sourced remotely.
#[derive(Debug, PartialEq, Eq)]
pub enum EntityDbClass<'a> {
    /// This is a regular local recording (e.g. loaded from a `.rrd` file or logged to the viewer).
    LocalRecording,

    /// This is an official rerun example recording.
    ///
    /// Detected by the data source URL (see [`EntityDb::store_class`]).
    ExampleRecording,

    /// This is a recording loaded from a remote dataset segment.
    ///
    /// Borrows the URI from the [`EntityDb`]'s data source.
    DatasetSegment(&'a re_uri::DatasetSegmentUri),

    /// This is a blueprint.
    Blueprint,
}
53
54impl EntityDbClass<'_> {
55    pub fn is_example(&self) -> bool {
56        matches!(self, EntityDbClass::ExampleRecording)
57    }
58}
59
60// ---
61
/// An in-memory database built from a stream of [`LogMsg`]es.
///
/// NOTE: all mutation is to be done via public functions!
#[derive(Clone)] // Useful for tests
pub struct EntityDb {
    /// Store id associated with this [`EntityDb`]. Must be identical to the `storage_engine`'s
    /// store id.
    store_id: StoreId,

    /// Set by whomever created this [`EntityDb`].
    ///
    /// Clones of an [`EntityDb`] gets a `None` source.
    pub data_source: Option<re_log_channel::LogSource>,

    /// Index over any received [`RrdManifest`]s; also owns the entity tree (see [`Self::tree`]).
    rrd_manifest_index: RrdManifestIndex,

    /// Comes in a special message, [`LogMsg::SetStoreInfo`].
    set_store_info: Option<SetStoreInfo>,

    /// Keeps track of the last time data was inserted into this store (viewer wall-clock).
    last_modified_at: web_time::Instant,

    /// The highest `RowId` in the store,
    /// which corresponds to the last edit time.
    /// Ignores deletions.
    latest_row_id: Option<RowId>,

    /// In many places we just store the hashes, so we need a way to translate back.
    entity_path_from_hash: IntMap<EntityPathHash, EntityPath>,

    /// A time histogram of all entities, for every timeline.
    time_histogram_per_timeline: crate::TimeHistogramPerTimeline,

    /// The [`StorageEngine`] that backs this [`EntityDb`].
    ///
    /// This object and all its internal fields are **never** allowed to be publicly exposed,
    /// whether that is directly or through methods, _even if that's just shared references_.
    ///
    /// The only way to get access to the [`StorageEngine`] from the outside is to use
    /// [`EntityDb::storage_engine`], which returns a read-only guard.
    /// The design statically guarantees the absence of deadlocks and race conditions that normally
    /// results from letting store and cache handles arbitrarily loose all across the codebase.
    storage_engine: StorageEngine,

    /// Ingestion statistics (e.g. end-to-end latency), updated as chunks are added.
    stats: IngestionStatistics,
}
108
impl Debug for EntityDb {
    // Manual impl: only the cheap, identifying fields are shown; the heavyweight
    // fields (storage engine, histograms, …) are omitted from the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EntityDb")
            .field("store_id", &self.store_id)
            .field("data_source", &self.data_source)
            .field("set_store_info", &self.set_store_info)
            .finish()
    }
}
118
119impl EntityDb {
120    pub fn new(store_id: StoreId) -> Self {
121        Self::with_store_config(store_id, ChunkStoreConfig::from_env().unwrap_or_default())
122    }
123
124    pub fn with_store_config(store_id: StoreId, store_config: ChunkStoreConfig) -> Self {
125        let store = ChunkStoreHandle::new(ChunkStore::new(store_id.clone(), store_config));
126        let cache = QueryCacheHandle::new(QueryCache::new(store.clone()));
127
128        // Safety: these handles are never going to be leaked outside of the `EntityDb`.
129        #[expect(unsafe_code)]
130        let storage_engine = unsafe { StorageEngine::new(store, cache) };
131
132        Self {
133            store_id,
134            data_source: None,
135            rrd_manifest_index: Default::default(),
136            set_store_info: None,
137            last_modified_at: web_time::Instant::now(),
138            latest_row_id: None,
139            entity_path_from_hash: Default::default(),
140            time_histogram_per_timeline: Default::default(),
141            storage_engine,
142            stats: IngestionStatistics::default(),
143        }
144    }
145
    /// The hierarchy of all entity paths known to this database.
    #[inline]
    pub fn tree(&self) -> &crate::EntityTree {
        &self.rrd_manifest_index.entity_tree
    }
150
151    /// Formats the entity tree into a human-readable text representation with component schema information.
152    pub fn format_with_components(&self) -> String {
153        let mut text = String::new();
154
155        let storage_engine = self.storage_engine();
156        let store = storage_engine.store();
157
158        self.tree().visit_children_recursively(|entity_path| {
159            if entity_path.is_root() {
160                return;
161            }
162            let depth = entity_path.len() - 1;
163            let indent = "  ".repeat(depth);
164            text.push_str(&format!("{indent}{entity_path}\n"));
165            let Some(components) = store.all_components_for_entity_sorted(entity_path) else {
166                return;
167            };
168            for component in components {
169                let component_indent = "  ".repeat(depth + 1);
170                if let Some(component_descr) =
171                    store.entity_component_descriptor(entity_path, component)
172                    && let Some(component_type) = &component_descr.component_type
173                {
174                    if let Some(datatype) = store.lookup_datatype(component_type) {
175                        text.push_str(&format!(
176                            "{}{}: {}\n",
177                            component_indent,
178                            component_type.short_name(),
179                            re_arrow_util::format_data_type(&datatype)
180                        ));
181                    } else {
182                        text.push_str(&format!(
183                            "{}{}\n",
184                            component_indent,
185                            component_type.short_name()
186                        ));
187                    }
188                } else {
189                    // Fallback to component identifier
190                    text.push_str(&format!("{component_indent}{component}\n"));
191                }
192            }
193        });
194        text
195    }
196
    /// Returns a read-only guard to the backing [`StorageEngine`].
    ///
    /// No data can be inserted into this [`EntityDb`] while the guard is alive.
    #[inline]
    pub fn storage_engine(&self) -> StorageEngineReadGuard<'_> {
        self.storage_engine.read()
    }
202
    /// Returns a reference to the backing [`StorageEngine`].
    ///
    /// This can be used to obtain a clone of the [`StorageEngine`].
    ///
    /// See also [`Self::storage_engine`] and [`Self::storage_engine_arc`].
    ///
    /// # Safety
    ///
    /// Trying to lock the [`StorageEngine`] (whether read or write) while the computation of a viewer's
    /// frame is already in progress will lead to data inconsistencies, livelocks and deadlocks.
    /// The viewer runs a synchronous work-stealing scheduler (`rayon`) as well as an asynchronous
    /// one (`tokio`): when and where locks are taken is entirely non-deterministic (even unwanted reentrancy
    /// is a possibility).
    ///
    /// Don't use this unless you know what you're doing. Use [`Self::storage_engine`] instead.
    #[expect(unsafe_code)]
    pub unsafe fn storage_engine_raw(&self) -> &StorageEngine {
        &self.storage_engine
    }
220
    /// Returns a read-only guard to the backing [`StorageEngine`].
    ///
    /// That guard can be cloned at will and has a static lifetime.
    ///
    /// It is not possible to insert any more data in this [`EntityDb`] until the returned guard,
    /// and any clones, have been dropped.
    ///
    /// Prefer [`Self::storage_engine`] when a `'static` lifetime is not needed.
    #[inline]
    pub fn storage_engine_arc(&self) -> StorageEngineArcReadGuard {
        self.storage_engine.read_arc()
    }
231
    /// Index over any [`RrdManifest`]s this store has received
    /// (see [`Self::add_rrd_manifest_message`]).
    #[inline]
    pub fn rrd_manifest_index(&self) -> &RrdManifestIndex {
        &self.rrd_manifest_index
    }
236
    /// The raw [`SetStoreInfo`] message that configured this store, if any has been received.
    #[inline]
    pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
        self.set_store_info.as_ref()
    }
241
242    #[inline]
243    pub fn store_info(&self) -> Option<&StoreInfo> {
244        self.store_info_msg().map(|msg| &msg.info)
245    }
246
    /// The [`ApplicationId`] part of this store's [`StoreId`].
    #[inline]
    pub fn application_id(&self) -> &ApplicationId {
        self.store_id().application_id()
    }
251
    /// The [`RecordingId`] part of this store's [`StoreId`].
    #[inline]
    pub fn recording_id(&self) -> &RecordingId {
        self.store_id().recording_id()
    }
256
    /// Whether this is a recording or a blueprint store.
    #[inline]
    pub fn store_kind(&self) -> StoreKind {
        self.store_id().kind()
    }
261
    /// The [`StoreId`] of this database (identical to the backing store's id).
    #[inline]
    pub fn store_id(&self) -> &StoreId {
        &self.store_id
    }
266
267    /// Returns the [`EntityDbClass`] of this entity db.
268    pub fn store_class(&self) -> EntityDbClass<'_> {
269        match self.store_kind() {
270            StoreKind::Blueprint => EntityDbClass::Blueprint,
271
272            StoreKind::Recording => match &self.data_source {
273                Some(LogSource::RrdHttpStream { url, .. })
274                    if url.starts_with("https://app.rerun.io") =>
275                {
276                    EntityDbClass::ExampleRecording
277                }
278
279                Some(LogSource::RedapGrpcStream { uri, .. }) => EntityDbClass::DatasetSegment(uri),
280
281                _ => EntityDbClass::LocalRecording,
282            },
283        }
284    }
285
286    /// Read one of the built-in `RecordingInfo` properties.
287    pub fn recording_info_property<C: re_types_core::Component>(
288        &self,
289        component: ComponentIdentifier,
290    ) -> Option<C> {
291        debug_assert!(
292            component.starts_with("RecordingInfo:"),
293            "This function should only be used for built-in RecordingInfo components, which are the only recording properties at {}",
294            EntityPath::properties()
295        );
296
297        self.latest_at_component::<C>(
298            &EntityPath::properties(),
299            &LatestAtQuery::latest(TimelineName::log_tick()),
300            component,
301        )
302        .map(|(_, value)| value)
303    }
304
    /// You can use this both for setting the built-in `RecordingInfo` components,
    /// and for setting custom properties on the recording.
    pub fn set_recording_property<Component: re_types_core::Component>(
        &mut self,
        entity_path: EntityPath,
        component_descr: re_types_core::ComponentDescriptor,
        value: &Component,
    ) -> Result<(), Error> {
        // Sanity-check that the descriptor matches the component type being logged…
        debug_assert_eq!(component_descr.component_type, Some(Component::name()));
        // …and that we're writing somewhere under the reserved properties subtree.
        debug_assert!(entity_path.starts_with(&EntityPath::properties()));
        debug_assert!(
            (entity_path == EntityPath::properties())
                == (component_descr.archetype == Some("rerun.archetypes.RecordingInfo".into())),
            "RecordingInfo should be logged at {}. Custom properties should be under a child entity",
            EntityPath::properties()
        );

        // Recording properties are logged statically (timeless).
        let chunk = ChunkBuilder::new(ChunkId::new(), entity_path)
            .with_component(RowId::new(), TimePoint::STATIC, component_descr, value)
            .map_err(|err| Error::Chunk(err.into()))?
            .build()?;

        self.add_chunk(&Arc::new(chunk))?;

        Ok(())
    }
331
332    pub fn timeline_type(&self, timeline_name: &TimelineName) -> TimeType {
333        self.storage_engine()
334            .store()
335            .time_column_type(timeline_name)
336            .unwrap_or_else(|| {
337                if timeline_name == &TimelineName::log_time() {
338                    Timeline::log_time().typ()
339                } else if timeline_name == &TimelineName::log_tick() {
340                    Timeline::log_tick().typ()
341                } else {
342                    re_log::warn_once!("Timeline {timeline_name:?} not found");
343                    TimeType::Sequence
344                }
345            })
346    }
347
348    /// Queries for the given components using latest-at semantics.
349    ///
350    /// See [`re_query::LatestAtResults`] for more information about how to handle the results.
351    ///
352    /// This is a cached API -- data will be lazily cached upon access.
353    #[inline]
354    pub fn latest_at(
355        &self,
356        query: &re_chunk_store::LatestAtQuery,
357        entity_path: &EntityPath,
358        components: impl IntoIterator<Item = ComponentIdentifier>,
359    ) -> re_query::LatestAtResults {
360        self.storage_engine
361            .read()
362            .cache()
363            .latest_at(query, entity_path, components)
364    }
365
366    /// Get the latest index and value for a given dense [`re_types_core::Component`].
367    ///
368    /// This assumes that the row we get from the store contains at most one instance for this
369    /// component; it will log a warning otherwise.
370    ///
371    /// This should only be used for "mono-components" such as `Transform` and `Tensor`.
372    ///
373    /// This is a best-effort helper, it will merely log errors on failure.
374    #[inline]
375    pub fn latest_at_component<C: re_types_core::Component>(
376        &self,
377        entity_path: &EntityPath,
378        query: &re_chunk_store::LatestAtQuery,
379        component: ComponentIdentifier,
380    ) -> Option<((TimeInt, RowId), C)> {
381        let results = self
382            .storage_engine
383            .read()
384            .cache()
385            .latest_at(query, entity_path, [component]);
386        results
387            .component_mono(component)
388            .map(|value| (results.index(), value))
389    }
390
391    /// Get the latest index and value for a given dense [`re_types_core::Component`].
392    ///
393    /// This assumes that the row we get from the store contains at most one instance for this
394    /// component; it will log a warning otherwise.
395    ///
396    /// This should only be used for "mono-components" such as `Transform` and `Tensor`.
397    ///
398    /// This is a best-effort helper, and will quietly swallow any errors.
399    #[inline]
400    pub fn latest_at_component_quiet<C: re_types_core::Component>(
401        &self,
402        entity_path: &EntityPath,
403        query: &re_chunk_store::LatestAtQuery,
404        component: ComponentIdentifier,
405    ) -> Option<((TimeInt, RowId), C)> {
406        let results = self
407            .storage_engine
408            .read()
409            .cache()
410            .latest_at(query, entity_path, [component]);
411
412        results
413            .component_mono_quiet(component)
414            .map(|value| (results.index(), value))
415    }
416
417    #[inline]
418    pub fn latest_at_component_at_closest_ancestor<C: re_types_core::Component>(
419        &self,
420        entity_path: &EntityPath,
421        query: &re_chunk_store::LatestAtQuery,
422        component: ComponentIdentifier,
423    ) -> Option<(EntityPath, (TimeInt, RowId), C)> {
424        re_tracing::profile_function!();
425
426        let mut cur_entity_path = Some(entity_path.clone());
427        while let Some(entity_path) = cur_entity_path {
428            if let Some((index, value)) = self.latest_at_component(&entity_path, query, component) {
429                return Some((entity_path, index, value));
430            }
431            cur_entity_path = entity_path.parent();
432        }
433
434        None
435    }
436
437    /// If this entity db is the result of a clone, which store was it cloned from?
438    ///
439    /// A cloned store always gets a new unique ID.
440    ///
441    /// We currently only use entity db cloning for blueprints:
442    /// when we activate a _default_ blueprint that was received on the wire (e.g. from a recording),
443    /// we clone it and make the clone the _active_ blueprint.
444    /// This means all active blueprints are clones.
445    #[inline]
446    pub fn cloned_from(&self) -> Option<&StoreId> {
447        let info = self.store_info()?;
448        info.cloned_from.as_ref()
449    }
450
    /// All timelines known to the backing store, keyed by name.
    pub fn timelines(&self) -> std::collections::BTreeMap<TimelineName, Timeline> {
        self.storage_engine().store().timelines()
    }
454
    /// When do we have data on each timeline?
    ///
    /// Also accounts for data indexed via RRD manifests (see [`Self::add_rrd_manifest_message`]).
    pub fn timeline_histograms(&self) -> &TimeHistogramPerTimeline {
        &self.time_histogram_per_timeline
    }
459
    /// Returns the time range of data on the given timeline, ignoring any static times.
    ///
    /// Returns `None` if the store has no (non-static) data on that timeline.
    pub fn time_range_for(&self, timeline: &TimelineName) -> Option<AbsoluteTimeRange> {
        self.storage_engine().store().time_range(timeline)
    }
464
    /// Histogram of all events on the timeline, of all entities.
    pub fn time_histogram(&self, timeline: &TimelineName) -> Option<&crate::TimeHistogram> {
        self.time_histogram_per_timeline.get(timeline)
    }
469
    /// Total number of rows in the backing chunk store.
    #[inline]
    pub fn num_rows(&self) -> u64 {
        self.storage_engine.read().store().stats().total().num_rows
    }
474
    /// Return the current `ChunkStoreGeneration`. This can be used to determine whether the
    /// database has been modified since the last time it was queried.
    ///
    /// See [`re_chunk_store::ChunkStoreGeneration`].
    #[inline]
    pub fn generation(&self) -> re_chunk_store::ChunkStoreGeneration {
        self.storage_engine.read().store().generation()
    }
481
    /// The last time data was inserted into this store (viewer wall-clock).
    #[inline]
    pub fn last_modified_at(&self) -> web_time::Instant {
        self.last_modified_at
    }
486
    /// The highest `RowId` in the store,
    /// which corresponds to the last edit time.
    /// Ignores deletions.
    ///
    /// `None` if no chunk has been inserted yet.
    #[inline]
    pub fn latest_row_id(&self) -> Option<RowId> {
        self.latest_row_id
    }
494
    /// True if this database has neither store info nor any rows.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.set_store_info.is_none() && self.num_rows() == 0
    }
499
500    /// A sorted list of all the entity paths in this database.
501    pub fn entity_paths(&self) -> Vec<&EntityPath> {
502        use itertools::Itertools as _;
503        self.entity_path_from_hash.values().sorted().collect()
504    }
505
    /// Statistics about data ingestion (e.g. end-to-end latency).
    #[inline]
    pub fn ingestion_stats(&self) -> &IngestionStatistics {
        &self.stats
    }
510
    /// Translate an [`EntityPathHash`] back to the full [`EntityPath`], if known.
    #[inline]
    pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
        self.entity_path_from_hash.get(entity_path_hash)
    }
515
    /// Returns `true` also for entities higher up in the hierarchy,
    /// i.e. for any prefix of a logged entity path, not just exact matches.
    #[inline]
    pub fn is_known_entity(&self, entity_path: &EntityPath) -> bool {
        self.tree().subtree(entity_path).is_some()
    }
521
    /// Was data logged directly to this exact entity path?
    ///
    /// If you log `world/points`, then that is a logged entity, but `world` is not,
    /// unless you log something to `world` too.
    #[inline]
    pub fn is_logged_entity(&self, entity_path: &EntityPath) -> bool {
        self.entity_path_from_hash.contains_key(&entity_path.hash())
    }
528
529    pub fn add_rrd_manifest_message(&mut self, rrd_manifest: RrdManifest) {
530        re_tracing::profile_function!();
531        re_log::debug!("Received RrdManifest for {:?}", self.store_id());
532
533        if let Err(err) = self
534            .time_histogram_per_timeline
535            .on_rrd_manifest(&rrd_manifest)
536        {
537            re_log::error!("Failed to ingest RRD Manifest: {err}");
538        }
539
540        if let Err(err) = self.rrd_manifest_index.append(rrd_manifest) {
541            re_log::error!("Failed to load RRD Manifest: {err}");
542        }
543    }
544
    /// Ingests a single [`LogMsg`], returning any resulting store events.
    ///
    /// The message must belong to this store (same [`StoreId`]).
    pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> {
        re_tracing::profile_function!();

        debug_assert_eq!(msg.store_id(), self.store_id());

        let store_events = match &msg {
            LogMsg::SetStoreInfo(msg) => {
                self.set_store_info(msg.clone());
                vec![]
            }

            LogMsg::ArrowMsg(_, arrow_msg) => {
                self.last_modified_at = web_time::Instant::now();

                // Decode the Arrow record batch into a chunk; sort it if it arrived unsorted.
                let chunk_batch = re_sorbet::ChunkBatch::try_from(&arrow_msg.batch)
                    .map_err(re_chunk::ChunkError::from)?;
                let mut chunk = re_chunk::Chunk::from_chunk_batch(&chunk_batch)?;
                chunk.sort_if_unsorted();
                self.add_chunk_with_timestamp_metadata(
                    &Arc::new(chunk),
                    &chunk_batch.sorbet_schema().timestamps,
                )?
            }

            LogMsg::BlueprintActivationCommand(_) => {
                // Not for us to handle
                vec![]
            }
        };

        Ok(store_events)
    }
577
    /// Inserts a chunk with default (empty) timestamp metadata.
    ///
    /// Used mostly for tests
    pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> {
        self.add_chunk_with_timestamp_metadata(chunk, &Default::default())
    }
582
583    fn add_chunk_with_timestamp_metadata(
584        &mut self,
585        chunk: &Arc<Chunk>,
586        chunk_timestamps: &re_sorbet::TimestampMetadata,
587    ) -> Result<Vec<ChunkStoreEvent>, Error> {
588        let store_events = self.storage_engine.write().store().insert_chunk(chunk)?;
589
590        self.entity_path_from_hash
591            .entry(chunk.entity_path().hash())
592            .or_insert_with(|| chunk.entity_path().clone());
593
594        if self.latest_row_id < chunk.row_id_range().map(|(_, row_id_max)| row_id_max) {
595            self.latest_row_id = chunk.row_id_range().map(|(_, row_id_max)| row_id_max);
596        }
597
598        self.rrd_manifest_index.mark_as_loaded(chunk.id());
599
600        self.on_store_events(&store_events);
601
602        // We inform the stats last, since it measures e2e latency.
603        // We only care about latency metrics during ingestion (adding a chunk)
604        // which is why we only call it here, and not inside of `on_store_events`
605        // (we need the `chunk_timestamps`).
606        self.stats.on_events(chunk_timestamps, &store_events);
607
608        Ok(store_events)
609    }
610
    /// We call this on any changes, before returning the store events to the outsider caller.
    ///
    /// Updates the query cache, the manifest index, the time histograms and the entity tree.
    fn on_store_events(&mut self, store_events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!();

        // Take the write lock first: the cache needs write access to process the events.
        let mut engine = self.storage_engine.write();

        engine.cache().on_events(store_events);

        // Downgrade to read access: the remaining bookkeeping only needs to read,
        // and `on_store_deletions` below takes a read guard.
        let engine = engine.downgrade();

        self.rrd_manifest_index.on_events(store_events);

        // Update our internal views by notifying them of resulting [`ChunkStoreEvent`]s.
        self.time_histogram_per_timeline
            .on_events(&self.rrd_manifest_index, store_events);
        self.rrd_manifest_index
            .entity_tree
            .on_store_additions(store_events);

        // It is possible for writes to trigger deletions: specifically in the case of
        // overwritten static data leading to dangling chunks.
        let entity_paths_with_deletions = store_events
            .iter()
            .filter(|event| event.kind == ChunkStoreDiffKind::Deletion)
            .map(|event| event.chunk.entity_path().clone())
            .collect();

        {
            re_tracing::profile_scope!("on_store_deletions");
            self.rrd_manifest_index.entity_tree.on_store_deletions(
                &engine,
                &entity_paths_with_deletions,
                store_events,
            );
        }
    }
647
    /// Sets (or replaces) the store info, normally from a [`LogMsg::SetStoreInfo`].
    pub fn set_store_info(&mut self, store_info: SetStoreInfo) {
        self.set_store_info = Some(store_info);
    }
651
    /// Free up some RAM by forgetting the older parts of all timelines.
    ///
    /// `fraction_to_purge` must be in `0.0..=1.0`.
    /// If `time_cursor` is set, the GC prefers dropping data furthest from that time
    /// (see [`GarbageCollectionOptions`]'s `furthest_from`).
    pub fn purge_fraction_of_ram(
        &mut self,
        fraction_to_purge: f32,
        time_cursor: Option<(Timeline, TimeInt)>,
    ) -> Vec<ChunkStoreEvent> {
        re_tracing::profile_function!();

        assert!((0.0..=1.0).contains(&fraction_to_purge));

        let store_events = self.gc(&GarbageCollectionOptions {
            target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _),
            protect_latest: 1,
            time_budget: DEFAULT_GC_TIME_BUDGET,

            // TODO(emilk): we could protect the data that is currently being viewed
            // (e.g. when paused in the live camera example).
            // To be perfect it would need margins (because of latest-at), i.e. we would need to know
            // exactly how far back the latest-at is of each component at the current time…
            // …but maybe it doesn't have to be perfect.
            protected_time_ranges: Default::default(),

            furthest_from: time_cursor.map(|(timeline, time)| (*timeline.name(), time)),
        });

        if store_events.is_empty() {
            // If we weren't able to collect any data, then we need to GC the cache itself in order
            // to regain some space.
            // See <https://github.com/rerun-io/rerun/issues/7369#issuecomment-2335164098> for the
            // complete rationale.
            self.storage_engine
                .write()
                .cache()
                .purge_fraction_of_ram(fraction_to_purge);
        } else {
            self.on_store_events(&store_events);
        }

        store_events
    }
692
    /// Runs garbage collection on the backing store with the given options.
    ///
    /// Updates all derived state (query cache, histograms, entity tree) and returns
    /// the resulting [`ChunkStoreEvent`]s.
    pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> {
        re_tracing::profile_function!();

        let (store_events, stats_diff) = self.storage_engine.write().store().gc(gc_options);

        re_log::trace!(
            num_row_ids_dropped = store_events.len(),
            size_bytes_dropped = re_format::format_bytes(stats_diff.total().total_size_bytes as _),
            "purged datastore"
        );

        self.on_store_events(&store_events);

        store_events
    }
708
709    /// Drop all events in the given time range from the given timeline.
710    ///
711    /// Used to implement undo (erase the last event from the blueprint db).
712    pub fn drop_time_range(
713        &mut self,
714        timeline: &TimelineName,
715        drop_range: AbsoluteTimeRange,
716    ) -> Vec<ChunkStoreEvent> {
717        re_tracing::profile_function!();
718
719        let store_events = self
720            .storage_engine
721            .write()
722            .store()
723            .drop_time_range(timeline, drop_range);
724
725        self.on_store_events(&store_events);
726
727        store_events
728    }
729
730    /// Unconditionally drops all the data for a given [`EntityPath`] .
731    ///
732    /// This is _not_ recursive. Children of this entity will not be affected.
733    ///
734    /// To drop the entire subtree below an entity, see: [`Self::drop_entity_path_recursive`].
735    pub fn drop_entity_path(&mut self, entity_path: &EntityPath) {
736        re_tracing::profile_function!();
737
738        let store_events = self
739            .storage_engine
740            .write()
741            .store()
742            .drop_entity_path(entity_path);
743
744        self.on_store_events(&store_events);
745    }
746
747    /// Unconditionally drops all the data for a given [`EntityPath`] and all its children.
748    pub fn drop_entity_path_recursive(&mut self, entity_path: &EntityPath) {
749        re_tracing::profile_function!();
750
751        let mut to_drop = vec![entity_path.clone()];
752
753        if let Some(tree) = self.tree().subtree(entity_path) {
754            tree.visit_children_recursively(|path| {
755                to_drop.push(path.clone());
756            });
757        }
758
759        for entity_path in to_drop {
760            self.drop_entity_path(&entity_path);
761        }
762    }
763
    /// Export the contents of the current database to a sequence of messages.
    ///
    /// If `time_selection` is specified, then only data for that specific timeline over that
    /// specific time range will be accounted for (static data is always included).
    ///
    /// The returned iterator borrows from `self`.
    pub fn to_messages(
        &self,
        time_selection: Option<(TimelineName, AbsoluteTimeRangeF)>,
    ) -> impl Iterator<Item = ChunkResult<LogMsg>> + '_ {
        re_tracing::profile_function!();

        let engine = self.storage_engine.read();

        // The `SetStoreInfo` message (if any) always goes first.
        let set_store_info_msg = self
            .store_info_msg()
            .map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone())));

        let data_messages = {
            // Round the (possibly fractional) time range outwards to whole time values.
            let time_filter = time_selection.map(|(timeline, range)| {
                (
                    timeline,
                    AbsoluteTimeRange::new(range.min.floor(), range.max.ceil()),
                )
            });

            let mut chunks: Vec<Arc<Chunk>> = engine
                .store()
                .iter_chunks()
                .filter(move |chunk| {
                    if chunk.is_static() {
                        return true; // always keep all static data
                    }

                    let Some((timeline, time_range)) = time_filter else {
                        return true; // no filter -> keep all data
                    };

                    // TODO(cmc): chunk.slice_time_selection(time_selection)
                    chunk
                        .timelines()
                        .get(&timeline)
                        .is_some_and(|time_column| time_range.intersects(time_column.time_range()))
                })
                .cloned() // refcount
                .collect();

            // Try to roughly preserve the order of the chunks
            // from how they were originally logged.
            // See https://github.com/rerun-io/rerun/issues/7175 for why.
            chunks.sort_by_key(|chunk| chunk.row_id_range().map(|(min, _)| min));

            chunks.into_iter().map(|chunk| {
                chunk
                    .to_arrow_msg()
                    .map(|msg| LogMsg::ArrowMsg(self.store_id().clone(), msg))
            })
        };

        // If this is a blueprint, make sure to include the `BlueprintActivationCommand` message.
        // We generally use `to_messages` to export a blueprint via "save". In that
        // case, we want to make the blueprint active and default when it's reloaded.
        // TODO(jleibs): Coupling this with the stored file instead of injecting seems
        // architecturally weird. Would be great if we didn't need this in `.rbl` files
        // at all.
        let blueprint_ready = if self.store_kind() == StoreKind::Blueprint {
            let activate_cmd =
                re_log_types::BlueprintActivationCommand::make_active(self.store_id().clone());

            itertools::Either::Left(std::iter::once(Ok(activate_cmd.into())))
        } else {
            itertools::Either::Right(std::iter::empty())
        };

        set_store_info_msg
            .into_iter()
            .chain(data_messages)
            .chain(blueprint_ready)
    }
841
842    /// Make a clone of this [`EntityDb`], assigning it a new [`StoreId`].
843    pub fn clone_with_new_id(&self, new_id: StoreId) -> Result<Self, Error> {
844        re_tracing::profile_function!();
845
846        let mut new_db = Self::new(new_id.clone());
847
848        new_db.last_modified_at = self.last_modified_at;
849        new_db.latest_row_id = self.latest_row_id;
850
851        // We do NOT clone the `data_source`, because the reason we clone an entity db
852        // is so that we can modify it, and then it would be wrong to say its from the same source.
853        // Specifically: if we load a blueprint from an `.rdd`, then modify it heavily and save it,
854        // it would be wrong to claim that this was the blueprint from that `.rrd`,
855        // and it would confuse the user.
856        // TODO(emilk): maybe we should use a special `Cloned` data source,
857        // wrapping either the original source, the original StoreId, or both.
858
859        if let Some(store_info) = self.store_info() {
860            let mut new_info = store_info.clone();
861            new_info.store_id = new_id;
862            new_info.cloned_from = Some(self.store_id().clone());
863
864            new_db.set_store_info(SetStoreInfo {
865                row_id: *RowId::new(),
866                info: new_info,
867            });
868        }
869
870        let engine = self.storage_engine.read();
871        for chunk in engine.store().iter_chunks() {
872            new_db.add_chunk(&Arc::clone(chunk))?;
873        }
874
875        Ok(new_db)
876    }
877}
878
879/// ## Stats
880impl EntityDb {
881    /// Returns the stats for the static store of the entity and all its children, recursively.
882    ///
883    /// This excludes temporal data.
884    pub fn subtree_stats_static(
885        &self,
886        engine: &StorageEngineReadGuard<'_>,
887        entity_path: &EntityPath,
888    ) -> ChunkStoreChunkStats {
889        re_tracing::profile_function!();
890
891        let Some(subtree) = self.tree().subtree(entity_path) else {
892            return Default::default();
893        };
894
895        let mut stats = ChunkStoreChunkStats::default();
896        subtree.visit_children_recursively(|path| {
897            stats += engine.store().entity_stats_static(path);
898        });
899
900        stats
901    }
902
903    /// Returns the stats for the entity and all its children on the given timeline, recursively.
904    ///
905    /// This excludes static data.
906    pub fn subtree_stats_on_timeline(
907        &self,
908        engine: &StorageEngineReadGuard<'_>,
909        entity_path: &EntityPath,
910        timeline: &TimelineName,
911    ) -> ChunkStoreChunkStats {
912        re_tracing::profile_function!();
913
914        let Some(subtree) = self.tree().subtree(entity_path) else {
915            return Default::default();
916        };
917
918        let mut stats = ChunkStoreChunkStats::default();
919        subtree.visit_children_recursively(|path| {
920            stats += engine.store().entity_stats_on_timeline(path, timeline);
921        });
922
923        stats
924    }
925
926    /// Returns true if an entity or any of its children have any data on the given timeline.
927    ///
928    /// This includes static data.
929    pub fn subtree_has_data_on_timeline(
930        &self,
931        engine: &StorageEngineReadGuard<'_>,
932        timeline: &TimelineName,
933        entity_path: &EntityPath,
934    ) -> bool {
935        re_tracing::profile_function!();
936
937        let Some(subtree) = self.tree().subtree(entity_path) else {
938            return false;
939        };
940
941        subtree
942            .find_first_child_recursive(|path| {
943                self.rrd_manifest_index
944                    .entity_has_data_on_timeline(path, timeline)
945                    || engine.store().entity_has_data_on_timeline(timeline, path)
946            })
947            .is_some()
948    }
949
950    /// Returns true if an entity or any of its children have any temporal data on the given timeline.
951    ///
952    /// This ignores static data.
953    pub fn subtree_has_temporal_data_on_timeline(
954        &self,
955        engine: &StorageEngineReadGuard<'_>,
956        timeline: &TimelineName,
957        entity_path: &EntityPath,
958    ) -> bool {
959        re_tracing::profile_function!();
960
961        let Some(subtree) = self.tree().subtree(entity_path) else {
962            return false;
963        };
964
965        subtree
966            .find_first_child_recursive(|path| {
967                self.rrd_manifest_index
968                    .entity_has_temporal_data_on_timeline(path, timeline)
969                    || engine
970                        .store()
971                        .entity_has_temporal_data_on_timeline(timeline, path)
972            })
973            .is_some()
974    }
975
976    /// Returns true if an entity has any temporal data on the given timeline.
977    ///
978    /// This ignores static data.
979    pub fn entity_has_temporal_data_on_timeline(
980        &self,
981        engine: &StorageEngineReadGuard<'_>,
982        timeline: &TimelineName,
983        entity_path: &EntityPath,
984    ) -> bool {
985        re_tracing::profile_function!();
986
987        self.rrd_manifest_index
988            .entity_has_temporal_data_on_timeline(entity_path, timeline)
989            || engine
990                .store()
991                .entity_has_temporal_data_on_timeline(timeline, entity_path)
992    }
993}
994
995impl re_byte_size::SizeBytes for EntityDb {
996    #[inline]
997    fn heap_size_bytes(&self) -> u64 {
998        // TODO(emilk): size of entire EntityDb, including secondary indices etc
999        self.storage_engine
1000            .read()
1001            .store()
1002            .stats()
1003            .total()
1004            .total_size_bytes
1005    }
1006}
1007
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use re_chunk::{Chunk, RowId};
    use re_log_types::example_components::{MyPoint, MyPoints};
    use re_log_types::{StoreId, TimePoint, Timeline};

    use super::*;

    #[test]
    fn format_with_components() -> anyhow::Result<()> {
        re_log::setup_logging();

        let mut entity_db = EntityDb::new(StoreId::random(
            re_log_types::StoreKind::Recording,
            "test_app",
        ));

        let frame_timeline = Timeline::new_sequence("frame");

        // Log a single point three levels deep into the entity hierarchy.
        let point = MyPoint::new(1.0, 2.0);
        let chunk = Chunk::builder("parent/child1/grandchild")
            .with_component_batches(
                RowId::new(),
                TimePoint::from_iter([(frame_timeline, 10)]),
                [(MyPoints::descriptor_points(), &[point] as _)],
            )
            .build()?;
        entity_db.add_chunk(&Arc::new(chunk))?;

        // The whole ancestor chain should show up, with the component listed on the leaf.
        assert_eq!(
            entity_db.format_with_components(),
            "/parent\n  /parent/child1\n    /parent/child1/grandchild\n      example.MyPoint: Struct[2]\n"
        );

        Ok(())
    }
}