Skip to main content

re_query/
range.rs

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use ahash::HashMap;
5use nohash_hasher::IntMap;
6use parking_lot::RwLock;
7use re_byte_size::SizeBytes;
8use re_chunk::{Chunk, ChunkId, ComponentIdentifier};
9use re_chunk_store::{ChunkStore, ChunkTrackingMode, RangeQuery, TimeInt};
10use re_log_types::{AbsoluteTimeRange, EntityPath};
11
12use crate::{QueryCache, QueryCacheKey, QueryError};
13
14// --- Public API ---
15
16impl QueryCache {
17    /// Queries for the given components using range semantics.
18    ///
19    /// See [`RangeResults`] for more information about how to handle the results.
20    ///
21    /// This is a cached API -- data will be lazily cached upon access.
22    pub fn range(
23        &self,
24        query: &RangeQuery,
25        entity_path: &EntityPath,
26        components: impl IntoIterator<Item = ComponentIdentifier>,
27    ) -> RangeResults {
28        re_tracing::profile_function!(entity_path.to_string());
29
30        let store = self.store.read();
31
32        let mut results = RangeResults::new(query.clone());
33
34        // NOTE: This pre-filtering is extremely important: going through all these query layers
35        // has non-negligible overhead even if the final result ends up being nothing, and our
36        // number of queries for a frame grows linearly with the number of entity paths.
37        let components = components.into_iter().filter(|component_identifier| {
38            store.entity_has_component_on_timeline(
39                query.timeline(),
40                entity_path,
41                *component_identifier,
42            )
43        });
44
45        for component in components {
46            let key = QueryCacheKey::new(entity_path.clone(), *query.timeline(), component);
47
48            let cache = Arc::clone(
49                self.range_per_cache_key
50                    .write()
51                    .entry(key.clone())
52                    .or_insert_with(|| Arc::new(RwLock::new(RangeCache::new(key)))),
53            );
54
55            let mut cache = cache.write();
56
57            cache.handle_pending_invalidation();
58
59            let (cached, missing) = cache.range(&store, query, entity_path, component);
60            results.missing_virtual.extend(missing);
61            if !cached.is_empty() {
62                results.add(component, cached);
63            }
64        }
65
66        results
67    }
68}
69
70// --- Results ---
71
72/// Results for a range query.
73///
74/// The data is both deserialized and resolved/converted.
75///
76/// Use [`RangeResults::get`] or [`RangeResults::get_required`] in order to access the results for
77/// each individual component.
78///
79/// Since the introduction of virtual/offloaded chunks, it is possible for a query to detect that
80/// it is missing some data in order to compute accurate results.
81/// This lack of data is communicated using a non-empty [`RangeResults::missing_virtual`] field.
82#[derive(Debug, PartialEq)]
83pub struct RangeResults {
84    /// The query that yielded these results.
85    pub query: RangeQuery,
86
87    /// The relevant *virtual* chunks that were found for this query.
88    ///
89    /// Until these chunks have been fetched and inserted into the appropriate [`ChunkStore`], the
90    /// results of this query cannot accurately be computed.
91    ///
92    /// Note, these are NOT necessarily _root_ chunks.
93    /// Use [`ChunkStore::find_root_chunks`] to get those.
94    //
95    // TODO(cmc): Once lineage tracking is in place, make sure that this only reports missing
96    // chunks using their root-level IDs, so downstream consumers don't have to redundantly build
97    // their own tracking. And document it so.
98    pub missing_virtual: Vec<ChunkId>,
99
100    /// Results for each individual component.
101    pub components: IntMap<ComponentIdentifier, Vec<Chunk>>,
102}
103
104impl RangeResults {
105    /// Returns true if these are partial results.
106    ///
107    /// Partial results happen when some of the chunks required to accurately compute the query are
108    /// currently missing/offloaded.
109    /// It is then the responsibility of the caller to look into the [missing chunk IDs], fetch
110    /// them, load them, and then try the query again.
111    ///
112    /// [missing chunk IDs]: `Self::missing_virtual`
113    pub fn is_partial(&self) -> bool {
114        !self.missing_virtual.is_empty()
115    }
116
117    /// Returns true if the results are *completely* empty.
118    ///
119    /// I.e. neither physical/loaded nor virtual/offloaded chunks could be found.
120    pub fn is_empty(&self) -> bool {
121        let Self {
122            query: _,
123            missing_virtual,
124            components,
125        } = self;
126        missing_virtual.is_empty() && components.values().all(|chunks| chunks.is_empty())
127    }
128
129    /// Returns the [`Chunk`]s for the specified component.
130    #[inline]
131    pub fn get(&self, component: ComponentIdentifier) -> Option<&[Chunk]> {
132        self.components
133            .get(&component)
134            .map(|chunks| chunks.as_slice())
135    }
136
137    /// Returns the [`Chunk`]s for the specified component.
138    ///
139    /// Returns an error if the component is not present.
140    #[inline]
141    pub fn get_required(&self, component: ComponentIdentifier) -> crate::Result<&[Chunk]> {
142        self.components.get(&component).map_or_else(
143            || Err(QueryError::PrimaryNotFound(component)),
144            |chunks| Ok(chunks.as_slice()),
145        )
146    }
147}
148
149impl RangeResults {
150    #[inline]
151    fn new(query: RangeQuery) -> Self {
152        Self {
153            query,
154            missing_virtual: Default::default(),
155            components: Default::default(),
156        }
157    }
158
159    #[inline]
160    fn add(&mut self, component: ComponentIdentifier, chunks: Vec<Chunk>) {
161        self.components.insert(component, chunks);
162    }
163}
164
165// --- Cache implementation ---
166
167/// Caches the results of `Range` queries for a given [`QueryCacheKey`].
168pub struct RangeCache {
169    /// For debugging purposes.
170    pub cache_key: QueryCacheKey,
171
172    /// All the [`Chunk`]s currently cached.
173    ///
174    /// See [`RangeCachedChunk`] for more information.
175    pub chunks: HashMap<ChunkId, RangeCachedChunk>,
176
177    /// Every [`ChunkId`] present in this set has been asynchronously invalidated.
178    ///
179    /// The next time this cache gets queried, it must remove any entry matching any of these IDs.
180    ///
181    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
182    /// time effectively behaves as a natural micro-batching mechanism.
183    pub pending_invalidations: BTreeSet<ChunkId>,
184}
185
186impl RangeCache {
187    #[inline]
188    pub fn new(cache_key: QueryCacheKey) -> Self {
189        Self {
190            cache_key,
191            chunks: HashMap::default(),
192            pending_invalidations: BTreeSet::default(),
193        }
194    }
195
196    /// Returns the time range covered by this [`RangeCache`].
197    ///
198    /// This is extremely slow (`O(n)`), don't use this for anything but debugging.
199    #[inline]
200    pub fn time_range(&self) -> AbsoluteTimeRange {
201        self.chunks
202            .values()
203            .filter_map(|cached| {
204                cached
205                    .chunk
206                    .timelines()
207                    .get(&self.cache_key.timeline_name)
208                    .map(|time_column| time_column.time_range())
209            })
210            .fold(AbsoluteTimeRange::EMPTY, |mut acc, time_range| {
211                acc.set_min(TimeInt::min(acc.min(), time_range.min()));
212                acc.set_max(TimeInt::max(acc.max(), time_range.max()));
213                acc
214            })
215    }
216}
217
218impl std::fmt::Debug for RangeCache {
219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220        let Self {
221            cache_key: _,
222            chunks,
223            pending_invalidations: _,
224        } = self;
225
226        let mut strings: Vec<String> = Vec::new();
227
228        strings.push(format!(
229            "{:?} ({})",
230            self.time_range(),
231            re_format::format_bytes(chunks.total_size_bytes() as _),
232        ));
233
234        if strings.is_empty() {
235            return f.write_str("<empty>");
236        }
237
238        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
239    }
240}
241
242pub struct RangeCachedChunk {
243    pub chunk: Chunk,
244
245    /// When a `Chunk` gets cached, it is pre-processed according to the current [`QueryCacheKey`],
246    /// e.g. it is time-sorted on the appropriate timeline and densified for the given component.
247    ///
248    /// In the happy case, pre-processing a `Chunk` is a no-op, and the cached `Chunk` is just a
249    /// reference to the real one sitting in the store.
250    /// Otherwise, the cached `Chunk` is a full blown copy of the original one.
251    pub reallocated: bool,
252}
253
254impl SizeBytes for RangeCachedChunk {
255    #[inline]
256    fn heap_size_bytes(&self) -> u64 {
257        let Self { chunk, reallocated } = self;
258
259        if *reallocated {
260            // The chunk had to be post-processed for caching.
261            // Its data was duplicated.
262            Chunk::heap_size_bytes(chunk)
263        } else {
264            // This chunk is just a reference to the one in the store.
265            // Consider it amortized.
266            0
267        }
268    }
269}
270
271impl SizeBytes for RangeCache {
272    #[inline]
273    fn heap_size_bytes(&self) -> u64 {
274        let Self {
275            cache_key,
276            chunks,
277            pending_invalidations,
278        } = self;
279
280        cache_key.heap_size_bytes()
281            + chunks.heap_size_bytes()
282            + pending_invalidations.heap_size_bytes()
283    }
284}
285
286impl RangeCache {
287    /// Queries cached range data for a single component.
288    ///
289    /// This returns the cached physical chunks that were found for this query, as well as any
290    /// virtual chunks that need to be fetched and loaded.
291    /// It is then the responsibility of the caller to look into these missing chunk IDs, fetch
292    /// them, load them, and then try the query again.
293    ///
294    /// Returns `(cached_chunks, missing_chunk_ids)`.
295    fn range(
296        &mut self,
297        store: &ChunkStore,
298        query: &RangeQuery,
299        entity_path: &EntityPath,
300        component: ComponentIdentifier,
301    ) -> (Vec<Chunk>, Vec<ChunkId>) {
302        re_tracing::profile_scope!("range", format!("{query:?}"));
303
304        re_log::debug_assert_eq!(query.timeline(), &self.cache_key.timeline_name);
305
306        // First, we forward the query as-is to the store.
307        //
308        // It's fine to run the query every time -- the index scan itself is not the costly part of a
309        // range query.
310        //
311        // For all relevant chunks that we find, we process them according to the [`QueryCacheKey`], and
312        // cache them.
313
314        let results =
315            store.range_relevant_chunks(ChunkTrackingMode::Report, query, entity_path, component);
316        // It is perfectly safe to cache partial range results, since missing data (if any), cannot
317        // possibly affect what's already cached, it can only augment it.
318        // Therefore, we do not even check for partial results here.
319        for raw_chunk in &results.chunks {
320            self.chunks.entry(raw_chunk.id()).or_insert_with(|| {
321                // Densify the cached chunk according to the cache key's component, which
322                // will speed up future arrow operations on this chunk.
323                let (chunk, densified) = raw_chunk.densified(component);
324
325                // Pre-sort the cached chunk according to the cache key's timeline.
326                //
327                // TODO(#7008): avoid unnecessary sorting on the unhappy path
328                let chunk = chunk.sorted_by_timeline_if_unsorted(&self.cache_key.timeline_name);
329
330                let reallocated =
331                    densified || !raw_chunk.is_timeline_sorted(&self.cache_key.timeline_name);
332
333                RangeCachedChunk { chunk, reallocated }
334            });
335        }
336
337        // Second, we simply retrieve from the cache all the relevant `Chunk`s .
338        //
339        // Since these `Chunk`s have already been pre-processed adequately, running a range filter
340        // on them will be quite cheap.
341
342        // It is perfectly fine to return partial range results, as they are always valid on their own,
343        // as long as we also advertise that some chunks were missing (which we do).
344        // Therefore, we do not even check for partial results here.
345        let chunks = results
346            .chunks
347            .into_iter()
348            .filter_map(|raw_chunk| self.chunks.get(&raw_chunk.id()))
349            .map(|cached_sorted_chunk| {
350                re_log::debug_assert!(
351                    cached_sorted_chunk
352                        .chunk
353                        .is_timeline_sorted(query.timeline())
354                );
355
356                let chunk = &cached_sorted_chunk.chunk;
357
358                chunk.range(query, component)
359            })
360            .filter(|chunk| !chunk.is_empty())
361            .collect();
362
363        (chunks, results.missing_virtual)
364    }
365
366    #[inline]
367    pub fn handle_pending_invalidation(&mut self) {
368        let Self {
369            cache_key: _,
370            chunks,
371            pending_invalidations,
372        } = self;
373
374        chunks.retain(|chunk_id, _chunk| !pending_invalidations.contains(chunk_id));
375
376        pending_invalidations.clear();
377    }
378}
379
380#[cfg(test)]
381mod tests {
382    use std::sync::Arc;
383
384    use re_chunk::{Chunk, ChunkId, RowId};
385    use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
386    use re_log_types::example_components::{MyPoint, MyPoints};
387    use re_log_types::external::re_tuid::Tuid;
388    use re_log_types::{EntityPath, TimePoint, Timeline};
389
390    use super::*;
391
392    // Make sure queries yield partial results when we expect them to.
393    #[test]
394    #[expect(clippy::bool_assert_comparison)] // I like it that way, sue me
395    fn partial_data_basics() {
396        let store = ChunkStore::new(
397            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
398            ChunkStoreConfig::ALL_DISABLED,
399        );
400        let store = ChunkStoreHandle::new(store);
401
402        let entity_path: EntityPath = "some_entity".into();
403
404        let timeline_frame = Timeline::new_sequence("frame");
405        let timepoint1 = TimePoint::from_iter([(timeline_frame, 1)]);
406        let point1 = MyPoint::new(1.0, 1.0);
407
408        let mut next_chunk_id = next_chunk_id_generator(0x1337);
409
410        // Overlapped chunks!
411        let chunk1 = create_chunk_with_point(
412            next_chunk_id(),
413            entity_path.clone(),
414            timepoint1.clone(),
415            point1,
416        );
417        let chunk2 = chunk1.clone_as(next_chunk_id(), RowId::new());
418        let chunk3 = chunk2.clone_as(next_chunk_id(), RowId::new());
419
420        let cache = QueryCache::new(store.clone());
421
422        let component = MyPoints::descriptor_points().component;
423        let query = RangeQuery::new(*timeline_frame.name(), AbsoluteTimeRange::new(0, 3));
424
425        // We haven't inserted anything yet, so we just expect empty results across the board.
426        {
427            let results = cache.range(&query, &entity_path, [component]);
428            assert!(results.is_empty());
429        }
430
431        // Reminder: the store events are irrelevant here, since the range cache still always unconditionally
432        // performs the underlying query regardless (only the sorting/slicing is cached).
433        store
434            .write()
435            .insert_chunk(&Arc::new(chunk1.clone()))
436            .unwrap();
437        store
438            .write()
439            .insert_chunk(&Arc::new(chunk2.clone()))
440            .unwrap();
441        store
442            .write()
443            .insert_chunk(&Arc::new(chunk3.clone()))
444            .unwrap();
445
446        // Now we've inserted everything, so we expect complete results across the board.
447        {
448            let results = cache.range(&query, &entity_path, [component]);
449            let expected = {
450                let mut results = RangeResults::new(query.clone());
451                results.add(
452                    component,
453                    vec![chunk1.clone(), chunk2.clone(), chunk3.clone()],
454                );
455                results
456            };
457            assert_eq!(false, results.is_partial());
458            assert_eq!(expected, results);
459        }
460
461        // Reminder: the store events are irrelevant here, since the range cache still always unconditionally
462        // performs the underlying query regardless (only the sorting/slicing is cached).
463        store.write().remove_chunks_shallow(
464            vec![Arc::new(chunk1.clone()), Arc::new(chunk3.clone())],
465            None,
466        );
467
468        // We've removed the first and last chunks from the store: results should now be partial.
469        {
470            let results = cache.range(&query, &entity_path, [component]);
471            let expected = {
472                let mut results = RangeResults::new(query.clone());
473                results.add(component, vec![chunk2.clone()]);
474                results.missing_virtual = vec![chunk1.id(), chunk3.id()];
475                results
476            };
477            assert_eq!(true, results.is_partial());
478            assert_eq!(expected, results);
479        }
480
481        // Reminder: the store events are irrelevant here, since the range cache still always unconditionally
482        // performs the underlying query regardless (only the sorting/slicing is cached).
483        store
484            .write()
485            .remove_chunks_shallow(vec![Arc::new(chunk2.clone())], None);
486
487        // Now we've removed absolutely everything: we should only get partial results.
488        {
489            let results = cache.range(&query, &entity_path, [component]);
490            let expected = {
491                let mut results = RangeResults::new(query.clone());
492                results.missing_virtual = vec![chunk1.id(), chunk2.id(), chunk3.id()];
493                results
494            };
495            assert_eq!(true, results.is_partial());
496            assert_eq!(expected, results);
497        }
498
499        // Reminder: the store events are irrelevant here, since the range cache still always unconditionally
500        // performs the underlying query regardless (only the sorting/slicing is cached).
501        store
502            .write()
503            .insert_chunk(&Arc::new(chunk1.clone()))
504            .unwrap();
505        store
506            .write()
507            .insert_chunk(&Arc::new(chunk2.clone()))
508            .unwrap();
509        store
510            .write()
511            .insert_chunk(&Arc::new(chunk3.clone()))
512            .unwrap();
513
514        // We've inserted everything back: all results should be complete once again.
515        {
516            let results = cache.range(&query, &entity_path, [component]);
517            let expected = {
518                let mut results = RangeResults::new(query.clone());
519                results.add(
520                    component,
521                    vec![chunk1.clone(), chunk2.clone(), chunk3.clone()],
522                );
523                results
524            };
525            assert_eq!(false, results.is_partial());
526            assert_eq!(expected, results);
527        }
528    }
529
530    fn next_chunk_id_generator(prefix: u64) -> impl FnMut() -> re_chunk::ChunkId {
531        let mut chunk_id = re_chunk::ChunkId::from_tuid(Tuid::from_nanos_and_inc(prefix, 0));
532        move || {
533            chunk_id = chunk_id.next();
534            chunk_id
535        }
536    }
537
538    fn create_chunk_with_point(
539        chunk_id: ChunkId,
540        entity_path: EntityPath,
541        timepoint: TimePoint,
542        point: MyPoint,
543    ) -> Chunk {
544        Chunk::builder_with_id(chunk_id, entity_path)
545            .with_component_batch(
546                RowId::new(),
547                timepoint,
548                (MyPoints::descriptor_points(), &[point]),
549            )
550            .build()
551            .unwrap()
552    }
553}