1use std::{
2 collections::{BTreeMap, VecDeque},
3 ops::Range,
4 sync::Arc,
5};
6
7use ahash::{HashMap, HashSet};
8use itertools::Itertools;
9use parking_lot::RwLock;
10use paste::paste;
11use seq_macro::seq;
12
13use re_data_store::{DataStore, LatestAtQuery, RangeQuery, StoreDiff, StoreEvent, StoreSubscriber};
14use re_log_types::{EntityPath, RowId, StoreId, TimeInt, TimeRange, Timeline};
15use re_query::ArchetypeView;
16use re_types_core::{
17 components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName, SizeBytes as _,
18};
19
20use crate::{ErasedFlatVecDeque, FlatVecDeque, LatestAtCache, RangeCache};
21
/// A query of any kind: either latest-at or range.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AnyQuery {
    /// A latest-at query.
    LatestAt(LatestAtQuery),

    /// A range query.
    Range(RangeQuery),
}
29
30impl From<LatestAtQuery> for AnyQuery {
31 #[inline]
32 fn from(query: LatestAtQuery) -> Self {
33 Self::LatestAt(query)
34 }
35}
36
37impl From<RangeQuery> for AnyQuery {
38 #[inline]
39 fn from(query: RangeQuery) -> Self {
40 Self::Range(query)
41 }
42}
43
/// Maintains the top-level cache mappings for a single datastore.
pub struct Caches {
    /// The [`StoreId`] of the datastore this cache was created for.
    ///
    /// Queries against any other store are rejected (see the asserts in
    /// `with_latest_at` / `with_range` / `on_events`).
    store_id: StoreId,

    /// Every cache, indexed by entity path and timeline (see [`CacheKey`]).
    per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>,
}
54
55impl std::fmt::Debug for Caches {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 let Self {
58 store_id,
59 per_cache_key,
60 } = self;
61
62 let mut strings = Vec::new();
63
64 strings.push(format!("[Caches({store_id})]"));
65
66 let per_cache_key = per_cache_key.read();
67 let per_cache_key: BTreeMap<_, _> = per_cache_key.iter().collect();
68
69 for (cache_key, caches_per_archetype) in &per_cache_key {
70 let caches_per_archetype = caches_per_archetype.read();
71 strings.push(format!(
72 " [{cache_key:?} (pending_timeful={:?} pending_timeless={:?})]",
73 caches_per_archetype
74 .pending_timeful_invalidation
75 .map(|t| cache_key
76 .timeline
77 .format_time_range_utc(&TimeRange::new(t, TimeInt::MAX))),
78 caches_per_archetype.pending_timeless_invalidation,
79 ));
80 strings.push(indent::indent_all_by(
81 4,
82 format!("{caches_per_archetype:?}"),
83 ));
84 }
85
86 f.write_str(&strings.join("\n").replace("\n\n", "\n"))
87 }
88}
89
impl std::ops::Deref for Caches {
    // NOTE(review): deref-ing straight to the inner lock leaks locking details to
    // every caller (e.g. the bare `self.write()` calls below); explicit accessor
    // methods would be cleaner, but changing this would break existing callers.
    type Target = RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.per_cache_key
    }
}
98
99impl Caches {
100 #[inline]
101 pub fn new(store: &DataStore) -> Self {
102 Self {
103 store_id: store.id().clone(),
104 per_cache_key: Default::default(),
105 }
106 }
107}
108
/// All the caches for a single [`CacheKey`], organized per archetype.
#[derive(Default)]
pub struct CachesPerArchetype {
    /// Latest-at caches, one per archetype.
    pub(crate) latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,

    /// Range caches, one per archetype.
    pub(crate) range_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<RangeCache>>>>,

    /// If set, all cached data at or after this time must be invalidated the next
    /// time these caches are accessed (see `handle_pending_invalidation`).
    pending_timeful_invalidation: Option<TimeInt>,

    /// If set, these caches must be dropped entirely the next time they are
    /// accessed (see `handle_pending_invalidation`, which resets `self` wholesale).
    pending_timeless_invalidation: bool,
}
155
156impl std::fmt::Debug for CachesPerArchetype {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 let CachesPerArchetype {
159 latest_at_per_archetype,
160 range_per_archetype,
161 pending_timeful_invalidation: _,
162 pending_timeless_invalidation: _,
163 } = self;
164
165 let mut strings = Vec::new();
166
167 {
168 let latest_at_per_archetype = latest_at_per_archetype.read();
169 let latest_at_per_archetype: BTreeMap<_, _> = latest_at_per_archetype.iter().collect();
170
171 for (archetype_name, latest_at_cache) in &latest_at_per_archetype {
172 let latest_at_cache = latest_at_cache.read();
173 strings.push(format!(
174 "[latest_at for {archetype_name} ({})]",
175 re_format::format_bytes(latest_at_cache.total_size_bytes() as _)
176 ));
177 strings.push(indent::indent_all_by(2, format!("{latest_at_cache:?}")));
178 }
179 }
180
181 {
182 let range_per_archetype = range_per_archetype.read();
183 let range_per_archetype: BTreeMap<_, _> = range_per_archetype.iter().collect();
184
185 for (archetype_name, range_cache) in &range_per_archetype {
186 let range_cache = range_cache.read();
187 strings.push(format!(
188 "[range for {archetype_name} ({})]",
189 re_format::format_bytes(range_cache.total_size_bytes() as _)
190 ));
191 strings.push(indent::indent_all_by(2, format!("{range_cache:?}")));
192 }
193 }
194
195 f.write_str(&strings.join("\n").replace("\n\n", "\n"))
196 }
197}
198
impl Caches {
    /// Clears all caches.
    #[inline]
    pub fn clear(&self) {
        self.write().clear();
    }

    /// Gives access to the latest-at cache for archetype `A`, matching the given
    /// entity path and query timeline.
    ///
    /// `upsert` runs with exclusive access (to fill/update the cache), then `iter`
    /// runs with shared access (to read the results).
    ///
    /// Returns `(Some(r1), r2)` if `upsert` ran, `(None, r2)` otherwise — see the
    /// `try_write` note below for when it might not run.
    #[inline]
    pub fn with_latest_at<A, F1, F2, R1, R2>(
        &self,
        store: &DataStore,
        entity_path: EntityPath,
        query: &LatestAtQuery,
        mut upsert: F1,
        mut iter: F2,
    ) -> (Option<R1>, R2)
    where
        A: Archetype,
        F1: FnMut(&mut LatestAtCache) -> R1,
        F2: FnMut(&LatestAtCache) -> R2,
    {
        // Guard against accidentally using this cache with a different store.
        assert!(
            self.store_id == *store.id(),
            "attempted to use a query cache {} with the wrong datastore ({})",
            self.store_id,
            store.id(),
        );

        let key = CacheKey::new(entity_path, query.timeline);

        let cache = {
            let caches_per_archetype = Arc::clone(self.write().entry(key.clone()).or_default());
            // Apply any invalidation that was deferred by `on_events` before
            // handing out the cache.
            let removed_bytes = caches_per_archetype.write().handle_pending_invalidation();
            if removed_bytes > 0 {
                re_log::trace!(
                    store_id=%self.store_id,
                    entity_path = %key.entity_path,
                    removed = removed_bytes,
                    "invalidated latest-at caches"
                );
            }

            let caches_per_archetype = caches_per_archetype.read();
            let mut latest_at_per_archetype = caches_per_archetype.latest_at_per_archetype.write();
            Arc::clone(latest_at_per_archetype.entry(A::name()).or_default())
        };

        // Non-blocking write acquisition: if the lock is already held, skip the
        // upsert rather than block — presumably to avoid deadlocking on reentrant
        // queries into the same cache (TODO confirm against callers).
        let r1 = cache.try_write().map(|mut cache| upsert(&mut cache));
        // `read_recursive` so that a same-thread reentrant read cannot deadlock.
        let r2 = iter(&cache.read_recursive());

        (r1, r2)
    }

    /// Gives access to the range cache for archetype `A`, matching the given
    /// entity path and query timeline.
    ///
    /// Same locking strategy and return convention as [`Self::with_latest_at`].
    #[inline]
    pub fn with_range<A, F1, F2, R1, R2>(
        &self,
        store: &DataStore,
        entity_path: EntityPath,
        query: &RangeQuery,
        mut upsert: F1,
        mut iter: F2,
    ) -> (Option<R1>, R2)
    where
        A: Archetype,
        F1: FnMut(&mut RangeCache) -> R1,
        F2: FnMut(&RangeCache) -> R2,
    {
        // Guard against accidentally using this cache with a different store.
        assert!(
            self.store_id == *store.id(),
            "attempted to use a query cache {} with the wrong datastore ({})",
            self.store_id,
            store.id(),
        );

        let key = CacheKey::new(entity_path, query.timeline);

        let cache = {
            let caches_per_archetype = Arc::clone(self.write().entry(key.clone()).or_default());
            // Apply any invalidation that was deferred by `on_events` before
            // handing out the cache.
            let removed_bytes = caches_per_archetype.write().handle_pending_invalidation();
            if removed_bytes > 0 {
                re_log::trace!(
                    store_id=%self.store_id,
                    entity_path = %key.entity_path,
                    removed = removed_bytes,
                    "invalidated range caches"
                );
            }

            let caches_per_archetype = caches_per_archetype.read();
            let mut range_per_archetype = caches_per_archetype.range_per_archetype.write();
            Arc::clone(range_per_archetype.entry(A::name()).or_default())
        };

        // Non-blocking write / recursive read: see `with_latest_at` for rationale.
        let r1 = cache.try_write().map(|mut cache| upsert(&mut cache));
        let r2 = iter(&cache.read_recursive());

        (r1, r2)
    }
}
396
/// Uniquely identifies one set of caches in a [`Caches`] instance.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CacheKey {
    /// Which entity the cached data is for.
    pub entity_path: EntityPath,

    /// Which timeline the cached data is on.
    pub timeline: Timeline,
}
406
407impl std::fmt::Debug for CacheKey {
408 #[inline]
409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410 let Self {
411 entity_path,
412 timeline,
413 } = self;
414 f.write_fmt(format_args!("{entity_path} on {}", timeline.name()))
415 }
416}
417
418impl CacheKey {
419 #[inline]
420 pub fn new(entity_path: impl Into<EntityPath>, timeline: impl Into<Timeline>) -> Self {
421 Self {
422 entity_path: entity_path.into(),
423 timeline: timeline.into(),
424 }
425 }
426}
427
impl StoreSubscriber for Caches {
    #[inline]
    fn name(&self) -> String {
        "rerun.store_subscribers.QueryCache".into()
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[inline]
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Translates store events into *pending* cache invalidations.
    ///
    /// Invalidation is deferred: this only records what must be invalidated on the
    /// affected [`CachesPerArchetype`]s; the actual data removal happens in
    /// `handle_pending_invalidation`, the next time the caches are queried.
    fn on_events(&mut self, events: &[StoreEvent]) {
        re_tracing::profile_function!(format!("num_events={}", events.len()));

        for event in events {
            let StoreEvent {
                store_id,
                store_generation: _,
                event_id: _,
                diff,
            } = event;

            // This subscriber must only ever receive events for its own store.
            assert!(
                self.store_id == *store_id,
                "attempted to use a query cache {} with the wrong datastore ({})",
                self.store_id,
                store_id,
            );

            let StoreDiff {
                kind: _, // ignored: all diff kinds are handled the same way here
                row_id: _,
                times,
                entity_path,
                cells: _,
            } = diff;

            // Per-event compaction: for invalidation purposes, only the *minimum*
            // time per (entity, timeline) matters, since everything at or after it
            // gets invalidated anyway.
            #[derive(Default, Debug)]
            struct CompactedEvents {
                timeless: HashSet<EntityPath>,
                timeful: HashMap<CacheKey, TimeInt>,
            }

            let mut compacted = CompactedEvents::default();
            {
                re_tracing::profile_scope!("compact events");

                // No timestamps at all: this diff concerns timeless data.
                if times.is_empty() {
                    compacted.timeless.insert(entity_path.clone());
                }

                // Track the earliest affected time per (entity, timeline).
                for &(timeline, time) in times {
                    let key = CacheKey::new(entity_path.clone(), timeline);
                    let min_time = compacted.timeful.entry(key).or_insert(TimeInt::MAX);
                    *min_time = TimeInt::min(*min_time, time);
                }
            }

            let caches = self.write();
            {
                re_tracing::profile_scope!("timeless");

                // A timeless change affects the entity on *every* timeline, so scan
                // all keys for a matching entity path.
                for entity_path in compacted.timeless {
                    for (key, caches_per_archetype) in caches.iter() {
                        if key.entity_path == entity_path {
                            caches_per_archetype.write().pending_timeless_invalidation = true;
                        }
                    }
                }
            }

            {
                re_tracing::profile_scope!("timeful");

                for (key, time) in compacted.timeful {
                    if let Some(caches_per_archetype) = caches.get(&key) {
                        let mut caches_per_archetype = caches_per_archetype.write();
                        // Keep only the earliest pending invalidation time.
                        if let Some(min_time) =
                            caches_per_archetype.pending_timeful_invalidation.as_mut()
                        {
                            *min_time = TimeInt::min(*min_time, time);
                        } else {
                            caches_per_archetype.pending_timeful_invalidation = Some(time);
                        }
                    }
                }
            }
        }
    }
}
537
impl CachesPerArchetype {
    /// Applies any invalidation previously deferred by `Caches::on_events`.
    ///
    /// Returns the number of bytes removed; 0 when nothing was pending.
    fn handle_pending_invalidation(&mut self) -> u64 {
        // Snapshot the pending flags before clearing them below.
        let pending_timeless_invalidation = self.pending_timeless_invalidation;
        let pending_timeful_invalidation = self.pending_timeful_invalidation.is_some();

        // Fast path: nothing to do.
        if !pending_timeless_invalidation && !pending_timeful_invalidation {
            return 0;
        }

        re_tracing::profile_function!();

        // Everything at or after this time gets dropped (MAX is only used when no
        // timeful invalidation is pending, in which case the timeful path below is
        // never reached).
        let time_threshold = self.pending_timeful_invalidation.unwrap_or(TimeInt::MAX);

        self.pending_timeful_invalidation = None;
        self.pending_timeless_invalidation = false;

        // A timeless invalidation drops everything wholesale, so there is no need
        // to also walk the timeful path afterwards.
        if pending_timeless_invalidation {
            re_tracing::profile_scope!("timeless");

            // Tally up everything that is about to be thrown away.
            let latest_at_removed_bytes = self
                .latest_at_per_archetype
                .read()
                .values()
                .map(|latest_at_cache| latest_at_cache.read().total_size_bytes())
                .sum::<u64>();
            let range_removed_bytes = self
                .range_per_archetype
                .read()
                .values()
                .map(|range_cache| range_cache.read().total_size_bytes())
                .sum::<u64>();

            // Reset everything, including the (already-cleared) pending flags.
            *self = CachesPerArchetype::default();

            return latest_at_removed_bytes + range_removed_bytes;
        }

        re_tracing::profile_scope!("timeful");

        let mut removed_bytes = 0u64;

        // Truncate every per-archetype cache at the invalidation threshold.
        for latest_at_cache in self.latest_at_per_archetype.read().values() {
            let mut latest_at_cache = latest_at_cache.write();
            removed_bytes =
                removed_bytes.saturating_add(latest_at_cache.truncate_at_time(time_threshold));
        }

        for range_cache in self.range_per_archetype.read().values() {
            let mut range_cache = range_cache.write();
            removed_bytes =
                removed_bytes.saturating_add(range_cache.truncate_at_time(time_threshold));
        }

        removed_bytes
    }
}
602
/// A single bucket of cached rows, one entry per `(data_time, row_id)` pair.
///
/// Invariant (checked by `sanity_check`): `data_times`, `pov_instance_keys` and
/// every column in `components` all have the same number of entries.
#[derive(Default)]
pub struct CacheBucket {
    /// The times and row ids of all cached rows, kept sorted in ascending order
    /// (the binary searches in `contains_data_row` / `entry_range` rely on this).
    pub(crate) data_times: VecDeque<(TimeInt, RowId)>,

    /// The instance keys of the point-of-view components, one entry per row.
    pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,

    /// The cached component data, one column per component, one entry per row.
    pub(crate) components: BTreeMap<ComponentName, Box<dyn ErasedFlatVecDeque + Send + Sync>>,

    /// Total size in bytes of the data in this bucket, book-kept manually by the
    /// insertion/truncation methods.
    pub(crate) total_size_bytes: u64,
}
652
653impl std::fmt::Debug for CacheBucket {
654 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
655 let Self {
656 data_times: _,
657 pov_instance_keys: _,
658 components,
659 total_size_bytes: _,
660 } = self;
661
662 let strings = components
663 .iter()
664 .filter(|(_, data)| data.dyn_num_values() > 0)
665 .map(|(name, data)| {
666 format!(
667 "{} {name} values spread across {} entries ({})",
668 data.dyn_num_values(),
669 data.dyn_num_entries(),
670 re_format::format_bytes(data.dyn_total_size_bytes() as _),
671 )
672 })
673 .collect_vec();
674
675 f.write_str(&strings.join("\n").replace("\n\n", "\n"))
676 }
677}
678
impl CacheBucket {
    /// Debug-build-only invariant check: every column must have exactly as many
    /// entries as there are data times.
    fn sanity_check(&self) {
        if cfg!(debug_assertions) {
            assert_eq!(self.data_times.len(), self.pov_instance_keys.num_entries());
            let n = self.data_times.len();
            for (name, data) in &self.components {
                assert_eq!(data.dyn_num_entries(), n, "{name}");
            }
        }
    }

    /// The time range covered by this bucket, or `None` if it is empty.
    #[inline]
    pub fn time_range(&self) -> Option<TimeRange> {
        let first_time = self.data_times.front().map(|(t, _)| *t)?;
        let last_time = self.data_times.back().map(|(t, _)| *t)?;
        Some(TimeRange::new(first_time, last_time))
    }

    /// Does `data_time` fall within this bucket's time range (inclusive both ends)?
    #[inline]
    pub fn contains_data_time(&self, data_time: TimeInt) -> bool {
        // Defaults are chosen so that an empty bucket contains nothing
        // (MAX <= t && t <= MIN is never true).
        let first_time = self.data_times.front().map_or(&TimeInt::MAX, |(t, _)| t);
        let last_time = self.data_times.back().map_or(&TimeInt::MIN, |(t, _)| t);
        *first_time <= data_time && data_time <= *last_time
    }

    /// Is this exact `(data_time, row_id)` pair already cached?
    ///
    /// Relies on `data_times` being sorted.
    #[inline]
    pub fn contains_data_row(&self, data_time: TimeInt, row_id: RowId) -> bool {
        self.data_times.binary_search(&(data_time, row_id)).is_ok()
    }

    /// How many rows are currently cached in this bucket?
    #[inline]
    pub fn num_entries(&self) -> usize {
        self.data_times.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_entries() == 0
    }

    /// Iterates over all `(data_time, row_id)` pairs, in ascending order.
    #[inline]
    pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
        self.data_times.iter()
    }

    /// Iterates over the point-of-view instance keys, one slice per cached row.
    #[inline]
    pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
        self.pov_instance_keys.iter()
    }

    /// Iterates over the batches of a required component `C`, one slice per row.
    ///
    /// Returns `None` if `C` isn't cached here (or was cached under a different
    /// concrete deque type, e.g. the `Option<C>` variant).
    #[inline]
    pub fn iter_component<C: Component + Send + Sync + 'static>(
        &self,
    ) -> Option<impl Iterator<Item = &[C]>> {
        let data = self
            .components
            .get(&C::name())
            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())?;
        Some(data.iter())
    }

    /// Iterates over the batches of an optional component `C`, one slice per row.
    ///
    /// Returns `None` if `C` isn't cached here (or was cached under a different
    /// concrete deque type, e.g. the non-`Option` variant).
    #[inline]
    pub fn iter_component_opt<C: Component + Send + Sync + 'static>(
        &self,
    ) -> Option<impl Iterator<Item = &[Option<C>]>> {
        let data = self
            .components
            .get(&C::name())
            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
        Some(data.iter())
    }

    /// Computes the index range of rows whose time falls within `time_range`
    /// (inclusive on both ends).
    ///
    /// Relies on `data_times` being sorted.
    #[inline]
    pub fn entry_range(&self, time_range: TimeRange) -> Range<usize> {
        // First row at or after `time_range.min`…
        let start_index = self
            .data_times
            .partition_point(|(data_time, _)| data_time < &time_range.min);
        // …and one past the last row at or before `time_range.max`.
        let end_index = self
            .data_times
            .partition_point(|(data_time, _)| data_time <= &time_range.max);
        start_index..end_index
    }

    /// Iterates over the `(data_time, row_id)` pairs in the given index range.
    #[inline]
    pub fn range_data_times(
        &self,
        entry_range: Range<usize>,
    ) -> impl Iterator<Item = &(TimeInt, RowId)> {
        self.data_times.range(entry_range)
    }

    /// Iterates over the point-of-view instance keys in the given index range.
    #[inline]
    pub fn range_pov_instance_keys(
        &self,
        entry_range: Range<usize>,
    ) -> impl Iterator<Item = &[InstanceKey]> {
        self.pov_instance_keys.range(entry_range)
    }

    /// Direct access to the column for a required component `C`, if cached here.
    #[inline]
    pub fn component<C: Component + Send + Sync + 'static>(&self) -> Option<&FlatVecDeque<C>> {
        self.components
            .get(&C::name())
            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())
    }

    /// Iterates over the batches of a required component `C` in the given index range.
    #[inline]
    pub fn range_component<C: Component + Send + Sync + 'static>(
        &self,
        entry_range: Range<usize>,
    ) -> Option<impl Iterator<Item = &[C]>> {
        let data = self
            .components
            .get(&C::name())
            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())?;
        Some(data.range(entry_range))
    }

    /// Direct access to the column for an optional component `C`, if cached here.
    #[inline]
    pub fn component_opt<C: Component + Send + Sync + 'static>(
        &self,
    ) -> Option<&FlatVecDeque<Option<C>>> {
        self.components
            .get(&C::name())
            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())
    }

    /// Iterates over the batches of an optional component `C` in the given index range.
    #[inline]
    pub fn range_component_opt<C: Component + Send + Sync + 'static>(
        &self,
        entry_range: Range<usize>,
    ) -> Option<impl Iterator<Item = &[Option<C>]>> {
        let data = self
            .components
            .get(&C::name())
            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
        Some(data.range(entry_range))
    }

    /// Removes everything in the bucket with a data time at or after `threshold`.
    ///
    /// Returns the number of bytes removed.
    #[inline]
    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
        self.sanity_check();

        let Self {
            data_times,
            pov_instance_keys,
            components,
            total_size_bytes,
        } = self;

        let mut removed_bytes = 0u64;

        // Everything at index >= threshold_idx has a data time >= threshold.
        let threshold_idx = data_times.partition_point(|(data_time, _)| data_time < &threshold);

        {
            let total_size_bytes_before = data_times.total_size_bytes();
            data_times.truncate(threshold_idx);
            removed_bytes += total_size_bytes_before - data_times.total_size_bytes();
        }

        {
            let total_size_bytes_before = pov_instance_keys.total_size_bytes();
            pov_instance_keys.truncate(threshold_idx);
            removed_bytes += total_size_bytes_before - pov_instance_keys.total_size_bytes();
        }

        // Truncate every component column at the same index.
        for data in components.values_mut() {
            let total_size_bytes_before = data.dyn_total_size_bytes();
            data.dyn_truncate(threshold_idx);
            removed_bytes += total_size_bytes_before - data.dyn_total_size_bytes();
        }

        // Keep the manual byte book-keeping in sync; log rather than panic if the
        // accounting ever underflows.
        *total_size_bytes = total_size_bytes
            .checked_sub(removed_bytes)
            .unwrap_or_else(|| {
                re_log::debug!(
                    current = *total_size_bytes,
                    removed = removed_bytes,
                    "book keeping underflowed"
                );
                u64::MIN
            });

        self.sanity_check();

        removed_bytes
    }
}
899
// Generates `CacheBucket::insert_pov<N>_comp<M>` methods: each inserts one
// `ArchetypeView` worth of data (N required point-of-view components, M optional
// components) into the bucket, keeping every column aligned with `data_times`.
macro_rules! impl_insert {
    (for N=$N:expr, M=$M:expr => povs=[$($pov:ident)+] comps=[$($comp:ident)*]) => { paste! {
        #[doc = "Inserts the contents of the given [`ArchetypeView`], which are made of the specified"]
        #[doc = "`" $N "` point-of-view components and `" $M "` optional components, to the cache."]
        #[doc = ""]
        #[doc = "Returns the size in bytes of the data that was cached."]
        #[doc = ""]
        #[doc = "`query_time` must be the time of query, _not_ of the resulting data."]
        pub fn [<insert_pov$N _comp$M>]<A, $($pov,)+ $($comp),*>(
            &mut self,
            query_time: TimeInt,
            arch_view: &ArchetypeView<A>,
        ) -> ::re_query::Result<u64>
        where
            A: Archetype,
            $($pov: Component + Send + Sync + 'static,)+
            $($comp: Component + Send + Sync + 'static,)*
        {
            re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

            self.sanity_check();

            // Find the insertion point that keeps `data_times` sorted.
            let pov_row_id = arch_view.primary_row_id();
            let index = self.data_times.partition_point(|t| t < &(query_time, pov_row_id));

            let mut added_size_bytes = 0u64;

            self.data_times.insert(index, (query_time, pov_row_id));
            added_size_bytes += (query_time, pov_row_id).total_size_bytes();

            {
                // Cache this row's point-of-view instance keys.
                let added: FlatVecDeque<InstanceKey> = arch_view
                    .iter_instance_keys()
                    .collect::<VecDeque<InstanceKey>>()
                    .into();
                added_size_bytes += added.total_size_bytes();
                self.pov_instance_keys.insert_deque(index, added);
            }

            // Required components first, then optional ones.
            $(added_size_bytes += self.insert_component::<A, $pov>(index, arch_view)?;)+
            $(added_size_bytes += self.insert_component_opt::<A, $comp>(index, arch_view)?;)*

            self.sanity_check();

            self.total_size_bytes += added_size_bytes;

            Ok(added_size_bytes)
        } }
    };

    // Convenience arm for the single-pov case: expands to the full arm above with
    // `R1` as the pov component and `C1 ... C$M` as the optional components
    // (`seq!`'s `#(…)*` repeats the `C~COMP` token for each value of COMP).
    (for N=1, M=$M:expr) => {
        seq!(COMP in 1..=$M {
            impl_insert!(for N=1, M=$M => povs=[R1] comps=[#(C~COMP)*]);
        });
    };
}
961
impl CacheBucket {
    /// Convenience alias for [`Self::insert_pov1_comp0`]: one point-of-view
    /// component, no optional components.
    #[inline]
    #[allow(dead_code)]
    fn insert_pov1<A, R1>(
        &mut self,
        query_time: TimeInt,
        arch_view: &ArchetypeView<A>,
    ) -> ::re_query::Result<u64>
    where
        A: Archetype,
        R1: Component + Send + Sync + 'static,
    {
        self.insert_pov1_comp0::<A, R1>(query_time, arch_view)
    }

    // Generates `insert_pov1_comp0` through `insert_pov1_comp9`.
    seq!(NUM_COMP in 0..10 {
        impl_insert!(for N=1, M=NUM_COMP);
    });

    /// Inserts one row's worth of a *required* component `C` at index `at`,
    /// creating the column if it doesn't exist yet.
    ///
    /// Must be called AFTER the row was inserted into `data_times`.
    /// Returns the size in bytes of the inserted data.
    #[inline]
    fn insert_component<A: Archetype, C: Component + Send + Sync + 'static>(
        &mut self,
        at: usize,
        arch_view: &ArchetypeView<A>,
    ) -> re_query::Result<u64> {
        re_tracing::profile_function!(C::name());

        let num_entries = self.data_times.len();

        // Create the column if missing, backfilled with empty entries so that it
        // lines up with the rows inserted before this component first appeared.
        // (`num_entries - 1` because `data_times` already contains the new row.)
        let data = self.components.entry(C::name()).or_insert_with(|| {
            Box::new(FlatVecDeque::<C>::from_vecs(
                std::iter::repeat(vec![]).take(
                    num_entries
                        .checked_sub(1)
                        .expect("We should have been called AFTER inserting to data_times"),
                ),
            ))
        });

        debug_assert!(at <= data.dyn_num_entries());

        let added: FlatVecDeque<C> = arch_view
            .iter_required_component::<C>()?
            .collect::<VecDeque<C>>()
            .into();
        let added_size_bytes = added.total_size_bytes();

        // Unwrap: columns are keyed by component name, so the concrete deque type
        // always matches `C`.
        let data = data.as_any_mut().downcast_mut::<FlatVecDeque<C>>().unwrap();
        data.insert_deque(at, added);

        Ok(added_size_bytes)
    }

    /// Inserts one row's worth of an *optional* component `C` at index `at`,
    /// creating the column if it doesn't exist yet.
    ///
    /// Must be called AFTER the row was inserted into `data_times`.
    /// Returns the size in bytes of the inserted data.
    #[inline]
    fn insert_component_opt<A: Archetype, C: Component + Send + Sync + 'static>(
        &mut self,
        at: usize,
        arch_view: &ArchetypeView<A>,
    ) -> re_query::Result<u64> {
        re_tracing::profile_function!(C::name());

        let num_entries = self.num_entries();

        // Create the column if missing, backfilled with empty entries so that it
        // lines up with the rows inserted before this component first appeared.
        // (`num_entries - 1` because `data_times` already contains the new row.)
        let data = self.components.entry(C::name()).or_insert_with(|| {
            Box::new(FlatVecDeque::<Option<C>>::from_vecs(
                std::iter::repeat(vec![]).take(
                    num_entries
                        .checked_sub(1)
                        .expect("We should have been called AFTER inserting to data_times"),
                ),
            ))
        });

        debug_assert!(at <= data.dyn_num_entries());

        let added: FlatVecDeque<Option<C>> = if arch_view.has_component::<C>() {
            arch_view
                .iter_optional_component::<C>()?
                .collect::<VecDeque<Option<C>>>()
                .into()
        } else {
            // The component is entirely absent from this view: insert a single
            // empty entry so the column stays aligned with `data_times`.
            let mut added = FlatVecDeque::<Option<C>>::new();
            added.push_back(std::iter::empty());
            added
        };
        let added_size_bytes = added.total_size_bytes();

        // Unwrap: columns are keyed by component name, so the concrete deque type
        // always matches `Option<C>`.
        let data = data
            .as_any_mut()
            .downcast_mut::<FlatVecDeque<Option<C>>>()
            .unwrap();
        data.insert_deque(at, added);

        Ok(added_size_bytes)
    }
}
1069}