1use std::fmt::{Debug, Formatter};
2use std::sync::Arc;
3
4use nohash_hasher::IntMap;
5use re_chunk::{
6 Chunk, ChunkBuilder, ChunkId, ChunkResult, ComponentIdentifier, LatestAtQuery, RowId, TimeInt,
7 TimePoint, Timeline, TimelineName,
8};
9use re_chunk_store::{
10 ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreDiffKind, ChunkStoreEvent,
11 ChunkStoreHandle, ChunkStoreSubscriber as _, GarbageCollectionOptions, GarbageCollectionTarget,
12};
13use re_log_channel::LogSource;
14use re_log_encoding::RrdManifest;
15use re_log_types::{
16 AbsoluteTimeRange, AbsoluteTimeRangeF, ApplicationId, EntityPath, EntityPathHash, LogMsg,
17 RecordingId, SetStoreInfo, StoreId, StoreInfo, StoreKind, TimeType,
18};
19use re_query::{
20 QueryCache, QueryCacheHandle, StorageEngine, StorageEngineArcReadGuard, StorageEngineReadGuard,
21};
22
23use crate::ingestion_statistics::IngestionStatistics;
24use crate::rrd_manifest_index::RrdManifestIndex;
25use crate::{Error, TimeHistogramPerTimeline};
26
/// Default wall-clock budget for a single incremental garbage-collection pass
/// (used by [`EntityDb::purge_fraction_of_ram`]).
pub const DEFAULT_GC_TIME_BUDGET: std::time::Duration = std::time::Duration::from_micros(3500);

/// Classification of an [`EntityDb`], derived from its store kind and data source.
#[derive(Debug, PartialEq, Eq)]
pub enum EntityDbClass<'a> {
    /// Any recording that is neither an example nor a dataset segment (the fallback).
    LocalRecording,

    /// A recording streamed from the official example host (`https://app.rerun.io`).
    ExampleRecording,

    /// A segment of a remote dataset, addressed by its URI.
    DatasetSegment(&'a re_uri::DatasetSegmentUri),

    /// A blueprint store.
    Blueprint,
}
53
54impl EntityDbClass<'_> {
55 pub fn is_example(&self) -> bool {
56 matches!(self, EntityDbClass::ExampleRecording)
57 }
58}
59
/// An in-memory database of entity data for a single store (recording or blueprint).
///
/// Wraps a chunk store and its query cache, and keeps a set of auxiliary
/// indices (entity tree, time histograms, ingestion statistics) in sync with
/// store events.
#[derive(Clone)]
pub struct EntityDb {
    /// Unique id of the store this database holds data for.
    store_id: StoreId,

    /// Where the data came from, if known.
    pub data_source: Option<re_log_channel::LogSource>,

    /// Index built from received RRD manifests; also owns the entity tree.
    rrd_manifest_index: RrdManifestIndex,

    /// The `SetStoreInfo` message describing this store, if one was received.
    set_store_info: Option<SetStoreInfo>,

    /// Set at construction and refreshed every time an `ArrowMsg` is ingested.
    last_modified_at: web_time::Instant,

    /// Highest `RowId` ingested so far, if any.
    latest_row_id: Option<RowId>,

    /// Reverse lookup from hashed entity path to the full path, for every
    /// entity that data was logged to directly.
    entity_path_from_hash: IntMap<EntityPathHash, EntityPath>,

    /// Per-timeline histograms, kept in sync with store events and manifests.
    time_histogram_per_timeline: crate::TimeHistogramPerTimeline,

    /// The chunk store plus its query cache.
    storage_engine: StorageEngine,

    /// Ingestion statistics, fed from store events and chunk timestamp metadata.
    stats: IngestionStatistics,
}
108
impl Debug for EntityDb {
    /// Compact debug representation: id, data source and store info only —
    /// the store contents themselves are omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EntityDb")
            .field("store_id", &self.store_id)
            .field("data_source", &self.data_source)
            .field("set_store_info", &self.set_store_info)
            .finish()
    }
}
118
119impl EntityDb {
120 pub fn new(store_id: StoreId) -> Self {
121 Self::with_store_config(store_id, ChunkStoreConfig::from_env().unwrap_or_default())
122 }
123
    /// Creates a new, empty [`EntityDb`] with an explicit [`ChunkStoreConfig`].
    pub fn with_store_config(store_id: StoreId, store_config: ChunkStoreConfig) -> Self {
        let store = ChunkStoreHandle::new(ChunkStore::new(store_id.clone(), store_config));
        let cache = QueryCacheHandle::new(QueryCache::new(store.clone()));

        #[expect(unsafe_code)]
        // SAFETY: NOTE(review) — both `store` and `cache` were created just above
        // and are moved into the engine; confirm this satisfies whatever contract
        // `StorageEngine::new` documents for its `unsafe`.
        let storage_engine = unsafe { StorageEngine::new(store, cache) };

        Self {
            store_id,
            data_source: None,
            rrd_manifest_index: Default::default(),
            set_store_info: None,
            last_modified_at: web_time::Instant::now(),
            latest_row_id: None,
            entity_path_from_hash: Default::default(),
            time_histogram_per_timeline: Default::default(),
            storage_engine,
            stats: IngestionStatistics::default(),
        }
    }
145
    /// The tree of all entity paths known to this store.
    #[inline]
    pub fn tree(&self) -> &crate::EntityTree {
        &self.rrd_manifest_index.entity_tree
    }
150
151 pub fn format_with_components(&self) -> String {
153 let mut text = String::new();
154
155 let storage_engine = self.storage_engine();
156 let store = storage_engine.store();
157
158 self.tree().visit_children_recursively(|entity_path| {
159 if entity_path.is_root() {
160 return;
161 }
162 let depth = entity_path.len() - 1;
163 let indent = " ".repeat(depth);
164 text.push_str(&format!("{indent}{entity_path}\n"));
165 let Some(components) = store.all_components_for_entity_sorted(entity_path) else {
166 return;
167 };
168 for component in components {
169 let component_indent = " ".repeat(depth + 1);
170 if let Some(component_descr) =
171 store.entity_component_descriptor(entity_path, component)
172 && let Some(component_type) = &component_descr.component_type
173 {
174 if let Some(datatype) = store.lookup_datatype(component_type) {
175 text.push_str(&format!(
176 "{}{}: {}\n",
177 component_indent,
178 component_type.short_name(),
179 re_arrow_util::format_data_type(&datatype)
180 ));
181 } else {
182 text.push_str(&format!(
183 "{}{}\n",
184 component_indent,
185 component_type.short_name()
186 ));
187 }
188 } else {
189 text.push_str(&format!("{component_indent}{component}\n"));
191 }
192 }
193 });
194 text
195 }
196
    /// Acquires a read guard over the storage engine (chunk store + query cache).
    #[inline]
    pub fn storage_engine(&self) -> StorageEngineReadGuard<'_> {
        self.storage_engine.read()
    }

    /// Raw access to the [`StorageEngine`], bypassing the read guard.
    ///
    /// # Safety
    ///
    /// NOTE(review): callers must uphold whatever locking discipline
    /// [`StorageEngine`] requires — confirm against its own safety docs.
    #[expect(unsafe_code)]
    pub unsafe fn storage_engine_raw(&self) -> &StorageEngine {
        &self.storage_engine
    }

    /// Like [`Self::storage_engine`], but returns an owned (`Arc`-based) guard.
    #[inline]
    pub fn storage_engine_arc(&self) -> StorageEngineArcReadGuard {
        self.storage_engine.read_arc()
    }

    /// The index built from received RRD manifests.
    #[inline]
    pub fn rrd_manifest_index(&self) -> &RrdManifestIndex {
        &self.rrd_manifest_index
    }

    /// The raw `SetStoreInfo` message this store was described by, if any.
    #[inline]
    pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
        self.set_store_info.as_ref()
    }

    /// The [`StoreInfo`] for this store, if one was received.
    #[inline]
    pub fn store_info(&self) -> Option<&StoreInfo> {
        self.store_info_msg().map(|msg| &msg.info)
    }

    /// The application id embedded in this store's id.
    #[inline]
    pub fn application_id(&self) -> &ApplicationId {
        self.store_id().application_id()
    }

    /// The recording id embedded in this store's id.
    #[inline]
    pub fn recording_id(&self) -> &RecordingId {
        self.store_id().recording_id()
    }

    /// Whether this is a recording or a blueprint store.
    #[inline]
    pub fn store_kind(&self) -> StoreKind {
        self.store_id().kind()
    }

    /// The unique id of this store.
    #[inline]
    pub fn store_id(&self) -> &StoreId {
        &self.store_id
    }
266
    /// Classifies this store based on its kind and — for recordings — its data source.
    pub fn store_class(&self) -> EntityDbClass<'_> {
        match self.store_kind() {
            StoreKind::Blueprint => EntityDbClass::Blueprint,

            StoreKind::Recording => match &self.data_source {
                // Anything streamed from the official example host counts as an example.
                Some(LogSource::RrdHttpStream { url, .. })
                    if url.starts_with("https://app.rerun.io") =>
                {
                    EntityDbClass::ExampleRecording
                }

                Some(LogSource::RedapGrpcStream { uri, .. }) => EntityDbClass::DatasetSegment(uri),

                _ => EntityDbClass::LocalRecording,
            },
        }
    }
285
    /// Reads a built-in `RecordingInfo:` property from the reserved properties
    /// entity, via a latest-at query on the `log_tick` timeline.
    ///
    /// Only meant for built-in `RecordingInfo:` components (enforced by
    /// `debug_assert`).
    pub fn recording_info_property<C: re_types_core::Component>(
        &self,
        component: ComponentIdentifier,
    ) -> Option<C> {
        debug_assert!(
            component.starts_with("RecordingInfo:"),
            "This function should only be used for built-in RecordingInfo components, which are the only recording properties at {}",
            EntityPath::properties()
        );

        self.latest_at_component::<C>(
            &EntityPath::properties(),
            &LatestAtQuery::latest(TimelineName::log_tick()),
            component,
        )
        .map(|(_, value)| value)
    }
304
    /// Writes a single static recording-property value at `entity_path`.
    ///
    /// Debug-asserted invariants:
    /// * `component_descr` must match `Component`'s declared type;
    /// * `entity_path` must live under the reserved properties entity;
    /// * `RecordingInfo` components go on the properties entity itself, while
    ///   custom properties go on a child entity.
    pub fn set_recording_property<Component: re_types_core::Component>(
        &mut self,
        entity_path: EntityPath,
        component_descr: re_types_core::ComponentDescriptor,
        value: &Component,
    ) -> Result<(), Error> {
        debug_assert_eq!(component_descr.component_type, Some(Component::name()));
        debug_assert!(entity_path.starts_with(&EntityPath::properties()));
        debug_assert!(
            (entity_path == EntityPath::properties())
                == (component_descr.archetype == Some("rerun.archetypes.RecordingInfo".into())),
            "RecordingInfo should be logged at {}. Custom properties should be under a child entity",
            EntityPath::properties()
        );

        // Properties are static: the chunk carries no time data at all.
        let chunk = ChunkBuilder::new(ChunkId::new(), entity_path)
            .with_component(RowId::new(), TimePoint::STATIC, component_descr, value)
            .map_err(|err| Error::Chunk(err.into()))?
            .build()?;

        self.add_chunk(&Arc::new(chunk))?;

        Ok(())
    }
331
332 pub fn timeline_type(&self, timeline_name: &TimelineName) -> TimeType {
333 self.storage_engine()
334 .store()
335 .time_column_type(timeline_name)
336 .unwrap_or_else(|| {
337 if timeline_name == &TimelineName::log_time() {
338 Timeline::log_time().typ()
339 } else if timeline_name == &TimelineName::log_tick() {
340 Timeline::log_tick().typ()
341 } else {
342 re_log::warn_once!("Timeline {timeline_name:?} not found");
343 TimeType::Sequence
344 }
345 })
346 }
347
    /// Latest-at query for several components on one entity, served through
    /// the query cache.
    #[inline]
    pub fn latest_at(
        &self,
        query: &re_chunk_store::LatestAtQuery,
        entity_path: &EntityPath,
        components: impl IntoIterator<Item = ComponentIdentifier>,
    ) -> re_query::LatestAtResults {
        self.storage_engine
            .read()
            .cache()
            .latest_at(query, entity_path, components)
    }
365
    /// Latest-at query for a single mono component, returning its index
    /// (time + row-id) alongside the deserialized value.
    #[inline]
    pub fn latest_at_component<C: re_types_core::Component>(
        &self,
        entity_path: &EntityPath,
        query: &re_chunk_store::LatestAtQuery,
        component: ComponentIdentifier,
    ) -> Option<((TimeInt, RowId), C)> {
        let results = self
            .storage_engine
            .read()
            .cache()
            .latest_at(query, entity_path, [component]);
        results
            .component_mono(component)
            .map(|value| (results.index(), value))
    }
390
    /// Same as [`Self::latest_at_component`], but uses the `_quiet` mono
    /// accessor (presumably suppressing warnings on failure — see
    /// `component_mono_quiet`).
    #[inline]
    pub fn latest_at_component_quiet<C: re_types_core::Component>(
        &self,
        entity_path: &EntityPath,
        query: &re_chunk_store::LatestAtQuery,
        component: ComponentIdentifier,
    ) -> Option<((TimeInt, RowId), C)> {
        let results = self
            .storage_engine
            .read()
            .cache()
            .latest_at(query, entity_path, [component]);

        results
            .component_mono_quiet(component)
            .map(|value| (results.index(), value))
    }
416
417 #[inline]
418 pub fn latest_at_component_at_closest_ancestor<C: re_types_core::Component>(
419 &self,
420 entity_path: &EntityPath,
421 query: &re_chunk_store::LatestAtQuery,
422 component: ComponentIdentifier,
423 ) -> Option<(EntityPath, (TimeInt, RowId), C)> {
424 re_tracing::profile_function!();
425
426 let mut cur_entity_path = Some(entity_path.clone());
427 while let Some(entity_path) = cur_entity_path {
428 if let Some((index, value)) = self.latest_at_component(&entity_path, query, component) {
429 return Some((entity_path, index, value));
430 }
431 cur_entity_path = entity_path.parent();
432 }
433
434 None
435 }
436
437 #[inline]
446 pub fn cloned_from(&self) -> Option<&StoreId> {
447 let info = self.store_info()?;
448 info.cloned_from.as_ref()
449 }
450
    /// All timelines known to the chunk store.
    pub fn timelines(&self) -> std::collections::BTreeMap<TimelineName, Timeline> {
        self.storage_engine().store().timelines()
    }

    /// Per-timeline histograms, kept in sync with store events and manifests.
    pub fn timeline_histograms(&self) -> &TimeHistogramPerTimeline {
        &self.time_histogram_per_timeline
    }

    /// The full time range covered by the given timeline, if it has any data.
    pub fn time_range_for(&self, timeline: &TimelineName) -> Option<AbsoluteTimeRange> {
        self.storage_engine().store().time_range(timeline)
    }

    /// The histogram for the given timeline, if any.
    pub fn time_histogram(&self, timeline: &TimelineName) -> Option<&crate::TimeHistogram> {
        self.time_histogram_per_timeline.get(timeline)
    }

    /// Total number of rows across all chunks in the store.
    #[inline]
    pub fn num_rows(&self) -> u64 {
        self.storage_engine.read().store().stats().total().num_rows
    }

    /// The store's current generation (see `ChunkStore::generation`).
    #[inline]
    pub fn generation(&self) -> re_chunk_store::ChunkStoreGeneration {
        self.storage_engine.read().store().generation()
    }

    /// When data was last ingested via [`Self::add`] (or construction time).
    #[inline]
    pub fn last_modified_at(&self) -> web_time::Instant {
        self.last_modified_at
    }

    /// The highest `RowId` ingested so far, if any.
    #[inline]
    pub fn latest_row_id(&self) -> Option<RowId> {
        self.latest_row_id
    }

    /// `true` if we have neither store info nor any rows of data.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.set_store_info.is_none() && self.num_rows() == 0
    }

    /// All entity paths that data was logged to directly, sorted.
    pub fn entity_paths(&self) -> Vec<&EntityPath> {
        use itertools::Itertools as _;
        self.entity_path_from_hash.values().sorted().collect()
    }

    /// Ingestion statistics, fed from store events and chunk timestamp metadata.
    #[inline]
    pub fn ingestion_stats(&self) -> &IngestionStatistics {
        &self.stats
    }

    /// Resolves a hashed entity path back to the full path, if we have seen it.
    #[inline]
    pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
        self.entity_path_from_hash.get(entity_path_hash)
    }

    /// `true` if the entity path is present in the entity tree.
    #[inline]
    pub fn is_known_entity(&self, entity_path: &EntityPath) -> bool {
        self.tree().subtree(entity_path).is_some()
    }

    /// `true` only if data was logged directly to this exact entity path
    /// (as opposed to it merely being an ancestor of a logged entity).
    #[inline]
    pub fn is_logged_entity(&self, entity_path: &EntityPath) -> bool {
        self.entity_path_from_hash.contains_key(&entity_path.hash())
    }
528
    /// Ingests an [`RrdManifest`], feeding both the time histograms and the
    /// manifest index. Errors are logged, not propagated.
    pub fn add_rrd_manifest_message(&mut self, rrd_manifest: RrdManifest) {
        re_tracing::profile_function!();
        re_log::debug!("Received RrdManifest for {:?}", self.store_id());

        if let Err(err) = self
            .time_histogram_per_timeline
            .on_rrd_manifest(&rrd_manifest)
        {
            re_log::error!("Failed to ingest RRD Manifest: {err}");
        }

        if let Err(err) = self.rrd_manifest_index.append(rrd_manifest) {
            re_log::error!("Failed to load RRD Manifest: {err}");
        }
    }
544
    /// Ingests a single [`LogMsg`], returning any resulting store events.
    ///
    /// * `SetStoreInfo` updates the stored info and produces no events.
    /// * `ArrowMsg` is decoded into a chunk, sorted if needed, and inserted.
    /// * `BlueprintActivationCommand` is ignored here.
    pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> {
        re_tracing::profile_function!();

        // The caller is responsible for routing messages to the right store.
        debug_assert_eq!(msg.store_id(), self.store_id());

        let store_events = match &msg {
            LogMsg::SetStoreInfo(msg) => {
                self.set_store_info(msg.clone());
                vec![]
            }

            LogMsg::ArrowMsg(_, arrow_msg) => {
                self.last_modified_at = web_time::Instant::now();

                let chunk_batch = re_sorbet::ChunkBatch::try_from(&arrow_msg.batch)
                    .map_err(re_chunk::ChunkError::from)?;
                let mut chunk = re_chunk::Chunk::from_chunk_batch(&chunk_batch)?;
                // Chunks are sorted before insertion.
                chunk.sort_if_unsorted();
                self.add_chunk_with_timestamp_metadata(
                    &Arc::new(chunk),
                    &chunk_batch.sorbet_schema().timestamps,
                )?
            }

            LogMsg::BlueprintActivationCommand(_) => {
                vec![]
            }
        };

        Ok(store_events)
    }
577
    /// Inserts a chunk into the store, with no timestamp metadata attached.
    pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> {
        self.add_chunk_with_timestamp_metadata(chunk, &Default::default())
    }
582
583 fn add_chunk_with_timestamp_metadata(
584 &mut self,
585 chunk: &Arc<Chunk>,
586 chunk_timestamps: &re_sorbet::TimestampMetadata,
587 ) -> Result<Vec<ChunkStoreEvent>, Error> {
588 let store_events = self.storage_engine.write().store().insert_chunk(chunk)?;
589
590 self.entity_path_from_hash
591 .entry(chunk.entity_path().hash())
592 .or_insert_with(|| chunk.entity_path().clone());
593
594 if self.latest_row_id < chunk.row_id_range().map(|(_, row_id_max)| row_id_max) {
595 self.latest_row_id = chunk.row_id_range().map(|(_, row_id_max)| row_id_max);
596 }
597
598 self.rrd_manifest_index.mark_as_loaded(chunk.id());
599
600 self.on_store_events(&store_events);
601
602 self.stats.on_events(chunk_timestamps, &store_events);
607
608 Ok(store_events)
609 }
610
    /// Propagates store events to all derived data structures.
    ///
    /// Order matters: the query cache is updated first under the write lock,
    /// which is then downgraded to a read guard before updating the manifest
    /// index, the histograms, and the entity tree.
    fn on_store_events(&mut self, store_events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!();

        let mut engine = self.storage_engine.write();

        engine.cache().on_events(store_events);

        // The remaining consumers only need read access.
        let engine = engine.downgrade();

        self.rrd_manifest_index.on_events(store_events);

        self.time_histogram_per_timeline
            .on_events(&self.rrd_manifest_index, store_events);
        self.rrd_manifest_index
            .entity_tree
            .on_store_additions(store_events);

        // Entities that had at least one chunk deleted.
        let entity_paths_with_deletions = store_events
            .iter()
            .filter(|event| event.kind == ChunkStoreDiffKind::Deletion)
            .map(|event| event.chunk.entity_path().clone())
            .collect();

        {
            re_tracing::profile_scope!("on_store_deletions");
            self.rrd_manifest_index.entity_tree.on_store_deletions(
                &engine,
                &entity_paths_with_deletions,
                store_events,
            );
        }
    }
647
    /// Records (or replaces) the `SetStoreInfo` message describing this store.
    pub fn set_store_info(&mut self, store_info: SetStoreInfo) {
        self.set_store_info = Some(store_info);
    }
651
    /// Tries to free RAM by garbage-collecting roughly `fraction_to_purge`
    /// (must be in `0.0..=1.0`) of the store's data.
    ///
    /// `time_cursor`, if given, is passed as the GC's `furthest_from` hint.
    /// If the GC produced no events, the query caches are purged instead.
    pub fn purge_fraction_of_ram(
        &mut self,
        fraction_to_purge: f32,
        time_cursor: Option<(Timeline, TimeInt)>,
    ) -> Vec<ChunkStoreEvent> {
        re_tracing::profile_function!();

        assert!((0.0..=1.0).contains(&fraction_to_purge));

        let store_events = self.gc(&GarbageCollectionOptions {
            target: GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _),
            protect_latest: 1,
            time_budget: DEFAULT_GC_TIME_BUDGET,

            protected_time_ranges: Default::default(),

            furthest_from: time_cursor.map(|(timeline, time)| (*timeline.name(), time)),
        });

        if store_events.is_empty() {
            // The store had nothing to give up — shrink the caches instead.
            self.storage_engine
                .write()
                .cache()
                .purge_fraction_of_ram(fraction_to_purge);
        } else {
            // NOTE(review): `gc` already forwarded these events via
            // `on_store_events`; forwarding them a second time here looks
            // redundant — confirm whether subscribers tolerate duplicates.
            self.on_store_events(&store_events);
        }

        store_events
    }
692
    /// Runs garbage collection on the chunk store with the given options and
    /// forwards the resulting events to all derived indices and caches.
    pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> {
        re_tracing::profile_function!();

        let (store_events, stats_diff) = self.storage_engine.write().store().gc(gc_options);

        re_log::trace!(
            num_row_ids_dropped = store_events.len(),
            size_bytes_dropped = re_format::format_bytes(stats_diff.total().total_size_bytes as _),
            "purged datastore"
        );

        self.on_store_events(&store_events);

        store_events
    }
708
    /// Drops all data in `drop_range` on the given timeline, forwarding the
    /// resulting deletion events to the derived indices.
    pub fn drop_time_range(
        &mut self,
        timeline: &TimelineName,
        drop_range: AbsoluteTimeRange,
    ) -> Vec<ChunkStoreEvent> {
        re_tracing::profile_function!();

        let store_events = self
            .storage_engine
            .write()
            .store()
            .drop_time_range(timeline, drop_range);

        self.on_store_events(&store_events);

        store_events
    }
729
    /// Drops all data logged at exactly `entity_path` (non-recursive).
    ///
    /// See [`Self::drop_entity_path_recursive`] to also drop all children.
    pub fn drop_entity_path(&mut self, entity_path: &EntityPath) {
        re_tracing::profile_function!();

        let store_events = self
            .storage_engine
            .write()
            .store()
            .drop_entity_path(entity_path);

        self.on_store_events(&store_events);
    }
746
747 pub fn drop_entity_path_recursive(&mut self, entity_path: &EntityPath) {
749 re_tracing::profile_function!();
750
751 let mut to_drop = vec![entity_path.clone()];
752
753 if let Some(tree) = self.tree().subtree(entity_path) {
754 tree.visit_children_recursively(|path| {
755 to_drop.push(path.clone());
756 });
757 }
758
759 for entity_path in to_drop {
760 self.drop_entity_path(&entity_path);
761 }
762 }
763
    /// Serializes this database back into a stream of [`LogMsg`]s.
    ///
    /// Emits, in order: the `SetStoreInfo` message (if any), one `ArrowMsg`
    /// per chunk (sorted by minimum row-id), and — for blueprint stores — a
    /// final `BlueprintActivationCommand` making the blueprint active.
    ///
    /// If `time_selection` is given, temporal chunks are only included when
    /// they intersect the given range on that timeline; static chunks are
    /// always included.
    pub fn to_messages(
        &self,
        time_selection: Option<(TimelineName, AbsoluteTimeRangeF)>,
    ) -> impl Iterator<Item = ChunkResult<LogMsg>> + '_ {
        re_tracing::profile_function!();

        let engine = self.storage_engine.read();

        let set_store_info_msg = self
            .store_info_msg()
            .map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone())));

        let data_messages = {
            // Round the (possibly fractional) selection outwards to whole time values.
            let time_filter = time_selection.map(|(timeline, range)| {
                (
                    timeline,
                    AbsoluteTimeRange::new(range.min.floor(), range.max.ceil()),
                )
            });

            let mut chunks: Vec<Arc<Chunk>> = engine
                .store()
                .iter_chunks()
                .filter(move |chunk| {
                    // Static chunks are always exported.
                    if chunk.is_static() {
                        return true;
                    }

                    // No time selection: export everything.
                    let Some((timeline, time_range)) = time_filter else {
                        return true;
                    };

                    chunk
                        .timelines()
                        .get(&timeline)
                        .is_some_and(|time_column| time_range.intersects(time_column.time_range()))
                })
                .cloned()
                .collect();

            // Deterministic output order: by each chunk's smallest row-id.
            chunks.sort_by_key(|chunk| chunk.row_id_range().map(|(min, _)| min));

            chunks.into_iter().map(|chunk| {
                chunk
                    .to_arrow_msg()
                    .map(|msg| LogMsg::ArrowMsg(self.store_id().clone(), msg))
            })
        };

        // Blueprints get an explicit activation command at the end of the stream.
        let blueprint_ready = if self.store_kind() == StoreKind::Blueprint {
            let activate_cmd =
                re_log_types::BlueprintActivationCommand::make_active(self.store_id().clone());

            itertools::Either::Left(std::iter::once(Ok(activate_cmd.into())))
        } else {
            itertools::Either::Right(std::iter::empty())
        };

        set_store_info_msg
            .into_iter()
            .chain(data_messages)
            .chain(blueprint_ready)
    }
841
    /// Deep-clones this database under a new [`StoreId`].
    ///
    /// The clone records its provenance: its `StoreInfo::cloned_from` is set
    /// to this database's id. All chunks are re-inserted into the new store.
    pub fn clone_with_new_id(&self, new_id: StoreId) -> Result<Self, Error> {
        re_tracing::profile_function!();

        let mut new_db = Self::new(new_id.clone());

        new_db.last_modified_at = self.last_modified_at;
        new_db.latest_row_id = self.latest_row_id;

        if let Some(store_info) = self.store_info() {
            let mut new_info = store_info.clone();
            new_info.store_id = new_id;
            new_info.cloned_from = Some(self.store_id().clone());

            new_db.set_store_info(SetStoreInfo {
                row_id: *RowId::new(),
                info: new_info,
            });
        }

        let engine = self.storage_engine.read();
        for chunk in engine.store().iter_chunks() {
            new_db.add_chunk(&Arc::clone(chunk))?;
        }

        Ok(new_db)
    }
877}
878
879impl EntityDb {
881 pub fn subtree_stats_static(
885 &self,
886 engine: &StorageEngineReadGuard<'_>,
887 entity_path: &EntityPath,
888 ) -> ChunkStoreChunkStats {
889 re_tracing::profile_function!();
890
891 let Some(subtree) = self.tree().subtree(entity_path) else {
892 return Default::default();
893 };
894
895 let mut stats = ChunkStoreChunkStats::default();
896 subtree.visit_children_recursively(|path| {
897 stats += engine.store().entity_stats_static(path);
898 });
899
900 stats
901 }
902
903 pub fn subtree_stats_on_timeline(
907 &self,
908 engine: &StorageEngineReadGuard<'_>,
909 entity_path: &EntityPath,
910 timeline: &TimelineName,
911 ) -> ChunkStoreChunkStats {
912 re_tracing::profile_function!();
913
914 let Some(subtree) = self.tree().subtree(entity_path) else {
915 return Default::default();
916 };
917
918 let mut stats = ChunkStoreChunkStats::default();
919 subtree.visit_children_recursively(|path| {
920 stats += engine.store().entity_stats_on_timeline(path, timeline);
921 });
922
923 stats
924 }
925
    /// `true` if `entity_path` or any of its descendants has data on the given
    /// timeline, according to either the manifest index or the chunk store.
    pub fn subtree_has_data_on_timeline(
        &self,
        engine: &StorageEngineReadGuard<'_>,
        timeline: &TimelineName,
        entity_path: &EntityPath,
    ) -> bool {
        re_tracing::profile_function!();

        let Some(subtree) = self.tree().subtree(entity_path) else {
            return false;
        };

        // Short-circuits as soon as any matching child is found.
        subtree
            .find_first_child_recursive(|path| {
                self.rrd_manifest_index
                    .entity_has_data_on_timeline(path, timeline)
                    || engine.store().entity_has_data_on_timeline(timeline, path)
            })
            .is_some()
    }
949
    /// Like [`Self::subtree_has_data_on_timeline`], but only considers
    /// temporal (non-static) data.
    pub fn subtree_has_temporal_data_on_timeline(
        &self,
        engine: &StorageEngineReadGuard<'_>,
        timeline: &TimelineName,
        entity_path: &EntityPath,
    ) -> bool {
        re_tracing::profile_function!();

        let Some(subtree) = self.tree().subtree(entity_path) else {
            return false;
        };

        // Short-circuits as soon as any matching child is found.
        subtree
            .find_first_child_recursive(|path| {
                self.rrd_manifest_index
                    .entity_has_temporal_data_on_timeline(path, timeline)
                    || engine
                        .store()
                        .entity_has_temporal_data_on_timeline(timeline, path)
            })
            .is_some()
    }
975
    /// `true` if this exact entity (non-recursive) has temporal data on the
    /// given timeline, in either the manifest index or the chunk store.
    pub fn entity_has_temporal_data_on_timeline(
        &self,
        engine: &StorageEngineReadGuard<'_>,
        timeline: &TimelineName,
        entity_path: &EntityPath,
    ) -> bool {
        re_tracing::profile_function!();

        self.rrd_manifest_index
            .entity_has_temporal_data_on_timeline(entity_path, timeline)
            || engine
                .store()
                .entity_has_temporal_data_on_timeline(timeline, entity_path)
    }
993}
994
impl re_byte_size::SizeBytes for EntityDb {
    /// Approximated as the chunk store's total size only; the query cache and
    /// the auxiliary indices are not counted.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        self.storage_engine
            .read()
            .store()
            .stats()
            .total()
            .total_size_bytes
    }
}
1007
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use re_chunk::{Chunk, RowId};
    use re_log_types::example_components::{MyPoint, MyPoints};
    use re_log_types::{StoreId, TimePoint, Timeline};

    use super::*;

    /// Logs a single point to a deeply nested entity and checks the rendered
    /// output of [`EntityDb::format_with_components`].
    #[test]
    fn format_with_components() -> anyhow::Result<()> {
        re_log::setup_logging();

        let mut db = EntityDb::new(StoreId::random(
            re_log_types::StoreKind::Recording,
            "test_app",
        ));

        let timeline_frame = Timeline::new_sequence("frame");

        {
            let row_id = RowId::new();
            let timepoint = TimePoint::from_iter([(timeline_frame, 10)]);
            let point = MyPoint::new(1.0, 2.0);
            let chunk = Chunk::builder("parent/child1/grandchild")
                .with_component_batches(
                    row_id,
                    timepoint,
                    [(MyPoints::descriptor_points(), &[point] as _)],
                )
                .build()?;

            db.add_chunk(&Arc::new(chunk))?;
        }

        // The intermediate ancestors show up in the tree even though data was
        // only logged to the grandchild.
        assert_eq!(
            db.format_with_components(),
            "/parent\n /parent/child1\n /parent/child1/grandchild\n example.MyPoint: Struct[2]\n"
        );

        Ok(())
    }
}