re_chunk_store/
query.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use itertools::{Either, Itertools as _};
5use nohash_hasher::IntSet;
6use saturating_cast::SaturatingCast as _;
7
8use re_chunk::{Chunk, ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
9use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, Timeline};
10use re_types_core::{ComponentDescriptor, ComponentSet, UnorderedComponentSet};
11
12use crate::ChunkStore;
13use crate::store::ChunkIdSetPerTime;
14
15// Used all over in docstrings.
16#[expect(unused_imports)]
17use crate::RowId;
18
19// ---
20
21// These APIs often have `temporal` and `static` variants.
22// It is sometimes useful to be able to separately query either,
23// such as when we want to tell the user that they logged a component
24// as both static and temporal, which is probably wrong.
25
26impl ChunkStore {
27    /// Retrieve all [`Timeline`]s in the store.
28    #[inline]
29    pub fn timelines(&self) -> BTreeMap<TimelineName, Timeline> {
30        self.time_type_registry
31            .iter()
32            .map(|(name, typ)| (*name, Timeline::new(*name, *typ)))
33            .collect()
34    }
35
36    /// Retrieve all [`EntityPath`]s in the store.
37    #[inline]
38    pub fn all_entities(&self) -> IntSet<EntityPath> {
39        self.static_chunk_ids_per_entity
40            .keys()
41            .cloned()
42            .chain(self.temporal_chunk_ids_per_entity.keys().cloned())
43            .collect()
44    }
45
46    /// Returns a vector with all the chunks in this store, sorted in descending order relative to
47    /// their distance from the given `(timline, time)` cursor.
48    pub fn find_temporal_chunks_furthest_from(
49        &self,
50        timeline: &TimelineName,
51        time: TimeInt,
52    ) -> Vec<Arc<Chunk>> {
53        re_tracing::profile_function!();
54
55        self.chunks_per_chunk_id
56            .values()
57            .filter_map(|chunk| {
58                let times = chunk.timelines().get(timeline)?;
59
60                let min_dist = if times.is_sorted() {
61                    let pivot = times.times_raw().partition_point(|t| *t < time.as_i64());
62                    let min_value1 = times
63                        .times_raw()
64                        .get(pivot.saturating_sub(1))
65                        .map(|t| t.abs_diff(time.as_i64()));
66                    let min_value2 = times
67                        .times_raw()
68                        .get(pivot)
69                        .map(|t| t.abs_diff(time.as_i64()));
70
71                    // NOTE: Do *not* compare these options directly, if any of them turns out to
72                    // be None, it'll be a disaster.
73                    // min_value1.min(min_value2);
74                    [min_value1, min_value2].into_iter().flatten().min()
75                } else {
76                    times
77                        .times()
78                        .map(|t| t.as_i64().abs_diff(time.as_i64()))
79                        .min()
80                };
81
82                min_dist.map(|max_dist| (chunk, max_dist))
83            })
84            .sorted_by(|(_chunk1, dist1), (_chunk2, dist2)| std::cmp::Ord::cmp(dist2, dist1)) // descending
85            .map(|(chunk, _dist)| chunk.clone())
86            .collect_vec()
87    }
88
89    /// An implementation of `find_temporal_chunk_furthest_from` that focuses solely on correctness.
90    ///
91    /// Used to compare with results obtained from the optimized implementation.
92    pub(crate) fn find_temporal_chunks_furthest_from_slow(
93        &self,
94        timeline: &TimelineName,
95        time: TimeInt,
96    ) -> Vec<Arc<Chunk>> {
97        re_tracing::profile_function!();
98
99        self.chunks_per_chunk_id
100            .values()
101            .filter_map(|chunk| {
102                let times = chunk.timelines().get(timeline)?;
103
104                let min_dist = times
105                    .times()
106                    .map(|t| t.as_i64().abs_diff(time.as_i64()))
107                    .min();
108
109                min_dist.map(|max_dist| (chunk, max_dist))
110            })
111            .sorted_by(|(_chunk1, dist1), (_chunk2, dist2)| std::cmp::Ord::cmp(dist2, dist1)) // descending
112            .map(|(chunk, _dist)| chunk.clone())
113            .collect_vec()
114    }
115
116    /// Retrieve all [`EntityPath`]s in the store.
117    #[inline]
118    pub fn all_entities_sorted(&self) -> BTreeSet<EntityPath> {
119        self.static_chunk_ids_per_entity
120            .keys()
121            .cloned()
122            .chain(self.temporal_chunk_ids_per_entity.keys().cloned())
123            .collect()
124    }
125
126    /// Retrieve all [`ComponentIdentifier`]s in the store.
127    ///
128    /// See also [`Self::all_components_sorted`].
129    pub fn all_components(&self) -> UnorderedComponentSet {
130        self.static_chunk_ids_per_entity
131            .values()
132            .flat_map(|static_chunks_per_component| static_chunks_per_component.keys())
133            .chain(
134                self.temporal_chunk_ids_per_entity_per_component
135                    .values()
136                    .flat_map(|temporal_chunk_ids_per_timeline| {
137                        temporal_chunk_ids_per_timeline.values().flat_map(
138                            |temporal_chunk_ids_per_component| {
139                                temporal_chunk_ids_per_component.keys()
140                            },
141                        )
142                    }),
143            )
144            .copied()
145            .collect()
146    }
147
148    /// Retrieve all [`ComponentIdentifier`]s in the store.
149    ///
150    /// See also [`Self::all_components`].
151    pub fn all_components_sorted(&self) -> ComponentSet {
152        self.static_chunk_ids_per_entity
153            .values()
154            .flat_map(|static_chunks_per_component| static_chunks_per_component.keys())
155            .chain(
156                self.temporal_chunk_ids_per_entity_per_component
157                    .values()
158                    .flat_map(|temporal_chunk_ids_per_timeline| {
159                        temporal_chunk_ids_per_timeline.values().flat_map(
160                            |temporal_chunk_ids_per_component| {
161                                temporal_chunk_ids_per_component.keys()
162                            },
163                        )
164                    }),
165            )
166            .copied()
167            .collect()
168    }
169
170    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`] on
171    /// the specified [`Timeline`].
172    ///
173    /// Static components are always included in the results.
174    ///
175    /// Returns `None` if the entity doesn't exist at all on this `timeline`.
176    pub fn all_components_on_timeline(
177        &self,
178        timeline: &TimelineName,
179        entity_path: &EntityPath,
180    ) -> Option<UnorderedComponentSet> {
181        re_tracing::profile_function!();
182
183        let static_components: Option<UnorderedComponentSet> = self
184            .static_chunk_ids_per_entity
185            .get(entity_path)
186            .map(|static_chunks_per_component| {
187                static_chunks_per_component
188                    .keys()
189                    .copied()
190                    .collect::<UnorderedComponentSet>()
191            })
192            .filter(|names| !names.is_empty());
193
194        let temporal_components: Option<UnorderedComponentSet> = self
195            .temporal_chunk_ids_per_entity_per_component
196            .get(entity_path)
197            .map(|temporal_chunk_ids_per_timeline| {
198                temporal_chunk_ids_per_timeline
199                    .get(timeline)
200                    .map(|temporal_chunk_ids_per_component| {
201                        temporal_chunk_ids_per_component
202                            .keys()
203                            .copied()
204                            .collect::<UnorderedComponentSet>()
205                    })
206                    .unwrap_or_default()
207            })
208            .filter(|names| !names.is_empty());
209
210        match (static_components, temporal_components) {
211            (None, None) => None,
212            (None, Some(comps)) | (Some(comps), None) => Some(comps),
213            (Some(static_comps), Some(temporal_comps)) => {
214                Some(static_comps.into_iter().chain(temporal_comps).collect())
215            }
216        }
217    }
218
219    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`] on
220    /// the specified [`Timeline`].
221    ///
222    /// Static components are always included in the results.
223    ///
224    /// Returns `None` if the entity doesn't exist at all on this `timeline`.
225    pub fn all_components_on_timeline_sorted(
226        &self,
227        timeline: &TimelineName,
228        entity_path: &EntityPath,
229    ) -> Option<ComponentSet> {
230        re_tracing::profile_function!();
231
232        let static_components: Option<ComponentSet> = self
233            .static_chunk_ids_per_entity
234            .get(entity_path)
235            .map(|static_chunks_per_component| {
236                static_chunks_per_component
237                    .keys()
238                    .copied()
239                    .collect::<ComponentSet>()
240            })
241            .filter(|names| !names.is_empty());
242
243        let temporal_components: Option<ComponentSet> = self
244            .temporal_chunk_ids_per_entity_per_component
245            .get(entity_path)
246            .map(|temporal_chunk_ids_per_timeline| {
247                temporal_chunk_ids_per_timeline
248                    .get(timeline)
249                    .map(|temporal_chunk_ids_per_component| {
250                        temporal_chunk_ids_per_component
251                            .keys()
252                            .copied()
253                            .collect::<ComponentSet>()
254                    })
255                    .unwrap_or_default()
256            })
257            .filter(|names| !names.is_empty());
258
259        match (static_components, temporal_components) {
260            (None, None) => None,
261            (None, Some(comps)) | (Some(comps), None) => Some(comps),
262            (Some(static_comps), Some(temporal_comps)) => {
263                Some(static_comps.into_iter().chain(temporal_comps).collect())
264            }
265        }
266    }
267
268    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`].
269    ///
270    /// Static components are always included in the results.
271    ///
272    /// Returns `None` if the entity has never had any data logged to it.
273    pub fn all_components_for_entity(
274        &self,
275        entity_path: &EntityPath,
276    ) -> Option<UnorderedComponentSet> {
277        re_tracing::profile_function!();
278
279        let static_components: Option<UnorderedComponentSet> = self
280            .static_chunk_ids_per_entity
281            .get(entity_path)
282            .map(|static_chunks_per_component| {
283                static_chunks_per_component.keys().copied().collect()
284            });
285
286        let temporal_components: Option<UnorderedComponentSet> = self
287            .temporal_chunk_ids_per_entity_per_component
288            .get(entity_path)
289            .map(|temporal_chunk_ids_per_timeline| {
290                temporal_chunk_ids_per_timeline
291                    .iter()
292                    .flat_map(|(_, temporal_chunk_ids_per_component)| {
293                        temporal_chunk_ids_per_component.keys().copied()
294                    })
295                    .collect()
296            });
297
298        match (static_components, temporal_components) {
299            (None, None) => None,
300            (None, comps @ Some(_)) | (comps @ Some(_), None) => comps,
301            (Some(static_comps), Some(temporal_comps)) => {
302                Some(static_comps.into_iter().chain(temporal_comps).collect())
303            }
304        }
305    }
306
307    /// Retrieve all the [`ComponentIdentifier`]s that have been written to for a given [`EntityPath`].
308    ///
309    /// Static components are always included in the results.
310    ///
311    /// Returns `None` if the entity has never had any data logged to it.
312    pub fn all_components_for_entity_sorted(
313        &self,
314        entity_path: &EntityPath,
315    ) -> Option<ComponentSet> {
316        re_tracing::profile_function!();
317
318        let static_components: Option<ComponentSet> = self
319            .static_chunk_ids_per_entity
320            .get(entity_path)
321            .map(|static_chunks_per_component| {
322                static_chunks_per_component.keys().copied().collect()
323            });
324
325        let temporal_components: Option<ComponentSet> = self
326            .temporal_chunk_ids_per_entity_per_component
327            .get(entity_path)
328            .map(|temporal_chunk_ids_per_timeline| {
329                temporal_chunk_ids_per_timeline
330                    .iter()
331                    .flat_map(|(_, temporal_chunk_ids_per_component)| {
332                        temporal_chunk_ids_per_component.keys().copied()
333                    })
334                    .collect()
335            });
336
337        match (static_components, temporal_components) {
338            (None, None) => None,
339            (None, comps @ Some(_)) | (comps @ Some(_), None) => comps,
340            (Some(static_comps), Some(temporal_comps)) => {
341                Some(static_comps.into_iter().chain(temporal_comps).collect())
342            }
343        }
344    }
345
346    /// Retrieves the [`ComponentDescriptor`] at a given [`EntityPath`] that has a certain [`ComponentIdentifier`].
347    // TODO(andreas): The descriptor for a given identifier should never change within a recording.
348    pub fn entity_component_descriptor(
349        &self,
350        entity_path: &EntityPath,
351        component: ComponentIdentifier,
352    ) -> Option<ComponentDescriptor> {
353        self.per_column_metadata
354            .get(entity_path)
355            .and_then(|per_identifier| per_identifier.get(&component))
356            .map(|(component_descr, _, _)| component_descr.clone())
357    }
358
359    /// Check whether an entity has a static component or a temporal component on the specified timeline.
360    ///
361    /// This does _not_ check if the entity actually currently holds any data for that component.
362    #[inline]
363    pub fn entity_has_component_on_timeline(
364        &self,
365        timeline: &TimelineName,
366        entity_path: &EntityPath,
367        component: ComponentIdentifier,
368    ) -> bool {
369        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
370
371        self.entity_has_static_component(entity_path, component)
372            || self.entity_has_temporal_component_on_timeline(timeline, entity_path, component)
373    }
374
375    /// Check whether an entity has a static component or a temporal component on any timeline.
376    ///
377    /// This does _not_ check if the entity actually currently holds any data for that component.
378    pub fn entity_has_component(
379        &self,
380        entity_path: &EntityPath,
381        component: ComponentIdentifier,
382    ) -> bool {
383        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
384
385        self.entity_has_static_component(entity_path, component)
386            || self.entity_has_temporal_component(entity_path, component)
387    }
388
389    /// Check whether an entity has a specific static component.
390    ///
391    /// This does _not_ check if the entity actually currently holds any data for that component.
392    #[inline]
393    pub fn entity_has_static_component(
394        &self,
395        entity_path: &EntityPath,
396        component: ComponentIdentifier,
397    ) -> bool {
398        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
399
400        self.static_chunk_ids_per_entity
401            .get(entity_path)
402            .is_some_and(|static_chunk_ids_per_component| {
403                static_chunk_ids_per_component.contains_key(&component)
404            })
405    }
406
407    /// Check whether an entity has a temporal component on any timeline.
408    ///
409    /// This does _not_ check if the entity actually currently holds any data for that component.
410    #[inline]
411    pub fn entity_has_temporal_component(
412        &self,
413        entity_path: &EntityPath,
414        component: ComponentIdentifier,
415    ) -> bool {
416        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
417
418        self.temporal_chunk_ids_per_entity_per_component
419            .get(entity_path)
420            .iter()
421            .flat_map(|temporal_chunk_ids_per_timeline| temporal_chunk_ids_per_timeline.values())
422            .any(|temporal_chunk_ids_per_component| {
423                temporal_chunk_ids_per_component.contains_key(&component)
424            })
425    }
426
427    /// Check whether an entity has a temporal component on a specific timeline.
428    ///
429    /// This does _not_ check if the entity actually currently holds any data for that component.
430    #[inline]
431    pub fn entity_has_temporal_component_on_timeline(
432        &self,
433        timeline: &TimelineName,
434        entity_path: &EntityPath,
435        component: ComponentIdentifier,
436    ) -> bool {
437        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
438
439        self.temporal_chunk_ids_per_entity_per_component
440            .get(entity_path)
441            .iter()
442            .filter_map(|temporal_chunk_ids_per_timeline| {
443                temporal_chunk_ids_per_timeline.get(timeline)
444            })
445            .any(|temporal_chunk_ids_per_component| {
446                temporal_chunk_ids_per_component.contains_key(&component)
447            })
448    }
449
450    /// Check whether an entity has any data on a specific timeline, or any static data.
451    ///
452    /// This is different from checking if the entity has any component, it also ensures
453    /// that some _data_ currently exists in the store for this entity.
454    #[inline]
455    pub fn entity_has_data_on_timeline(
456        &self,
457        timeline: &TimelineName,
458        entity_path: &EntityPath,
459    ) -> bool {
460        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
461
462        self.entity_has_static_data(entity_path)
463            || self.entity_has_temporal_data_on_timeline(timeline, entity_path)
464    }
465
466    /// Check whether an entity has any static data or any temporal data on any timeline.
467    ///
468    /// This is different from checking if the entity has any component, it also ensures
469    /// that some _data_ currently exists in the store for this entity.
470    #[inline]
471    pub fn entity_has_data(&self, entity_path: &EntityPath) -> bool {
472        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
473
474        self.entity_has_static_data(entity_path) || self.entity_has_temporal_data(entity_path)
475    }
476
477    /// Check whether an entity has any static data.
478    ///
479    /// This is different from checking if the entity has any component, it also ensures
480    /// that some _data_ currently exists in the store for this entity.
481    #[inline]
482    pub fn entity_has_static_data(&self, entity_path: &EntityPath) -> bool {
483        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
484
485        self.static_chunk_ids_per_entity
486            .get(entity_path)
487            .is_some_and(|static_chunk_ids_per_component| {
488                static_chunk_ids_per_component
489                    .values()
490                    .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
491            })
492    }
493
494    /// Check whether an entity has any temporal data.
495    ///
496    /// This is different from checking if the entity has any component, it also ensures
497    /// that some _data_ currently exists in the store for this entity.
498    #[inline]
499    pub fn entity_has_temporal_data(&self, entity_path: &EntityPath) -> bool {
500        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
501
502        self.temporal_chunk_ids_per_entity_per_component
503            .get(entity_path)
504            .is_some_and(|temporal_chunks_per_timeline| {
505                temporal_chunks_per_timeline
506                    .values()
507                    .flat_map(|temporal_chunks_per_component| {
508                        temporal_chunks_per_component.values()
509                    })
510                    .flat_map(|chunk_id_sets| chunk_id_sets.per_start_time.values())
511                    .flat_map(|chunk_id_set| chunk_id_set.iter())
512                    .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
513            })
514    }
515
516    /// Check whether an entity has any temporal data.
517    ///
518    /// This is different from checking if the entity has any component, it also ensures
519    /// that some _data_ currently exists in the store for this entity.
520    #[inline]
521    pub fn entity_has_temporal_data_on_timeline(
522        &self,
523        timeline: &TimelineName,
524        entity_path: &EntityPath,
525    ) -> bool {
526        // re_tracing::profile_function!(); // This function is too fast; profiling will only add overhead
527
528        self.temporal_chunk_ids_per_entity_per_component
529            .get(entity_path)
530            .and_then(|temporal_chunks_per_timeline| temporal_chunks_per_timeline.get(timeline))
531            .is_some_and(|temporal_chunks_per_component| {
532                temporal_chunks_per_component
533                    .values()
534                    .flat_map(|chunk_id_sets| chunk_id_sets.per_start_time.values())
535                    .flat_map(|chunk_id_set| chunk_id_set.iter())
536                    .any(|chunk_id| self.chunks_per_chunk_id.contains_key(chunk_id))
537            })
538    }
539
540    /// Find the earliest time at which something was logged for a given entity on the specified
541    /// timeline.
542    ///
543    /// Ignores static data.
544    #[inline]
545    pub fn entity_min_time(
546        &self,
547        timeline: &TimelineName,
548        entity_path: &EntityPath,
549    ) -> Option<TimeInt> {
550        let temporal_chunk_ids_per_timeline = self
551            .temporal_chunk_ids_per_entity_per_component
552            .get(entity_path)?;
553        let temporal_chunk_ids_per_component = temporal_chunk_ids_per_timeline.get(timeline)?;
554
555        let mut time_min = TimeInt::MAX;
556        for temporal_chunk_ids_per_time in temporal_chunk_ids_per_component.values() {
557            let Some(time) = temporal_chunk_ids_per_time
558                .per_start_time
559                .first_key_value()
560                .map(|(time, _)| *time)
561            else {
562                continue;
563            };
564            time_min = TimeInt::min(time_min, time);
565        }
566
567        (time_min != TimeInt::MAX).then_some(time_min)
568    }
569
570    /// Returns the min and max times at which data was logged for an entity on a specific timeline.
571    ///
572    /// This ignores static data.
573    pub fn entity_time_range(
574        &self,
575        timeline: &TimelineName,
576        entity_path: &EntityPath,
577    ) -> Option<AbsoluteTimeRange> {
578        re_tracing::profile_function!();
579
580        let temporal_chunk_ids_per_timeline =
581            self.temporal_chunk_ids_per_entity.get(entity_path)?;
582        let chunk_id_sets = temporal_chunk_ids_per_timeline.get(timeline)?;
583
584        let start = chunk_id_sets.per_start_time.first_key_value()?.0;
585        let end = chunk_id_sets.per_end_time.last_key_value()?.0;
586
587        Some(AbsoluteTimeRange::new(*start, *end))
588    }
589
590    /// Returns the min and max times at which data was logged on a specific timeline, considering
591    /// all entities.
592    ///
593    /// This ignores static data.
594    pub fn time_range(&self, timeline: &TimelineName) -> Option<AbsoluteTimeRange> {
595        re_tracing::profile_function!();
596
597        self.temporal_chunk_ids_per_entity
598            .values()
599            .filter_map(|temporal_chunk_ids_per_timeline| {
600                let per_time = temporal_chunk_ids_per_timeline.get(timeline)?;
601                let start = per_time.per_start_time.first_key_value()?.0;
602                let end = per_time.per_end_time.last_key_value()?.0;
603                Some(AbsoluteTimeRange::new(*start, *end))
604            })
605            .reduce(|r1, r2| r1.union(r2))
606    }
607}
608
609// LatestAt
610impl ChunkStore {
611    /// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`] and [`ComponentIdentifier`].
612    ///
613    /// The returned vector is guaranteed free of duplicates, by definition.
614    ///
615    /// The [`ChunkStore`] always work at the [`Chunk`] level (as opposed to the row level): it is
616    /// oblivious to the data therein.
617    /// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible
618    /// that a query has more than one relevant chunk.
619    ///
620    /// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to
621    /// determine what exact row contains the final result.
622    ///
623    /// If the entity has static component data associated with it, it will unconditionally
624    /// override any temporal component data.
625    pub fn latest_at_relevant_chunks(
626        &self,
627        query: &LatestAtQuery,
628        entity_path: &EntityPath,
629        component: ComponentIdentifier,
630    ) -> Vec<Arc<Chunk>> {
631        // Don't do a profile scope here, this can have a lot of overhead when executing many small queries.
632        //re_tracing::profile_function!(format!("{query:?}"));
633
634        // Reminder: if a chunk has been indexed for a given component, then it must contain at
635        // least one non-null value for that column.
636
637        if let Some(static_chunk) = self
638            .static_chunk_ids_per_entity
639            .get(entity_path)
640            .and_then(|static_chunks_per_component| static_chunks_per_component.get(&component))
641            .and_then(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
642        {
643            return vec![Arc::clone(static_chunk)];
644        }
645
646        let chunks = self
647            .temporal_chunk_ids_per_entity_per_component
648            .get(entity_path)
649            .and_then(|temporal_chunk_ids_per_timeline| {
650                temporal_chunk_ids_per_timeline.get(&query.timeline())
651            })
652            .and_then(|temporal_chunk_ids_per_component| {
653                temporal_chunk_ids_per_component.get(&component)
654            })
655            .and_then(|temporal_chunk_ids_per_time| {
656                self.latest_at(query, temporal_chunk_ids_per_time)
657            })
658            .unwrap_or_default();
659
660        debug_assert!(
661            chunks.iter().map(|chunk| chunk.id()).all_unique(),
662            "{entity_path}:{component} @ {query:?}",
663        );
664
665        chunks
666    }
667
    /// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`], across all the
    /// components of the given entity.
    ///
    /// Optionally include static data: when `include_static` is `true`, all the static chunks of
    /// the entity are returned, and temporal chunks are only looked up for the components that
    /// have no entry in the static index. When it is `false`, only the temporal index is queried.
    ///
    /// The [`ChunkStore`] always work at the [`Chunk`] level (as opposed to the row level): it is
    /// oblivious to the data therein.
    /// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible
    /// that a query has more than one relevant chunk.
    ///
    /// The returned vector is free of duplicates.
    ///
    /// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to
    /// determine what exact row contains the final result.
    pub fn latest_at_relevant_chunks_for_all_components(
        &self,
        query: &LatestAtQuery,
        entity_path: &EntityPath,
        include_static: bool,
    ) -> Vec<Arc<Chunk>> {
        re_tracing::profile_function!(format!("{query:?}"));

        let chunks = if include_static {
            // Fallback for entities without any static entry, so that the rest of this branch
            // can work with a plain reference either way.
            let empty = Default::default();
            let static_chunks_per_component = self
                .static_chunk_ids_per_entity
                .get(entity_path)
                .unwrap_or(&empty);

            // All static chunks for the given entity
            let static_chunks = static_chunks_per_component
                .values()
                .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
                .cloned();

            // All temporal chunks for the given entity, filtered by components
            // for which we already have static chunks.
            let temporal_chunks = self
                .temporal_chunk_ids_per_entity_per_component
                .get(entity_path)
                .and_then(|temporal_chunk_ids_per_timeline_per_component| {
                    temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
                })
                .map(|temporal_chunk_ids_per_component| {
                    temporal_chunk_ids_per_component
                        .iter()
                        // Skip every component that has an entry in the static index.
                        .filter(|(component_type, _)| {
                            !static_chunks_per_component.contains_key(component_type)
                        })
                        .map(|(_, chunk_id_set)| chunk_id_set)
                })
                // `Option<impl Iterator>` -> possibly empty iterator over the per-component indices.
                .into_iter()
                .flatten()
                .filter_map(|temporal_chunk_ids_per_time| {
                    self.latest_at(query, temporal_chunk_ids_per_time)
                })
                .flatten();

            static_chunks
                .chain(temporal_chunks)
                // Deduplicate before passing it along.
                // Both temporal and static chunk "sets" here may have duplicates in them,
                // so we de-duplicate them together to reduce the number of allocations.
                .unique_by(|chunk| chunk.id())
                .collect_vec()
        } else {
            // Temporal data only: go through the per-entity index, which is not split by component.
            // This cannot yield duplicates by definition.
            self.temporal_chunk_ids_per_entity
                .get(entity_path)
                .and_then(|temporal_chunk_ids_per_timeline| {
                    temporal_chunk_ids_per_timeline.get(&query.timeline())
                })
                .and_then(|temporal_chunk_ids_per_time| {
                    self.latest_at(query, temporal_chunk_ids_per_time)
                })
                .unwrap_or_default()
        };

        // Sanity check (debug builds only): both branches must yield unique chunks.
        debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());

        chunks
    }
749
750    fn latest_at(
751        &self,
752        query: &LatestAtQuery,
753        temporal_chunk_ids_per_time: &ChunkIdSetPerTime,
754    ) -> Option<Vec<Arc<Chunk>>> {
755        // Don't do a profile scope here, this can have a lot of overhead when executing many small queries.
756        //re_tracing::profile_function!();
757
758        let upper_bound = temporal_chunk_ids_per_time
759            .per_start_time
760            .range(..=query.at())
761            .next_back()
762            .map(|(time, _)| *time)?;
763
764        // Overlapped chunks
765        // =================
766        //
767        // To deal with potentially overlapping chunks, we keep track of the longest
768        // interval in the entire map, which gives us an upper bound on how much we
769        // would need to walk backwards in order to find all potential overlaps.
770        //
771        // This is a fairly simple solution that scales much better than interval-tree
772        // based alternatives, both in terms of complexity and performance, in the normal
773        // case where most chunks in a collection have similar lengths.
774        //
775        // The most degenerate case -- a single chunk overlaps everything else -- results
776        // in `O(n)` performance, which gets amortized by the query cache.
777        // If that turns out to be a problem in practice, we can experiment with more
778        // complex solutions then.
779        let lower_bound = upper_bound.as_i64().saturating_sub(
780            temporal_chunk_ids_per_time
781                .max_interval_length
782                .saturating_cast(),
783        );
784
785        let temporal_chunk_ids = temporal_chunk_ids_per_time
786            .per_start_time
787            .range(..=query.at())
788            .rev()
789            .take_while(|(time, _)| time.as_i64() >= lower_bound)
790            .flat_map(|(_time, chunk_ids)| chunk_ids.iter())
791            .copied()
792            .collect_vec();
793
794        Some(
795            temporal_chunk_ids
796                .iter()
797                .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id).cloned())
798                .collect(),
799        )
800    }
801}
802
803// Range
804impl ChunkStore {
805    /// Returns the most-relevant chunk(s) for the given [`RangeQuery`] and [`ComponentIdentifier`].
806    ///
807    /// The returned vector is guaranteed free of duplicates, by definition.
808    ///
809    /// The criteria for returning a chunk is only that it may contain data that overlaps with
810    /// the queried range.
811    ///
812    /// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to
813    /// determine how exactly each row of data fit with the rest.
814    ///
815    /// If the entity has static component data associated with it, it will unconditionally
816    /// override any temporal component data.
817    pub fn range_relevant_chunks(
818        &self,
819        query: &RangeQuery,
820        entity_path: &EntityPath,
821        component: ComponentIdentifier,
822    ) -> Vec<Arc<Chunk>> {
823        re_tracing::profile_function!(format!("{query:?}"));
824
825        if let Some(static_chunk) = self
826            .static_chunk_ids_per_entity
827            .get(entity_path)
828            .and_then(|static_chunks_per_component| static_chunks_per_component.get(&component))
829            .and_then(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
830        {
831            return vec![Arc::clone(static_chunk)];
832        }
833
834        let chunks = self
835            .range(
836                query,
837                self.temporal_chunk_ids_per_entity_per_component
838                    .get(entity_path)
839                    .and_then(|temporal_chunk_ids_per_timeline| {
840                        temporal_chunk_ids_per_timeline.get(query.timeline())
841                    })
842                    .and_then(|temporal_chunk_ids_per_component| {
843                        temporal_chunk_ids_per_component.get(&component)
844                    })
845                    .into_iter(),
846            )
847            .into_iter()
848            // Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
849            // need to make sure that the resulting chunks' per-component time range intersects with the
850            // time range of the query itself.
851            .filter(|chunk| {
852                chunk
853                    .timelines()
854                    .get(query.timeline())
855                    .is_some_and(|time_column| {
856                        time_column
857                            .time_range_per_component(chunk.components())
858                            .get(&component)
859                            .is_some_and(|time_range| time_range.intersects(query.range()))
860                    })
861            })
862            .collect_vec();
863
864        debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
865
866        chunks
867    }
868
869    /// Returns the most-relevant chunk(s) for the given [`RangeQuery`].
870    ///
871    /// The criteria for returning a chunk is only that it may contain data that overlaps with
872    /// the queried range, or that it is static.
873    ///
874    /// The returned vector is free of duplicates.
875    ///
876    /// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to
877    /// determine how exactly each row of data fit with the rest.
878    pub fn range_relevant_chunks_for_all_components(
879        &self,
880        query: &RangeQuery,
881        entity_path: &EntityPath,
882        include_static: bool,
883    ) -> Vec<Arc<Chunk>> {
884        re_tracing::profile_function!(format!("{query:?}"));
885
886        let empty = Default::default();
887        let chunks = if include_static {
888            let static_chunks_per_component = self
889                .static_chunk_ids_per_entity
890                .get(entity_path)
891                .unwrap_or(&empty);
892
893            // All static chunks for the given entity
894            let static_chunks = static_chunks_per_component
895                .values()
896                .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
897                .cloned();
898
899            // All temporal chunks for the given entity, filtered by components
900            // for which we already have static chunks.
901            let temporal_chunks = self
902                .range(
903                    query,
904                    self.temporal_chunk_ids_per_entity_per_component
905                        .get(entity_path)
906                        .and_then(|temporal_chunk_ids_per_timeline_per_component| {
907                            temporal_chunk_ids_per_timeline_per_component.get(query.timeline())
908                        })
909                        .map(|temporal_chunk_ids_per_component| {
910                            temporal_chunk_ids_per_component
911                                .iter()
912                                .filter(|(component_type, _)| {
913                                    !static_chunks_per_component.contains_key(component_type)
914                                })
915                                .map(|(_, chunk_id_set)| chunk_id_set)
916                        })
917                        .into_iter()
918                        .flatten(),
919                )
920                .into_iter();
921
922            Either::Left(
923                static_chunks
924                    .chain(temporal_chunks)
925                    // Deduplicate before passing it along.
926                    // Both temporal and static chunk "sets" here may have duplicates in them,
927                    // so we de-duplicate them together to reduce the number of allocations.
928                    .unique_by(|chunk| chunk.id()),
929            )
930        } else {
931            // This cannot yield duplicates by definition.
932            Either::Right(
933                self.range(
934                    query,
935                    self.temporal_chunk_ids_per_entity
936                        .get(entity_path)
937                        .and_then(|temporal_chunk_ids_per_timeline| {
938                            temporal_chunk_ids_per_timeline.get(query.timeline())
939                        })
940                        .into_iter(),
941                ),
942            )
943        };
944
945        // Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
946        // need to make sure that the resulting chunks' global time ranges intersect with the
947        // time range of the query itself.
948        let chunks = chunks
949            .into_iter()
950            .filter(|chunk| {
951                chunk
952                    .timelines()
953                    .get(query.timeline())
954                    .is_some_and(|time_column| time_column.time_range().intersects(query.range()))
955            })
956            .collect_vec();
957
958        debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
959
960        chunks
961    }
962
963    fn range<'a>(
964        &'a self,
965        query: &RangeQuery,
966        temporal_chunk_ids_per_times: impl Iterator<Item = &'a ChunkIdSetPerTime>,
967    ) -> Vec<Arc<Chunk>> {
968        // Too small & frequent for profiling scopes.
969        //re_tracing::profile_function!();
970
971        temporal_chunk_ids_per_times
972            .map(|temporal_chunk_ids_per_time| {
973                // See `RangeQueryOptions::include_extended_bounds` for more information.
974                let query_min = if query.options().include_extended_bounds {
975                    re_log_types::TimeInt::new_temporal(
976                        query.range.min().as_i64().saturating_sub(1),
977                    )
978                } else {
979                    query.range.min()
980                };
981                let query_max = if query.options().include_extended_bounds {
982                    re_log_types::TimeInt::new_temporal(
983                        query.range.max().as_i64().saturating_add(1),
984                    )
985                } else {
986                    query.range.max()
987                };
988
989                // Overlapped chunks
990                // =================
991                //
992                // To deal with potentially overlapping chunks, we keep track of the longest
993                // interval in the entire map, which gives us an upper bound on how much we
994                // would need to walk backwards in order to find all potential overlaps.
995                //
996                // This is a fairly simple solution that scales much better than interval-tree
997                // based alternatives, both in terms of complexity and performance, in the normal
998                // case where most chunks in a collection have similar lengths.
999                //
1000                // The most degenerate case -- a single chunk overlaps everything else -- results
1001                // in `O(n)` performance, which gets amortized by the query cache.
1002                // If that turns out to be a problem in practice, we can experiment with more
1003                // complex solutions then.
1004                let query_min = TimeInt::new_temporal(
1005                    query_min.as_i64().saturating_sub(
1006                        temporal_chunk_ids_per_time
1007                            .max_interval_length
1008                            .saturating_cast(),
1009                    ),
1010                );
1011
1012                let start_time = temporal_chunk_ids_per_time
1013                    .per_start_time
1014                    .range(..=query_min)
1015                    .next_back()
1016                    .map_or(TimeInt::MIN, |(&time, _)| time);
1017
1018                let end_time = temporal_chunk_ids_per_time
1019                    .per_start_time
1020                    .range(..=query_max)
1021                    .next_back()
1022                    .map_or(start_time, |(&time, _)| time);
1023
1024                // NOTE: Just being extra cautious because, even though this shouldnt possibly ever happen,
1025                // indexing a std map with a backwards range is an instant crash.
1026                let end_time = TimeInt::max(start_time, end_time);
1027
1028                (start_time, end_time, temporal_chunk_ids_per_time)
1029            })
1030            .flat_map(|(start_time, end_time, temporal_chunk_ids_per_time)| {
1031                temporal_chunk_ids_per_time
1032                    .per_start_time
1033                    .range(start_time..=end_time)
1034                    .map(|(_time, chunk_ids)| chunk_ids)
1035            })
1036            .flat_map(|temporal_chunk_ids| {
1037                temporal_chunk_ids
1038                    .iter()
1039                    .filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id).cloned())
1040            })
1041            .collect()
1042    }
1043}