re_chunk_store/
query.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use itertools::{Either, Itertools as _};
7use nohash_hasher::IntSet;
8use saturating_cast::SaturatingCast as _;
9
10use re_chunk::{Chunk, ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
11use re_log_types::AbsoluteTimeRange;
12use re_log_types::{EntityPath, TimeInt, Timeline};
13use re_types_core::{ComponentDescriptor, ComponentSet, UnorderedComponentSet};
14
15use crate::{ChunkStore, store::ChunkIdSetPerTime};
16
17// Used all over in docstrings.
18#[expect(unused_imports)]
19use crate::RowId;
20
21// ---
22
23// These APIs often have `temporal` and `static` variants.
24// It is sometimes useful to be able to separately query either,
25// such as when we want to tell the user that they logged a component
26// as both static and temporal, which is probably wrong.
27
28impl ChunkStore {
29    /// Retrieve all [`Timeline`]s in the store.
30    #[inline]
31    pub fn timelines(&self) -> BTreeMap<TimelineName, Timeline> {
32        self.time_type_registry
33            .iter()
34            .map(|(name, typ)| (*name, Timeline::new(*name, *typ)))
35            .collect()
36    }
37
38    /// Retrieve all [`EntityPath`]s in the store.
39    #[inline]
40    pub fn all_entities(&self) -> IntSet<EntityPath> {
41        self.static_chunk_ids_per_entity
42            .keys()
43            .cloned()
44            .chain(self.temporal_chunk_ids_per_entity.keys().cloned())
45            .collect()
46    }
47
48    /// Retrieve all [`EntityPath`]s in the store.
49    #[inline]
50    pub fn all_entities_sorted(&self) -> BTreeSet<EntityPath> {
51        self.static_chunk_ids_per_entity
52            .keys()
53            .cloned()
54            .chain(self.temporal_chunk_ids_per_entity.keys().cloned())
55            .collect()
56    }
57
58    /// Retrieve all [`ComponentIdentifier`]s in the store.
59    ///
60    /// See also [`Self::all_components_sorted`].
61    pub fn all_components(&self) -> UnorderedComponentSet {
62        self.static_chunk_ids_per_entity
63            .values()
64            .flat_map(|static_chunks_per_component| static_chunks_per_component.keys())
65            .chain(
66                self.temporal_chunk_ids_per_entity_per_component
67                    .values()
68                    .flat_map(|temporal_chunk_ids_per_timeline| {
69                        temporal_chunk_ids_per_timeline.values().flat_map(
70                            |temporal_chunk_ids_per_component| {
71                                temporal_chunk_ids_per_component.keys()
72                            },
73                        )
74                    }),
75            )
76            .copied()
77            .collect()
78    }
79
80    /// Retrieve all [`ComponentIdentifier`]s in the store.
81    ///
82    /// See also [`Self::all_components`].
83    pub fn all_components_sorted(&self) -> ComponentSet {
84        self.static_chunk_ids_per_entity
85            .values()
86            .flat_map(|static_chunks_per_component| static_chunks_per_component.keys())
87            .chain(
88                self.temporal_chunk_ids_per_entity_per_component
89                    .values()
90                    .flat_map(|temporal_chunk_ids_per_timeline| {
91                        temporal_chunk_ids_per_timeline.values().flat_map(
92                            |temporal_chunk_ids_per_component| {
93                                temporal_chunk_ids_per_component.keys()
94                            },
95                        )
96                    }),
97            )
98            .copied()
99            .collect()
100    }
101
102    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`] on
103    /// the specified [`Timeline`].
104    ///
105    /// Static components are always included in the results.
106    ///
107    /// Returns `None` if the entity doesn't exist at all on this `timeline`.
108    pub fn all_components_on_timeline(
109        &self,
110        timeline: &TimelineName,
111        entity_path: &EntityPath,
112    ) -> Option<UnorderedComponentSet> {
113        re_tracing::profile_function!();
114
115        let static_components: Option<UnorderedComponentSet> = self
116            .static_chunk_ids_per_entity
117            .get(entity_path)
118            .map(|static_chunks_per_component| {
119                static_chunks_per_component
120                    .keys()
121                    .copied()
122                    .collect::<UnorderedComponentSet>()
123            })
124            .filter(|names| !names.is_empty());
125
126        let temporal_components: Option<UnorderedComponentSet> = self
127            .temporal_chunk_ids_per_entity_per_component
128            .get(entity_path)
129            .map(|temporal_chunk_ids_per_timeline| {
130                temporal_chunk_ids_per_timeline
131                    .get(timeline)
132                    .map(|temporal_chunk_ids_per_component| {
133                        temporal_chunk_ids_per_component
134                            .keys()
135                            .copied()
136                            .collect::<UnorderedComponentSet>()
137                    })
138                    .unwrap_or_default()
139            })
140            .filter(|names| !names.is_empty());
141
142        match (static_components, temporal_components) {
143            (None, None) => None,
144            (None, Some(comps)) | (Some(comps), None) => Some(comps),
145            (Some(static_comps), Some(temporal_comps)) => {
146                Some(static_comps.into_iter().chain(temporal_comps).collect())
147            }
148        }
149    }
150
151    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`] on
152    /// the specified [`Timeline`].
153    ///
154    /// Static components are always included in the results.
155    ///
156    /// Returns `None` if the entity doesn't exist at all on this `timeline`.
157    pub fn all_components_on_timeline_sorted(
158        &self,
159        timeline: &TimelineName,
160        entity_path: &EntityPath,
161    ) -> Option<ComponentSet> {
162        re_tracing::profile_function!();
163
164        let static_components: Option<ComponentSet> = self
165            .static_chunk_ids_per_entity
166            .get(entity_path)
167            .map(|static_chunks_per_component| {
168                static_chunks_per_component
169                    .keys()
170                    .copied()
171                    .collect::<ComponentSet>()
172            })
173            .filter(|names| !names.is_empty());
174
175        let temporal_components: Option<ComponentSet> = self
176            .temporal_chunk_ids_per_entity_per_component
177            .get(entity_path)
178            .map(|temporal_chunk_ids_per_timeline| {
179                temporal_chunk_ids_per_timeline
180                    .get(timeline)
181                    .map(|temporal_chunk_ids_per_component| {
182                        temporal_chunk_ids_per_component
183                            .keys()
184                            .copied()
185                            .collect::<ComponentSet>()
186                    })
187                    .unwrap_or_default()
188            })
189            .filter(|names| !names.is_empty());
190
191        match (static_components, temporal_components) {
192            (None, None) => None,
193            (None, Some(comps)) | (Some(comps), None) => Some(comps),
194            (Some(static_comps), Some(temporal_comps)) => {
195                Some(static_comps.into_iter().chain(temporal_comps).collect())
196            }
197        }
198    }
199
200    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`].
201    ///
202    /// Static components are always included in the results.
203    ///
204    /// Returns `None` if the entity has never had any data logged to it.
205    pub fn all_components_for_entity(
206        &self,
207        entity_path: &EntityPath,
208    ) -> Option<UnorderedComponentSet> {
209        re_tracing::profile_function!();
210
211        let static_components: Option<UnorderedComponentSet> = self
212            .static_chunk_ids_per_entity
213            .get(entity_path)
214            .map(|static_chunks_per_component| {
215                static_chunks_per_component.keys().copied().collect()
216            });
217
218        let temporal_components: Option<UnorderedComponentSet> = self
219            .temporal_chunk_ids_per_entity_per_component
220            .get(entity_path)
221            .map(|temporal_chunk_ids_per_timeline| {
222                temporal_chunk_ids_per_timeline
223                    .iter()
224                    .flat_map(|(_, temporal_chunk_ids_per_component)| {
225                        temporal_chunk_ids_per_component.keys().copied()
226                    })
227                    .collect()
228            });
229
230        match (static_components, temporal_components) {
231            (None, None) => None,
232            (None, comps @ Some(_)) | (comps @ Some(_), None) => comps,
233            (Some(static_comps), Some(temporal_comps)) => {
234                Some(static_comps.into_iter().chain(temporal_comps).collect())
235            }
236        }
237    }
238
239    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`].
240    ///
241    /// Static components are always included in the results.
242    ///
243    /// Returns `None` if the entity has never had any data logged to it.
244    pub fn all_components_for_entity_sorted(
245        &self,
246        entity_path: &EntityPath,
247    ) -> Option<ComponentSet> {
248        re_tracing::profile_function!();
249
250        let static_components: Option<ComponentSet> = self
251            .static_chunk_ids_per_entity
252            .get(entity_path)
253            .map(|static_chunks_per_component| {
254                static_chunks_per_component.keys().copied().collect()
255            });
256
257        let temporal_components: Option<ComponentSet> = self
258            .temporal_chunk_ids_per_entity_per_component
259            .get(entity_path)
260            .map(|temporal_chunk_ids_per_timeline| {
261                temporal_chunk_ids_per_timeline
262                    .iter()
263                    .flat_map(|(_, temporal_chunk_ids_per_component)| {
264                        temporal_chunk_ids_per_component.keys().copied()
265                    })
266                    .collect()
267            });
268
269        match (static_components, temporal_components) {
270            (None, None) => None,
271            (None, comps @ Some(_)) | (comps @ Some(_), None) => comps,
272            (Some(static_comps), Some(temporal_comps)) => {
273                Some(static_comps.into_iter().chain(temporal_comps).collect())
274            }
275        }
276    }
277
278    /// Retrieves the [`ComponentDescriptor`] at a given [`EntityPath`] that has a certain [`ComponentIdentifier`].
279    // TODO(andreas): The descriptor for a given identifier should never change within a recording.
280    pub fn entity_component_descriptor(
281        &self,
282        entity_path: &EntityPath,
283        component: ComponentIdentifier,
284    ) -> Option<ComponentDescriptor> {
285        self.per_column_metadata
286            .get(entity_path)
287            .and_then(|per_identifier| per_identifier.get(&component))
288            .map(|(component_descr, _, _)| component_descr.clone())
289    }
290
291    /// Check whether an entity has a static component or a temporal component on the specified timeline.
292    ///
293    /// This does _not_ check if the entity actually currently holds any data for that component.
294    #[inline]
295    pub fn entity_has_component_on_timeline(
296        &self,
297        timeline: &TimelineName,
298        entity_path: &EntityPath,
299        component: ComponentIdentifier,
300    ) -> bool {
301        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
302
303        self.entity_has_static_component(entity_path, component)
304            || self.entity_has_temporal_component_on_timeline(timeline, entity_path, component)
305    }
306
307    /// Check whether an entity has a static component or a temporal component on any timeline.
308    ///
309    /// This does _not_ check if the entity actually currently holds any data for that component.
310    pub fn entity_has_component(
311        &self,
312        entity_path: &EntityPath,
313        component: ComponentIdentifier,
314    ) -> bool {
315        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
316
317        self.entity_has_static_component(entity_path, component)
318            || self.entity_has_temporal_component(entity_path, component)
319    }
320
321    /// Check whether an entity has a specific static component.
322    ///
323    /// This does _not_ check if the entity actually currently holds any data for that component.
324    #[inline]
325    pub fn entity_has_static_component(
326        &self,
327        entity_path: &EntityPath,
328        component: ComponentIdentifier,
329    ) -> bool {
330        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
331
332        self.static_chunk_ids_per_entity
333            .get(entity_path)
334            .is_some_and(|static_chunk_ids_per_component| {
335                static_chunk_ids_per_component.contains_key(&component)
336            })
337    }
338
339    /// Check whether an entity has a temporal component on any timeline.
340    ///
341    /// This does _not_ check if the entity actually currently holds any data for that component.
342    #[inline]
343    pub fn entity_has_temporal_component(
344        &self,
345        entity_path: &EntityPath,
346        component: ComponentIdentifier,
347    ) -> bool {
348        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
349
350        self.temporal_chunk_ids_per_entity_per_component
351            .get(entity_path)
352            .iter()
353            .flat_map(|temporal_chunk_ids_per_timeline| temporal_chunk_ids_per_timeline.values())
354            .any(|temporal_chunk_ids_per_component| {
355                temporal_chunk_ids_per_component.contains_key(&component)
356            })
357    }
358
359    /// Check whether an entity has a temporal component on a specific timeline.
360    ///
361    /// This does _not_ check if the entity actually currently holds any data for that component.
362    #[inline]
363    pub fn entity_has_temporal_component_on_timeline(
364        &self,
365        timeline: &TimelineName,
366        entity_path: &EntityPath,
367        component: ComponentIdentifier,
368    ) -> bool {
369        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
370
371        self.temporal_chunk_ids_per_entity_per_component
372            .get(entity_path)
373            .iter()
374            .filter_map(|temporal_chunk_ids_per_timeline| {
375                temporal_chunk_ids_per_timeline.get(timeline)
376            })
377            .any(|temporal_chunk_ids_per_component| {
378                temporal_chunk_ids_per_component.contains_key(&component)
379            })
380    }
381
382    /// Check whether an entity has any data on a specific timeline, or any static data.
383    ///
384    /// This is different from checking if the entity has any component, it also ensures
385    /// that some _data_ currently exists in the store for this entity.
386    #[inline]
387    pub fn entity_has_data_on_timeline(
388        &self,
389        timeline: &TimelineName,
390        entity_path: &EntityPath,
391    ) -> bool {
392        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
393
394        self.entity_has_static_data(entity_path)
395            || self.entity_has_temporal_data_on_timeline(timeline, entity_path)
396    }
397
398    /// Check whether an entity has any static data or any temporal data on any timeline.
399    ///
400    /// This is different from checking if the entity has any component, it also ensures
401    /// that some _data_ currently exists in the store for this entity.
402    #[inline]
403    pub fn entity_has_data(&self, entity_path: &EntityPath) -> bool {
404        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
405
406        self.entity_has_static_data(entity_path) || self.entity_has_temporal_data(entity_path)
407    }
408
409    /// Check whether an entity has any static data.
410    ///
411    /// This is different from checking if the entity has any component, it also ensures
412    /// that some _data_ currently exists in the store for this entity.
413    #[inline]
414    pub fn entity_has_static_data(&self, entity_path: &EntityPath) -> bool {
415        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
416
417        self.static_chunk_ids_per_entity
418            .get(entity_path)
419            .is_some_and(|static_chunk_ids_per_component| {
420                static_chunk_ids_per_component
421                    .values()
422                    .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
423            })
424    }
425
426    /// Check whether an entity has any temporal data.
427    ///
428    /// This is different from checking if the entity has any component, it also ensures
429    /// that some _data_ currently exists in the store for this entity.
430    #[inline]
431    pub fn entity_has_temporal_data(&self, entity_path: &EntityPath) -> bool {
432        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
433
434        self.temporal_chunk_ids_per_entity_per_component
435            .get(entity_path)
436            .is_some_and(|temporal_chunks_per_timeline| {
437                temporal_chunks_per_timeline
438                    .values()
439                    .flat_map(|temporal_chunks_per_component| {
440                        temporal_chunks_per_component.values()
441                    })
442                    .flat_map(|chunk_id_sets| chunk_id_sets.per_start_time.values())
443                    .flat_map(|chunk_id_set| chunk_id_set.iter())
444                    .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
445            })
446    }
447
448    /// Check whether an entity has any temporal data.
449    ///
450    /// This is different from checking if the entity has any component, it also ensures
451    /// that some _data_ currently exists in the store for this entity.
452    #[inline]
453    pub fn entity_has_temporal_data_on_timeline(
454        &self,
455        timeline: &TimelineName,
456        entity_path: &EntityPath,
457    ) -> bool {
458        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
459
460        self.temporal_chunk_ids_per_entity_per_component
461            .get(entity_path)
462            .and_then(|temporal_chunks_per_timeline| temporal_chunks_per_timeline.get(timeline))
463            .is_some_and(|temporal_chunks_per_component| {
464                temporal_chunks_per_component
465                    .values()
466                    .flat_map(|chunk_id_sets| chunk_id_sets.per_start_time.values())
467                    .flat_map(|chunk_id_set| chunk_id_set.iter())
468                    .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
469            })
470    }
471
472    /// Find the earliest time at which something was logged for a given entity on the specified
473    /// timeline.
474    ///
475    /// Ignores static data.
476    #[inline]
477    pub fn entity_min_time(
478        &self,
479        timeline: &TimelineName,
480        entity_path: &EntityPath,
481    ) -> Option<TimeInt> {
482        let temporal_chunk_ids_per_timeline = self
483            .temporal_chunk_ids_per_entity_per_component
484            .get(entity_path)?;
485        let temporal_chunk_ids_per_component = temporal_chunk_ids_per_timeline.get(timeline)?;
486
487        let mut time_min = TimeInt::MAX;
488        for temporal_chunk_ids_per_time in temporal_chunk_ids_per_component.values() {
489            let Some(time) = temporal_chunk_ids_per_time
490                .per_start_time
491                .first_key_value()
492                .map(|(time, _)| *time)
493            else {
494                continue;
495            };
496            time_min = TimeInt::min(time_min, time);
497        }
498
499        (time_min != TimeInt::MAX).then_some(time_min)
500    }
501
502    /// Returns the min and max times at which data was logged for an entity on a specific timeline.
503    ///
504    /// This ignores static data.
505    pub fn entity_time_range(
506        &self,
507        timeline: &TimelineName,
508        entity_path: &EntityPath,
509    ) -> Option<AbsoluteTimeRange> {
510        re_tracing::profile_function!();
511
512        let temporal_chunk_ids_per_timeline =
513            self.temporal_chunk_ids_per_entity.get(entity_path)?;
514        let chunk_id_sets = temporal_chunk_ids_per_timeline.get(timeline)?;
515
516        let start = chunk_id_sets.per_start_time.first_key_value()?.0;
517        let end = chunk_id_sets.per_end_time.last_key_value()?.0;
518
519        Some(AbsoluteTimeRange::new(*start, *end))
520    }
521
522    /// Returns the min and max times at which data was logged on a specific timeline, considering
523    /// all entities.
524    ///
525    /// This ignores static data.
526    pub fn time_range(&self, timeline: &TimelineName) -> Option<AbsoluteTimeRange> {
527        re_tracing::profile_function!();
528
529        self.temporal_chunk_ids_per_entity
530            .values()
531            .filter_map(|temporal_chunk_ids_per_timeline| {
532                let per_time = temporal_chunk_ids_per_timeline.get(timeline)?;
533                let start = per_time.per_start_time.first_key_value()?.0;
534                let end = per_time.per_end_time.last_key_value()?.0;
535                Some(AbsoluteTimeRange::new(*start, *end))
536            })
537            .reduce(|r1, r2| r1.union(r2))
538    }
539}
540
541// LatestAt
542impl ChunkStore {
543    /// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`] and [`ComponentIdentifier`].
544    ///
545    /// The returned vector is guaranteed free of duplicates, by definition.
546    ///
547    /// The [`ChunkStore`] always work at the [`Chunk`] level (as opposed to the row level): it is
548    /// oblivious to the data therein.
549    /// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible
550    /// that a query has more than one relevant chunk.
551    ///
552    /// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to
553    /// determine what exact row contains the final result.
554    ///
555    /// If the entity has static component data associated with it, it will unconditionally
556    /// override any temporal component data.
557    pub fn latest_at_relevant_chunks(
558        &self,
559        query: &LatestAtQuery,
560        entity_path: &EntityPath,
561        component: ComponentIdentifier,
562    ) -> Vec<Arc<Chunk>> {
563        // Don't do a profile scope here, this can have a lot of overhead when executing many small queries.
564        //re_tracing::profile_function!(format!("{query:?}"));
565
566        // Reminder: if a chunk has been indexed for a given component, then it must contain at
567        // least one non-null value for that column.
568
569        if let Some(static_chunk) = self
570            .static_chunk_ids_per_entity
571            .get(entity_path)
572            .and_then(|static_chunks_per_component| static_chunks_per_component.get(&component))
573            .and_then(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
574        {
575            return vec![Arc::clone(static_chunk)];
576        }
577
578        let chunks = self
579            .temporal_chunk_ids_per_entity_per_component
580            .get(entity_path)
581            .and_then(|temporal_chunk_ids_per_timeline| {
582                temporal_chunk_ids_per_timeline.get(&query.timeline())
583            })
584            .and_then(|temporal_chunk_ids_per_component| {
585                temporal_chunk_ids_per_component.get(&component)
586            })
587            .and_then(|temporal_chunk_ids_per_time| {
588                self.latest_at(query, temporal_chunk_ids_per_time)
589            })
590            .unwrap_or_default();
591
592        debug_assert!(
593            chunks.iter().map(|chunk| chunk.id()).all_unique(),
594            "{entity_path}:{component} @ {query:?}",
595        );
596
597        chunks
598    }
599
600    /// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`].
601    ///
602    /// Optionally include static data.
603    ///
604    /// The [`ChunkStore`] always work at the [`Chunk`] level (as opposed to the row level): it is
605    /// oblivious to the data therein.
606    /// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible
607    /// that a query has more than one relevant chunk.
608    ///
609    /// The returned vector is free of duplicates.
610    ///
611    /// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to
612    /// determine what exact row contains the final result.
613    pub fn latest_at_relevant_chunks_for_all_components(
614        &self,
615        query: &LatestAtQuery,
616        entity_path: &EntityPath,
617        include_static: bool,
618    ) -> Vec<Arc<Chunk>> {
619        re_tracing::profile_function!(format!("{query:?}"));
620
621        let chunks = if include_static {
622            let empty = Default::default();
623            let static_chunks_per_component = self
624                .static_chunk_ids_per_entity
625                .get(entity_path)
626                .unwrap_or(&empty);
627
628            // All static chunks for the given entity
629            let static_chunks = static_chunks_per_component
630                .values()
631                .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
632                .cloned();
633
634            // All temporal chunks for the given entity, filtered by components
635            // for which we already have static chunks.
636            let temporal_chunks = self
637                .temporal_chunk_ids_per_entity_per_component
638                .get(entity_path)
639                .and_then(|temporal_chunk_ids_per_timeline_per_component| {
640                    temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
641                })
642                .map(|temporal_chunk_ids_per_component| {
643                    temporal_chunk_ids_per_component
644                        .iter()
645                        .filter(|(component_type, _)| {
646                            !static_chunks_per_component.contains_key(component_type)
647                        })
648                        .map(|(_, chunk_id_set)| chunk_id_set)
649                })
650                .into_iter()
651                .flatten()
652                .filter_map(|temporal_chunk_ids_per_time| {
653                    self.latest_at(query, temporal_chunk_ids_per_time)
654                })
655                .flatten();
656
657            static_chunks
658                .chain(temporal_chunks)
659                // Deduplicate before passing it along.
660                // Both temporal and static chunk "sets" here may have duplicates in them,
661                // so we de-duplicate them together to reduce the number of allocations.
662                .unique_by(|chunk| chunk.id())
663                .collect_vec()
664        } else {
665            // This cannot yield duplicates by definition.
666            self.temporal_chunk_ids_per_entity
667                .get(entity_path)
668                .and_then(|temporal_chunk_ids_per_timeline| {
669                    temporal_chunk_ids_per_timeline.get(&query.timeline())
670                })
671                .and_then(|temporal_chunk_ids_per_time| {
672                    self.latest_at(query, temporal_chunk_ids_per_time)
673                })
674                .unwrap_or_default()
675        };
676
677        debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
678
679        chunks
680    }
681
682    fn latest_at(
683        &self,
684        query: &LatestAtQuery,
685        temporal_chunk_ids_per_time: &ChunkIdSetPerTime,
686    ) -> Option<Vec<Arc<Chunk>>> {
687        // Don't do a profile scope here, this can have a lot of overhead when executing many small queries.
688        //re_tracing::profile_function!();
689
690        let upper_bound = temporal_chunk_ids_per_time
691            .per_start_time
692            .range(..=query.at())
693            .next_back()
694            .map(|(time, _)| *time)?;
695
696        // Overlapped chunks
697        // =================
698        //
699        // To deal with potentially overlapping chunks, we keep track of the longest
700        // interval in the entire map, which gives us an upper bound on how much we
701        // would need to walk backwards in order to find all potential overlaps.
702        //
703        // This is a fairly simple solution that scales much better than interval-tree
704        // based alternatives, both in terms of complexity and performance, in the normal
705        // case where most chunks in a collection have similar lengths.
706        //
707        // The most degenerate case -- a single chunk overlaps everything else -- results
708        // in `O(n)` performance, which gets amortized by the query cache.
709        // If that turns out to be a problem in practice, we can experiment with more
710        // complex solutions then.
711        let lower_bound = upper_bound.as_i64().saturating_sub(
712            temporal_chunk_ids_per_time
713                .max_interval_length
714                .saturating_cast(),
715        );
716
717        let temporal_chunk_ids = temporal_chunk_ids_per_time
718            .per_start_time
719            .range(..=query.at())
720            .rev()
721            .take_while(|(time, _)| time.as_i64() >= lower_bound)
722            .flat_map(|(_time, chunk_ids)| chunk_ids.iter())
723            .copied()
724            .collect_vec();
725
726        Some(
727            temporal_chunk_ids
728                .iter()
729                .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id).cloned())
730                .collect(),
731        )
732    }
733}
734
735// Range
736impl ChunkStore {
737    /// Returns the most-relevant chunk(s) for the given [`RangeQuery`] and [`ComponentIdentifier`].
738    ///
739    /// The returned vector is guaranteed free of duplicates, by definition.
740    ///
741    /// The criteria for returning a chunk is only that it may contain data that overlaps with
742    /// the queried range.
743    ///
744    /// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to
745    /// determine how exactly each row of data fit with the rest.
746    ///
747    /// If the entity has static component data associated with it, it will unconditionally
748    /// override any temporal component data.
749    pub fn range_relevant_chunks(
750        &self,
751        query: &RangeQuery,
752        entity_path: &EntityPath,
753        component: ComponentIdentifier,
754    ) -> Vec<Arc<Chunk>> {
755        re_tracing::profile_function!(format!("{query:?}"));
756
757        if let Some(static_chunk) = self
758            .static_chunk_ids_per_entity
759            .get(entity_path)
760            .and_then(|static_chunks_per_component| static_chunks_per_component.get(&component))
761            .and_then(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
762        {
763            return vec![Arc::clone(static_chunk)];
764        }
765
766        let chunks = self
767            .range(
768                query,
769                self.temporal_chunk_ids_per_entity_per_component
770                    .get(entity_path)
771                    .and_then(|temporal_chunk_ids_per_timeline| {
772                        temporal_chunk_ids_per_timeline.get(query.timeline())
773                    })
774                    .and_then(|temporal_chunk_ids_per_component| {
775                        temporal_chunk_ids_per_component.get(&component)
776                    })
777                    .into_iter(),
778            )
779            .into_iter()
780            // Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
781            // need to make sure that the resulting chunks' per-component time range intersects with the
782            // time range of the query itself.
783            .filter(|chunk| {
784                chunk
785                    .timelines()
786                    .get(query.timeline())
787                    .is_some_and(|time_column| {
788                        time_column
789                            .time_range_per_component(chunk.components())
790                            .get(&component)
791                            .is_some_and(|time_range| time_range.intersects(query.range()))
792                    })
793            })
794            .collect_vec();
795
796        debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
797
798        chunks
799    }
800
801    /// Returns the most-relevant chunk(s) for the given [`RangeQuery`].
802    ///
803    /// The criteria for returning a chunk is only that it may contain data that overlaps with
804    /// the queried range, or that it is static.
805    ///
806    /// The returned vector is free of duplicates.
807    ///
808    /// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to
809    /// determine how exactly each row of data fit with the rest.
810    pub fn range_relevant_chunks_for_all_components(
811        &self,
812        query: &RangeQuery,
813        entity_path: &EntityPath,
814        include_static: bool,
815    ) -> Vec<Arc<Chunk>> {
816        re_tracing::profile_function!(format!("{query:?}"));
817
818        let empty = Default::default();
819        let chunks = if include_static {
820            let static_chunks_per_component = self
821                .static_chunk_ids_per_entity
822                .get(entity_path)
823                .unwrap_or(&empty);
824
825            // All static chunks for the given entity
826            let static_chunks = static_chunks_per_component
827                .values()
828                .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
829                .cloned();
830
831            // All temporal chunks for the given entity, filtered by components
832            // for which we already have static chunks.
833            let temporal_chunks = self
834                .range(
835                    query,
836                    self.temporal_chunk_ids_per_entity_per_component
837                        .get(entity_path)
838                        .and_then(|temporal_chunk_ids_per_timeline_per_component| {
839                            temporal_chunk_ids_per_timeline_per_component.get(query.timeline())
840                        })
841                        .map(|temporal_chunk_ids_per_component| {
842                            temporal_chunk_ids_per_component
843                                .iter()
844                                .filter(|(component_type, _)| {
845                                    !static_chunks_per_component.contains_key(component_type)
846                                })
847                                .map(|(_, chunk_id_set)| chunk_id_set)
848                        })
849                        .into_iter()
850                        .flatten(),
851                )
852                .into_iter();
853
854            Either::Left(
855                static_chunks
856                    .chain(temporal_chunks)
857                    // Deduplicate before passing it along.
858                    // Both temporal and static chunk "sets" here may have duplicates in them,
859                    // so we de-duplicate them together to reduce the number of allocations.
860                    .unique_by(|chunk| chunk.id()),
861            )
862        } else {
863            // This cannot yield duplicates by definition.
864            Either::Right(
865                self.range(
866                    query,
867                    self.temporal_chunk_ids_per_entity
868                        .get(entity_path)
869                        .and_then(|temporal_chunk_ids_per_timeline| {
870                            temporal_chunk_ids_per_timeline.get(query.timeline())
871                        })
872                        .into_iter(),
873                ),
874            )
875        };
876
877        // Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
878        // need to make sure that the resulting chunks' global time ranges intersect with the
879        // time range of the query itself.
880        let chunks = chunks
881            .into_iter()
882            .filter(|chunk| {
883                chunk
884                    .timelines()
885                    .get(query.timeline())
886                    .is_some_and(|time_column| time_column.time_range().intersects(query.range()))
887            })
888            .collect_vec();
889
890        debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
891
892        chunks
893    }
894
895    fn range<'a>(
896        &'a self,
897        query: &RangeQuery,
898        temporal_chunk_ids_per_times: impl Iterator<Item = &'a ChunkIdSetPerTime>,
899    ) -> Vec<Arc<Chunk>> {
900        // Too small & frequent for profiling scopes.
901        //re_tracing::profile_function!();
902
903        temporal_chunk_ids_per_times
904            .map(|temporal_chunk_ids_per_time| {
905                // See `RangeQueryOptions::include_extended_bounds` for more information.
906                let query_min = if query.options().include_extended_bounds {
907                    re_log_types::TimeInt::new_temporal(
908                        query.range.min().as_i64().saturating_sub(1),
909                    )
910                } else {
911                    query.range.min()
912                };
913                let query_max = if query.options().include_extended_bounds {
914                    re_log_types::TimeInt::new_temporal(
915                        query.range.max().as_i64().saturating_add(1),
916                    )
917                } else {
918                    query.range.max()
919                };
920
921                // Overlapped chunks
922                // =================
923                //
924                // To deal with potentially overlapping chunks, we keep track of the longest
925                // interval in the entire map, which gives us an upper bound on how much we
926                // would need to walk backwards in order to find all potential overlaps.
927                //
928                // This is a fairly simple solution that scales much better than interval-tree
929                // based alternatives, both in terms of complexity and performance, in the normal
930                // case where most chunks in a collection have similar lengths.
931                //
932                // The most degenerate case -- a single chunk overlaps everything else -- results
933                // in `O(n)` performance, which gets amortized by the query cache.
934                // If that turns out to be a problem in practice, we can experiment with more
935                // complex solutions then.
936                let query_min = TimeInt::new_temporal(
937                    query_min.as_i64().saturating_sub(
938                        temporal_chunk_ids_per_time
939                            .max_interval_length
940                            .saturating_cast(),
941                    ),
942                );
943
944                let start_time = temporal_chunk_ids_per_time
945                    .per_start_time
946                    .range(..=query_min)
947                    .next_back()
948                    .map_or(TimeInt::MIN, |(&time, _)| time);
949
950                let end_time = temporal_chunk_ids_per_time
951                    .per_start_time
952                    .range(..=query_max)
953                    .next_back()
954                    .map_or(start_time, |(&time, _)| time);
955
956                // NOTE: Just being extra cautious because, even though this shouldnt possibly ever happen,
957                // indexing a std map with a backwards range is an instant crash.
958                let end_time = TimeInt::max(start_time, end_time);
959
960                (start_time, end_time, temporal_chunk_ids_per_time)
961            })
962            .flat_map(|(start_time, end_time, temporal_chunk_ids_per_time)| {
963                temporal_chunk_ids_per_time
964                    .per_start_time
965                    .range(start_time..=end_time)
966                    .map(|(_time, chunk_ids)| chunk_ids)
967            })
968            .flat_map(|temporal_chunk_ids| {
969                temporal_chunk_ids
970                    .iter()
971                    .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id).cloned())
972            })
973            .collect()
974    }
975}