1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use itertools::{Either, Itertools as _};
5use nohash_hasher::IntSet;
6use saturating_cast::SaturatingCast as _;
7
8use re_chunk::{Chunk, ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
9use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, Timeline};
10use re_types_core::{ComponentDescriptor, ComponentSet, UnorderedComponentSet};
11
12use crate::ChunkStore;
13use crate::store::ChunkIdSetPerTime;
14
15#[expect(unused_imports)]
17use crate::RowId;
18
19impl ChunkStore {
27 #[inline]
29 pub fn timelines(&self) -> BTreeMap<TimelineName, Timeline> {
30 self.time_type_registry
31 .iter()
32 .map(|(name, typ)| (*name, Timeline::new(*name, *typ)))
33 .collect()
34 }
35
36 #[inline]
38 pub fn all_entities(&self) -> IntSet<EntityPath> {
39 self.static_chunk_ids_per_entity
40 .keys()
41 .cloned()
42 .chain(self.temporal_chunk_ids_per_entity.keys().cloned())
43 .collect()
44 }
45
46 pub fn find_temporal_chunks_furthest_from(
49 &self,
50 timeline: &TimelineName,
51 time: TimeInt,
52 ) -> Vec<Arc<Chunk>> {
53 re_tracing::profile_function!();
54
55 self.chunks_per_chunk_id
56 .values()
57 .filter_map(|chunk| {
58 let times = chunk.timelines().get(timeline)?;
59
60 let min_dist = if times.is_sorted() {
61 let pivot = times.times_raw().partition_point(|t| *t < time.as_i64());
62 let min_value1 = times
63 .times_raw()
64 .get(pivot.saturating_sub(1))
65 .map(|t| t.abs_diff(time.as_i64()));
66 let min_value2 = times
67 .times_raw()
68 .get(pivot)
69 .map(|t| t.abs_diff(time.as_i64()));
70
71 [min_value1, min_value2].into_iter().flatten().min()
75 } else {
76 times
77 .times()
78 .map(|t| t.as_i64().abs_diff(time.as_i64()))
79 .min()
80 };
81
82 min_dist.map(|max_dist| (chunk, max_dist))
83 })
84 .sorted_by(|(_chunk1, dist1), (_chunk2, dist2)| std::cmp::Ord::cmp(dist2, dist1)) .map(|(chunk, _dist)| chunk.clone())
86 .collect_vec()
87 }
88
89 pub(crate) fn find_temporal_chunks_furthest_from_slow(
93 &self,
94 timeline: &TimelineName,
95 time: TimeInt,
96 ) -> Vec<Arc<Chunk>> {
97 re_tracing::profile_function!();
98
99 self.chunks_per_chunk_id
100 .values()
101 .filter_map(|chunk| {
102 let times = chunk.timelines().get(timeline)?;
103
104 let min_dist = times
105 .times()
106 .map(|t| t.as_i64().abs_diff(time.as_i64()))
107 .min();
108
109 min_dist.map(|max_dist| (chunk, max_dist))
110 })
111 .sorted_by(|(_chunk1, dist1), (_chunk2, dist2)| std::cmp::Ord::cmp(dist2, dist1)) .map(|(chunk, _dist)| chunk.clone())
113 .collect_vec()
114 }
115
116 #[inline]
118 pub fn all_entities_sorted(&self) -> BTreeSet<EntityPath> {
119 self.static_chunk_ids_per_entity
120 .keys()
121 .cloned()
122 .chain(self.temporal_chunk_ids_per_entity.keys().cloned())
123 .collect()
124 }
125
126 pub fn all_components(&self) -> UnorderedComponentSet {
130 self.static_chunk_ids_per_entity
131 .values()
132 .flat_map(|static_chunks_per_component| static_chunks_per_component.keys())
133 .chain(
134 self.temporal_chunk_ids_per_entity_per_component
135 .values()
136 .flat_map(|temporal_chunk_ids_per_timeline| {
137 temporal_chunk_ids_per_timeline.values().flat_map(
138 |temporal_chunk_ids_per_component| {
139 temporal_chunk_ids_per_component.keys()
140 },
141 )
142 }),
143 )
144 .copied()
145 .collect()
146 }
147
148 pub fn all_components_sorted(&self) -> ComponentSet {
152 self.static_chunk_ids_per_entity
153 .values()
154 .flat_map(|static_chunks_per_component| static_chunks_per_component.keys())
155 .chain(
156 self.temporal_chunk_ids_per_entity_per_component
157 .values()
158 .flat_map(|temporal_chunk_ids_per_timeline| {
159 temporal_chunk_ids_per_timeline.values().flat_map(
160 |temporal_chunk_ids_per_component| {
161 temporal_chunk_ids_per_component.keys()
162 },
163 )
164 }),
165 )
166 .copied()
167 .collect()
168 }
169
170 pub fn all_components_on_timeline(
177 &self,
178 timeline: &TimelineName,
179 entity_path: &EntityPath,
180 ) -> Option<UnorderedComponentSet> {
181 re_tracing::profile_function!();
182
183 let static_components: Option<UnorderedComponentSet> = self
184 .static_chunk_ids_per_entity
185 .get(entity_path)
186 .map(|static_chunks_per_component| {
187 static_chunks_per_component
188 .keys()
189 .copied()
190 .collect::<UnorderedComponentSet>()
191 })
192 .filter(|names| !names.is_empty());
193
194 let temporal_components: Option<UnorderedComponentSet> = self
195 .temporal_chunk_ids_per_entity_per_component
196 .get(entity_path)
197 .map(|temporal_chunk_ids_per_timeline| {
198 temporal_chunk_ids_per_timeline
199 .get(timeline)
200 .map(|temporal_chunk_ids_per_component| {
201 temporal_chunk_ids_per_component
202 .keys()
203 .copied()
204 .collect::<UnorderedComponentSet>()
205 })
206 .unwrap_or_default()
207 })
208 .filter(|names| !names.is_empty());
209
210 match (static_components, temporal_components) {
211 (None, None) => None,
212 (None, Some(comps)) | (Some(comps), None) => Some(comps),
213 (Some(static_comps), Some(temporal_comps)) => {
214 Some(static_comps.into_iter().chain(temporal_comps).collect())
215 }
216 }
217 }
218
219 pub fn all_components_on_timeline_sorted(
226 &self,
227 timeline: &TimelineName,
228 entity_path: &EntityPath,
229 ) -> Option<ComponentSet> {
230 re_tracing::profile_function!();
231
232 let static_components: Option<ComponentSet> = self
233 .static_chunk_ids_per_entity
234 .get(entity_path)
235 .map(|static_chunks_per_component| {
236 static_chunks_per_component
237 .keys()
238 .copied()
239 .collect::<ComponentSet>()
240 })
241 .filter(|names| !names.is_empty());
242
243 let temporal_components: Option<ComponentSet> = self
244 .temporal_chunk_ids_per_entity_per_component
245 .get(entity_path)
246 .map(|temporal_chunk_ids_per_timeline| {
247 temporal_chunk_ids_per_timeline
248 .get(timeline)
249 .map(|temporal_chunk_ids_per_component| {
250 temporal_chunk_ids_per_component
251 .keys()
252 .copied()
253 .collect::<ComponentSet>()
254 })
255 .unwrap_or_default()
256 })
257 .filter(|names| !names.is_empty());
258
259 match (static_components, temporal_components) {
260 (None, None) => None,
261 (None, Some(comps)) | (Some(comps), None) => Some(comps),
262 (Some(static_comps), Some(temporal_comps)) => {
263 Some(static_comps.into_iter().chain(temporal_comps).collect())
264 }
265 }
266 }
267
268 pub fn all_components_for_entity(
274 &self,
275 entity_path: &EntityPath,
276 ) -> Option<UnorderedComponentSet> {
277 re_tracing::profile_function!();
278
279 let static_components: Option<UnorderedComponentSet> = self
280 .static_chunk_ids_per_entity
281 .get(entity_path)
282 .map(|static_chunks_per_component| {
283 static_chunks_per_component.keys().copied().collect()
284 });
285
286 let temporal_components: Option<UnorderedComponentSet> = self
287 .temporal_chunk_ids_per_entity_per_component
288 .get(entity_path)
289 .map(|temporal_chunk_ids_per_timeline| {
290 temporal_chunk_ids_per_timeline
291 .iter()
292 .flat_map(|(_, temporal_chunk_ids_per_component)| {
293 temporal_chunk_ids_per_component.keys().copied()
294 })
295 .collect()
296 });
297
298 match (static_components, temporal_components) {
299 (None, None) => None,
300 (None, comps @ Some(_)) | (comps @ Some(_), None) => comps,
301 (Some(static_comps), Some(temporal_comps)) => {
302 Some(static_comps.into_iter().chain(temporal_comps).collect())
303 }
304 }
305 }
306
307 pub fn all_components_for_entity_sorted(
313 &self,
314 entity_path: &EntityPath,
315 ) -> Option<ComponentSet> {
316 re_tracing::profile_function!();
317
318 let static_components: Option<ComponentSet> = self
319 .static_chunk_ids_per_entity
320 .get(entity_path)
321 .map(|static_chunks_per_component| {
322 static_chunks_per_component.keys().copied().collect()
323 });
324
325 let temporal_components: Option<ComponentSet> = self
326 .temporal_chunk_ids_per_entity_per_component
327 .get(entity_path)
328 .map(|temporal_chunk_ids_per_timeline| {
329 temporal_chunk_ids_per_timeline
330 .iter()
331 .flat_map(|(_, temporal_chunk_ids_per_component)| {
332 temporal_chunk_ids_per_component.keys().copied()
333 })
334 .collect()
335 });
336
337 match (static_components, temporal_components) {
338 (None, None) => None,
339 (None, comps @ Some(_)) | (comps @ Some(_), None) => comps,
340 (Some(static_comps), Some(temporal_comps)) => {
341 Some(static_comps.into_iter().chain(temporal_comps).collect())
342 }
343 }
344 }
345
346 pub fn entity_component_descriptor(
349 &self,
350 entity_path: &EntityPath,
351 component: ComponentIdentifier,
352 ) -> Option<ComponentDescriptor> {
353 self.per_column_metadata
354 .get(entity_path)
355 .and_then(|per_identifier| per_identifier.get(&component))
356 .map(|(component_descr, _, _)| component_descr.clone())
357 }
358
359 #[inline]
363 pub fn entity_has_component_on_timeline(
364 &self,
365 timeline: &TimelineName,
366 entity_path: &EntityPath,
367 component: ComponentIdentifier,
368 ) -> bool {
369 self.entity_has_static_component(entity_path, component)
372 || self.entity_has_temporal_component_on_timeline(timeline, entity_path, component)
373 }
374
375 pub fn entity_has_component(
379 &self,
380 entity_path: &EntityPath,
381 component: ComponentIdentifier,
382 ) -> bool {
383 self.entity_has_static_component(entity_path, component)
386 || self.entity_has_temporal_component(entity_path, component)
387 }
388
389 #[inline]
393 pub fn entity_has_static_component(
394 &self,
395 entity_path: &EntityPath,
396 component: ComponentIdentifier,
397 ) -> bool {
398 self.static_chunk_ids_per_entity
401 .get(entity_path)
402 .is_some_and(|static_chunk_ids_per_component| {
403 static_chunk_ids_per_component.contains_key(&component)
404 })
405 }
406
407 #[inline]
411 pub fn entity_has_temporal_component(
412 &self,
413 entity_path: &EntityPath,
414 component: ComponentIdentifier,
415 ) -> bool {
416 self.temporal_chunk_ids_per_entity_per_component
419 .get(entity_path)
420 .iter()
421 .flat_map(|temporal_chunk_ids_per_timeline| temporal_chunk_ids_per_timeline.values())
422 .any(|temporal_chunk_ids_per_component| {
423 temporal_chunk_ids_per_component.contains_key(&component)
424 })
425 }
426
427 #[inline]
431 pub fn entity_has_temporal_component_on_timeline(
432 &self,
433 timeline: &TimelineName,
434 entity_path: &EntityPath,
435 component: ComponentIdentifier,
436 ) -> bool {
437 self.temporal_chunk_ids_per_entity_per_component
440 .get(entity_path)
441 .iter()
442 .filter_map(|temporal_chunk_ids_per_timeline| {
443 temporal_chunk_ids_per_timeline.get(timeline)
444 })
445 .any(|temporal_chunk_ids_per_component| {
446 temporal_chunk_ids_per_component.contains_key(&component)
447 })
448 }
449
450 #[inline]
455 pub fn entity_has_data_on_timeline(
456 &self,
457 timeline: &TimelineName,
458 entity_path: &EntityPath,
459 ) -> bool {
460 self.entity_has_static_data(entity_path)
463 || self.entity_has_temporal_data_on_timeline(timeline, entity_path)
464 }
465
466 #[inline]
471 pub fn entity_has_data(&self, entity_path: &EntityPath) -> bool {
472 self.entity_has_static_data(entity_path) || self.entity_has_temporal_data(entity_path)
475 }
476
477 #[inline]
482 pub fn entity_has_static_data(&self, entity_path: &EntityPath) -> bool {
483 self.static_chunk_ids_per_entity
486 .get(entity_path)
487 .is_some_and(|static_chunk_ids_per_component| {
488 static_chunk_ids_per_component
489 .values()
490 .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
491 })
492 }
493
494 #[inline]
499 pub fn entity_has_temporal_data(&self, entity_path: &EntityPath) -> bool {
500 self.temporal_chunk_ids_per_entity_per_component
503 .get(entity_path)
504 .is_some_and(|temporal_chunks_per_timeline| {
505 temporal_chunks_per_timeline
506 .values()
507 .flat_map(|temporal_chunks_per_component| {
508 temporal_chunks_per_component.values()
509 })
510 .flat_map(|chunk_id_sets| chunk_id_sets.per_start_time.values())
511 .flat_map(|chunk_id_set| chunk_id_set.iter())
512 .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
513 })
514 }
515
516 #[inline]
521 pub fn entity_has_temporal_data_on_timeline(
522 &self,
523 timeline: &TimelineName,
524 entity_path: &EntityPath,
525 ) -> bool {
526 self.temporal_chunk_ids_per_entity_per_component
529 .get(entity_path)
530 .and_then(|temporal_chunks_per_timeline| temporal_chunks_per_timeline.get(timeline))
531 .is_some_and(|temporal_chunks_per_component| {
532 temporal_chunks_per_component
533 .values()
534 .flat_map(|chunk_id_sets| chunk_id_sets.per_start_time.values())
535 .flat_map(|chunk_id_set| chunk_id_set.iter())
536 .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
537 })
538 }
539
540 #[inline]
545 pub fn entity_min_time(
546 &self,
547 timeline: &TimelineName,
548 entity_path: &EntityPath,
549 ) -> Option<TimeInt> {
550 let temporal_chunk_ids_per_timeline = self
551 .temporal_chunk_ids_per_entity_per_component
552 .get(entity_path)?;
553 let temporal_chunk_ids_per_component = temporal_chunk_ids_per_timeline.get(timeline)?;
554
555 let mut time_min = TimeInt::MAX;
556 for temporal_chunk_ids_per_time in temporal_chunk_ids_per_component.values() {
557 let Some(time) = temporal_chunk_ids_per_time
558 .per_start_time
559 .first_key_value()
560 .map(|(time, _)| *time)
561 else {
562 continue;
563 };
564 time_min = TimeInt::min(time_min, time);
565 }
566
567 (time_min != TimeInt::MAX).then_some(time_min)
568 }
569
570 pub fn entity_time_range(
574 &self,
575 timeline: &TimelineName,
576 entity_path: &EntityPath,
577 ) -> Option<AbsoluteTimeRange> {
578 re_tracing::profile_function!();
579
580 let temporal_chunk_ids_per_timeline =
581 self.temporal_chunk_ids_per_entity.get(entity_path)?;
582 let chunk_id_sets = temporal_chunk_ids_per_timeline.get(timeline)?;
583
584 let start = chunk_id_sets.per_start_time.first_key_value()?.0;
585 let end = chunk_id_sets.per_end_time.last_key_value()?.0;
586
587 Some(AbsoluteTimeRange::new(*start, *end))
588 }
589
590 pub fn time_range(&self, timeline: &TimelineName) -> Option<AbsoluteTimeRange> {
595 re_tracing::profile_function!();
596
597 self.temporal_chunk_ids_per_entity
598 .values()
599 .filter_map(|temporal_chunk_ids_per_timeline| {
600 let per_time = temporal_chunk_ids_per_timeline.get(timeline)?;
601 let start = per_time.per_start_time.first_key_value()?.0;
602 let end = per_time.per_end_time.last_key_value()?.0;
603 Some(AbsoluteTimeRange::new(*start, *end))
604 })
605 .reduce(|r1, r2| r1.union(r2))
606 }
607}
608
609impl ChunkStore {
611 pub fn latest_at_relevant_chunks(
626 &self,
627 query: &LatestAtQuery,
628 entity_path: &EntityPath,
629 component: ComponentIdentifier,
630 ) -> Vec<Arc<Chunk>> {
631 if let Some(static_chunk) = self
638 .static_chunk_ids_per_entity
639 .get(entity_path)
640 .and_then(|static_chunks_per_component| static_chunks_per_component.get(&component))
641 .and_then(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
642 {
643 return vec![Arc::clone(static_chunk)];
644 }
645
646 let chunks = self
647 .temporal_chunk_ids_per_entity_per_component
648 .get(entity_path)
649 .and_then(|temporal_chunk_ids_per_timeline| {
650 temporal_chunk_ids_per_timeline.get(&query.timeline())
651 })
652 .and_then(|temporal_chunk_ids_per_component| {
653 temporal_chunk_ids_per_component.get(&component)
654 })
655 .and_then(|temporal_chunk_ids_per_time| {
656 self.latest_at(query, temporal_chunk_ids_per_time)
657 })
658 .unwrap_or_default();
659
660 debug_assert!(
661 chunks.iter().map(|chunk| chunk.id()).all_unique(),
662 "{entity_path}:{component} @ {query:?}",
663 );
664
665 chunks
666 }
667
668 pub fn latest_at_relevant_chunks_for_all_components(
682 &self,
683 query: &LatestAtQuery,
684 entity_path: &EntityPath,
685 include_static: bool,
686 ) -> Vec<Arc<Chunk>> {
687 re_tracing::profile_function!(format!("{query:?}"));
688
689 let chunks = if include_static {
690 let empty = Default::default();
691 let static_chunks_per_component = self
692 .static_chunk_ids_per_entity
693 .get(entity_path)
694 .unwrap_or(&empty);
695
696 let static_chunks = static_chunks_per_component
698 .values()
699 .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
700 .cloned();
701
702 let temporal_chunks = self
705 .temporal_chunk_ids_per_entity_per_component
706 .get(entity_path)
707 .and_then(|temporal_chunk_ids_per_timeline_per_component| {
708 temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
709 })
710 .map(|temporal_chunk_ids_per_component| {
711 temporal_chunk_ids_per_component
712 .iter()
713 .filter(|(component_type, _)| {
714 !static_chunks_per_component.contains_key(component_type)
715 })
716 .map(|(_, chunk_id_set)| chunk_id_set)
717 })
718 .into_iter()
719 .flatten()
720 .filter_map(|temporal_chunk_ids_per_time| {
721 self.latest_at(query, temporal_chunk_ids_per_time)
722 })
723 .flatten();
724
725 static_chunks
726 .chain(temporal_chunks)
727 .unique_by(|chunk| chunk.id())
731 .collect_vec()
732 } else {
733 self.temporal_chunk_ids_per_entity
735 .get(entity_path)
736 .and_then(|temporal_chunk_ids_per_timeline| {
737 temporal_chunk_ids_per_timeline.get(&query.timeline())
738 })
739 .and_then(|temporal_chunk_ids_per_time| {
740 self.latest_at(query, temporal_chunk_ids_per_time)
741 })
742 .unwrap_or_default()
743 };
744
745 debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
746
747 chunks
748 }
749
750 fn latest_at(
751 &self,
752 query: &LatestAtQuery,
753 temporal_chunk_ids_per_time: &ChunkIdSetPerTime,
754 ) -> Option<Vec<Arc<Chunk>>> {
755 let upper_bound = temporal_chunk_ids_per_time
759 .per_start_time
760 .range(..=query.at())
761 .next_back()
762 .map(|(time, _)| *time)?;
763
764 let lower_bound = upper_bound.as_i64().saturating_sub(
780 temporal_chunk_ids_per_time
781 .max_interval_length
782 .saturating_cast(),
783 );
784
785 let temporal_chunk_ids = temporal_chunk_ids_per_time
786 .per_start_time
787 .range(..=query.at())
788 .rev()
789 .take_while(|(time, _)| time.as_i64() >= lower_bound)
790 .flat_map(|(_time, chunk_ids)| chunk_ids.iter())
791 .copied()
792 .collect_vec();
793
794 Some(
795 temporal_chunk_ids
796 .iter()
797 .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id).cloned())
798 .collect(),
799 )
800 }
801}
802
803impl ChunkStore {
805 pub fn range_relevant_chunks(
818 &self,
819 query: &RangeQuery,
820 entity_path: &EntityPath,
821 component: ComponentIdentifier,
822 ) -> Vec<Arc<Chunk>> {
823 re_tracing::profile_function!(format!("{query:?}"));
824
825 if let Some(static_chunk) = self
826 .static_chunk_ids_per_entity
827 .get(entity_path)
828 .and_then(|static_chunks_per_component| static_chunks_per_component.get(&component))
829 .and_then(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
830 {
831 return vec![Arc::clone(static_chunk)];
832 }
833
834 let chunks = self
835 .range(
836 query,
837 self.temporal_chunk_ids_per_entity_per_component
838 .get(entity_path)
839 .and_then(|temporal_chunk_ids_per_timeline| {
840 temporal_chunk_ids_per_timeline.get(query.timeline())
841 })
842 .and_then(|temporal_chunk_ids_per_component| {
843 temporal_chunk_ids_per_component.get(&component)
844 })
845 .into_iter(),
846 )
847 .into_iter()
848 .filter(|chunk| {
852 chunk
853 .timelines()
854 .get(query.timeline())
855 .is_some_and(|time_column| {
856 time_column
857 .time_range_per_component(chunk.components())
858 .get(&component)
859 .is_some_and(|time_range| time_range.intersects(query.range()))
860 })
861 })
862 .collect_vec();
863
864 debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
865
866 chunks
867 }
868
869 pub fn range_relevant_chunks_for_all_components(
879 &self,
880 query: &RangeQuery,
881 entity_path: &EntityPath,
882 include_static: bool,
883 ) -> Vec<Arc<Chunk>> {
884 re_tracing::profile_function!(format!("{query:?}"));
885
886 let empty = Default::default();
887 let chunks = if include_static {
888 let static_chunks_per_component = self
889 .static_chunk_ids_per_entity
890 .get(entity_path)
891 .unwrap_or(&empty);
892
893 let static_chunks = static_chunks_per_component
895 .values()
896 .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
897 .cloned();
898
899 let temporal_chunks = self
902 .range(
903 query,
904 self.temporal_chunk_ids_per_entity_per_component
905 .get(entity_path)
906 .and_then(|temporal_chunk_ids_per_timeline_per_component| {
907 temporal_chunk_ids_per_timeline_per_component.get(query.timeline())
908 })
909 .map(|temporal_chunk_ids_per_component| {
910 temporal_chunk_ids_per_component
911 .iter()
912 .filter(|(component_type, _)| {
913 !static_chunks_per_component.contains_key(component_type)
914 })
915 .map(|(_, chunk_id_set)| chunk_id_set)
916 })
917 .into_iter()
918 .flatten(),
919 )
920 .into_iter();
921
922 Either::Left(
923 static_chunks
924 .chain(temporal_chunks)
925 .unique_by(|chunk| chunk.id()),
929 )
930 } else {
931 Either::Right(
933 self.range(
934 query,
935 self.temporal_chunk_ids_per_entity
936 .get(entity_path)
937 .and_then(|temporal_chunk_ids_per_timeline| {
938 temporal_chunk_ids_per_timeline.get(query.timeline())
939 })
940 .into_iter(),
941 ),
942 )
943 };
944
945 let chunks = chunks
949 .into_iter()
950 .filter(|chunk| {
951 chunk
952 .timelines()
953 .get(query.timeline())
954 .is_some_and(|time_column| time_column.time_range().intersects(query.range()))
955 })
956 .collect_vec();
957
958 debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
959
960 chunks
961 }
962
963 fn range<'a>(
964 &'a self,
965 query: &RangeQuery,
966 temporal_chunk_ids_per_times: impl Iterator<Item = &'a ChunkIdSetPerTime>,
967 ) -> Vec<Arc<Chunk>> {
968 temporal_chunk_ids_per_times
972 .map(|temporal_chunk_ids_per_time| {
973 let query_min = if query.options().include_extended_bounds {
975 re_log_types::TimeInt::new_temporal(
976 query.range.min().as_i64().saturating_sub(1),
977 )
978 } else {
979 query.range.min()
980 };
981 let query_max = if query.options().include_extended_bounds {
982 re_log_types::TimeInt::new_temporal(
983 query.range.max().as_i64().saturating_add(1),
984 )
985 } else {
986 query.range.max()
987 };
988
989 let query_min = TimeInt::new_temporal(
1005 query_min.as_i64().saturating_sub(
1006 temporal_chunk_ids_per_time
1007 .max_interval_length
1008 .saturating_cast(),
1009 ),
1010 );
1011
1012 let start_time = temporal_chunk_ids_per_time
1013 .per_start_time
1014 .range(..=query_min)
1015 .next_back()
1016 .map_or(TimeInt::MIN, |(&time, _)| time);
1017
1018 let end_time = temporal_chunk_ids_per_time
1019 .per_start_time
1020 .range(..=query_max)
1021 .next_back()
1022 .map_or(start_time, |(&time, _)| time);
1023
1024 let end_time = TimeInt::max(start_time, end_time);
1027
1028 (start_time, end_time, temporal_chunk_ids_per_time)
1029 })
1030 .flat_map(|(start_time, end_time, temporal_chunk_ids_per_time)| {
1031 temporal_chunk_ids_per_time
1032 .per_start_time
1033 .range(start_time..=end_time)
1034 .map(|(_time, chunk_ids)| chunk_ids)
1035 })
1036 .flat_map(|temporal_chunk_ids| {
1037 temporal_chunk_ids
1038 .iter()
1039 .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id).cloned())
1040 })
1041 .collect()
1042 }
1043}