re_query/
range.rs

1use std::{collections::BTreeSet, sync::Arc};
2
3use ahash::HashMap;
4use nohash_hasher::IntMap;
5use parking_lot::RwLock;
6
7use re_byte_size::SizeBytes;
8use re_chunk::{Chunk, ChunkId, ComponentIdentifier};
9use re_chunk_store::{ChunkStore, RangeQuery, TimeInt};
10use re_log_types::{AbsoluteTimeRange, EntityPath};
11
12use crate::{QueryCache, QueryCacheKey, QueryError};
13
14// --- Public API ---
15
16impl QueryCache {
17    /// Queries for the given components using range semantics.
18    ///
19    /// See [`RangeResults`] for more information about how to handle the results.
20    ///
21    /// This is a cached API -- data will be lazily cached upon access.
22    pub fn range(
23        &self,
24        query: &RangeQuery,
25        entity_path: &EntityPath,
26        components: impl IntoIterator<Item = ComponentIdentifier>,
27    ) -> RangeResults {
28        re_tracing::profile_function!(entity_path.to_string());
29
30        let store = self.store.read();
31
32        let mut results = RangeResults::new(query.clone());
33
34        // NOTE: This pre-filtering is extremely important: going through all these query layers
35        // has non-negligible overhead even if the final result ends up being nothing, and our
36        // number of queries for a frame grows linearly with the number of entity paths.
37        let components = components.into_iter().filter(|component_identifier| {
38            store.entity_has_component_on_timeline(
39                query.timeline(),
40                entity_path,
41                *component_identifier,
42            )
43        });
44
45        for component in components {
46            let key = QueryCacheKey::new(entity_path.clone(), *query.timeline(), component);
47
48            let cache = Arc::clone(
49                self.range_per_cache_key
50                    .write()
51                    .entry(key.clone())
52                    .or_insert_with(|| Arc::new(RwLock::new(RangeCache::new(key)))),
53            );
54
55            let mut cache = cache.write();
56
57            cache.handle_pending_invalidation();
58
59            let cached = cache.range(&store, query, entity_path, component);
60            if !cached.is_empty() {
61                results.add(component, cached);
62            }
63        }
64
65        results
66    }
67}
68
69// --- Results ---
70
/// Results for a range query.
///
/// The data is both deserialized and resolved/converted.
///
/// Use [`RangeResults::get`] or [`RangeResults::get_required`] in order to access the results for
/// each individual component.
#[derive(Debug)]
pub struct RangeResults {
    /// The query that yielded these results.
    pub query: RangeQuery,

    /// Results for each individual component.
    ///
    /// Each entry maps a component to the cached [`Chunk`]s relevant to the query.
    pub components: IntMap<ComponentIdentifier, Vec<Chunk>>,
}
85
86impl RangeResults {
87    #[inline]
88    pub fn new(query: RangeQuery) -> Self {
89        Self {
90            query,
91            components: Default::default(),
92        }
93    }
94
95    /// Returns the [`Chunk`]s for the specified component.
96    #[inline]
97    pub fn get(&self, component: ComponentIdentifier) -> Option<&[Chunk]> {
98        self.components
99            .get(&component)
100            .map(|chunks| chunks.as_slice())
101    }
102
103    /// Returns the [`Chunk`]s for the specified component.
104    ///
105    /// Returns an error if the component is not present.
106    #[inline]
107    pub fn get_required(&self, component: ComponentIdentifier) -> crate::Result<&[Chunk]> {
108        self.components.get(&component).map_or_else(
109            || Err(QueryError::PrimaryNotFound(component)),
110            |chunks| Ok(chunks.as_slice()),
111        )
112    }
113}
114
impl RangeResults {
    #[doc(hidden)]
    #[inline]
    pub fn add(&mut self, component: ComponentIdentifier, chunks: Vec<Chunk>) {
        // NOTE: `insert` overwrites any chunks previously stored for this component.
        self.components.insert(component, chunks);
    }
}
122
123// --- Cache implementation ---
124
/// Caches the results of `Range` queries for a given [`QueryCacheKey`].
pub struct RangeCache {
    /// For debugging purposes.
    pub cache_key: QueryCacheKey,

    /// All the [`Chunk`]s currently cached, keyed by their [`ChunkId`].
    ///
    /// See [`RangeCachedChunk`] for more information.
    pub chunks: HashMap<ChunkId, RangeCachedChunk>,

    /// Every [`ChunkId`] present in this set has been asynchronously invalidated.
    ///
    /// The next time this cache gets queried, it must remove any entry matching any of these IDs.
    ///
    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
    /// time effectively behaves as a natural micro-batching mechanism.
    pub pending_invalidations: BTreeSet<ChunkId>,
}
143
144impl RangeCache {
145    #[inline]
146    pub fn new(cache_key: QueryCacheKey) -> Self {
147        Self {
148            cache_key,
149            chunks: HashMap::default(),
150            pending_invalidations: BTreeSet::default(),
151        }
152    }
153
154    /// Returns the time range covered by this [`RangeCache`].
155    ///
156    /// This is extremely slow (`O(n)`), don't use this for anything but debugging.
157    #[inline]
158    pub fn time_range(&self) -> AbsoluteTimeRange {
159        self.chunks
160            .values()
161            .filter_map(|cached| {
162                cached
163                    .chunk
164                    .timelines()
165                    .get(&self.cache_key.timeline_name)
166                    .map(|time_column| time_column.time_range())
167            })
168            .fold(AbsoluteTimeRange::EMPTY, |mut acc, time_range| {
169                acc.set_min(TimeInt::min(acc.min(), time_range.min()));
170                acc.set_max(TimeInt::max(acc.max(), time_range.max()));
171                acc
172            })
173    }
174}
175
176impl std::fmt::Debug for RangeCache {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        let Self {
179            cache_key: _,
180            chunks,
181            pending_invalidations: _,
182        } = self;
183
184        let mut strings: Vec<String> = Vec::new();
185
186        strings.push(format!(
187            "{:?} ({})",
188            self.time_range(),
189            re_format::format_bytes(chunks.total_size_bytes() as _),
190        ));
191
192        if strings.is_empty() {
193            return f.write_str("<empty>");
194        }
195
196        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
197    }
198}
199
/// A single [`Chunk`] as stored in a [`RangeCache`].
pub struct RangeCachedChunk {
    /// The cached (possibly pre-processed) chunk itself.
    pub chunk: Chunk,

    /// When a `Chunk` gets cached, it is pre-processed according to the current [`QueryCacheKey`],
    /// e.g. it is time-sorted on the appropriate timeline.
    ///
    /// In the happy case, pre-processing a `Chunk` is a no-op, and the cached `Chunk` is just a
    /// reference to the real one sitting in the store.
    /// Otherwise, the cached `Chunk` is a full blown copy of the original one.
    pub resorted: bool,
}
211
212impl SizeBytes for RangeCachedChunk {
213    #[inline]
214    fn heap_size_bytes(&self) -> u64 {
215        let Self { chunk, resorted } = self;
216
217        if *resorted {
218            // The chunk had to be post-processed for caching.
219            // Its data was duplicated.
220            Chunk::heap_size_bytes(chunk)
221        } else {
222            // This chunk is just a reference to the one in the store.
223            // Consider it amortized.
224            0
225        }
226    }
227}
228
229impl SizeBytes for RangeCache {
230    #[inline]
231    fn heap_size_bytes(&self) -> u64 {
232        let Self {
233            cache_key,
234            chunks,
235            pending_invalidations,
236        } = self;
237
238        cache_key.heap_size_bytes()
239            + chunks.heap_size_bytes()
240            + pending_invalidations.heap_size_bytes()
241    }
242}
243
244impl RangeCache {
245    /// Queries cached range data for a single component.
246    pub fn range(
247        &mut self,
248        store: &ChunkStore,
249        query: &RangeQuery,
250        entity_path: &EntityPath,
251        component: ComponentIdentifier,
252    ) -> Vec<Chunk> {
253        re_tracing::profile_scope!("range", format!("{query:?}"));
254
255        debug_assert_eq!(query.timeline(), &self.cache_key.timeline_name);
256
257        // First, we forward the query as-is to the store.
258        //
259        // It's fine to run the query every time -- the index scan itself is not the costly part of a
260        // range query.
261        //
262        // For all relevant chunks that we find, we process them according to the [`QueryCacheKey`], and
263        // cache them.
264
265        let raw_chunks = store.range_relevant_chunks(query, entity_path, component);
266        for raw_chunk in &raw_chunks {
267            self.chunks
268                .entry(raw_chunk.id())
269                .or_insert_with(|| RangeCachedChunk {
270                    // TODO(#7008): avoid unnecessary sorting on the unhappy path
271                    chunk: raw_chunk
272                        // Densify the cached chunk according to the cache key's component, which
273                        // will speed up future arrow operations on this chunk.
274                        .densified(component)
275                        // Pre-sort the cached chunk according to the cache key's timeline.
276                        .sorted_by_timeline_if_unsorted(&self.cache_key.timeline_name),
277                    resorted: !raw_chunk.is_timeline_sorted(&self.cache_key.timeline_name),
278                });
279        }
280
281        // Second, we simply retrieve from the cache all the relevant `Chunk`s .
282        //
283        // Since these `Chunk`s have already been pre-processed adequately, running a range filter
284        // on them will be quite cheap.
285
286        raw_chunks
287            .into_iter()
288            .filter_map(|raw_chunk| self.chunks.get(&raw_chunk.id()))
289            .map(|cached_sorted_chunk| {
290                debug_assert!(
291                    cached_sorted_chunk
292                        .chunk
293                        .is_timeline_sorted(query.timeline())
294                );
295
296                let chunk = &cached_sorted_chunk.chunk;
297
298                chunk.range(query, component)
299            })
300            .filter(|chunk| !chunk.is_empty())
301            .collect()
302    }
303
304    #[inline]
305    pub fn handle_pending_invalidation(&mut self) {
306        let Self {
307            cache_key: _,
308            chunks,
309            pending_invalidations,
310        } = self;
311
312        chunks.retain(|chunk_id, _chunk| !pending_invalidations.contains(chunk_id));
313
314        pending_invalidations.clear();
315    }
316}