1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use ahash::HashMap;
5use nohash_hasher::IntMap;
6use parking_lot::RwLock;
7use re_byte_size::SizeBytes;
8use re_chunk::{Chunk, ChunkId, ComponentIdentifier};
9use re_chunk_store::{ChunkStore, ChunkTrackingMode, RangeQuery, TimeInt};
10use re_log_types::{AbsoluteTimeRange, EntityPath};
11
12use crate::{QueryCache, QueryCacheKey, QueryError};
13
impl QueryCache {
    /// Runs a cached range query for the given `components` on `entity_path`.
    ///
    /// Chunks are fetched through one [`RangeCache`] per `(entity, timeline, component)`
    /// key, creating the cache on demand. Chunks reported missing by the underlying
    /// store are surfaced through [`RangeResults::missing_virtual`].
    pub fn range(
        &self,
        query: &RangeQuery,
        entity_path: &EntityPath,
        components: impl IntoIterator<Item = ComponentIdentifier>,
    ) -> RangeResults {
        re_tracing::profile_function!(entity_path.to_string());

        // Hold a read lock on the store for the duration of the whole query.
        let store = self.store.read();

        let mut results = RangeResults::new(query.clone());

        // Skip components that never appear on this timeline for this entity:
        // no cache entry needs to be created (or queried) for them.
        let components = components.into_iter().filter(|component_identifier| {
            store.entity_has_component_on_timeline(
                query.timeline(),
                entity_path,
                *component_identifier,
            )
        });

        for component in components {
            let key = QueryCacheKey::new(entity_path.clone(), *query.timeline(), component);

            // Clone the `Arc` out of the map so that the write lock on
            // `range_per_cache_key` is released before locking the individual
            // cache below — we never hold both locks at once.
            let cache = Arc::clone(
                self.range_per_cache_key
                    .write()
                    .entry(key.clone())
                    .or_insert_with(|| Arc::new(RwLock::new(RangeCache::new(key)))),
            );

            let mut cache = cache.write();

            // Evict any chunks that were invalidated since the last query.
            cache.handle_pending_invalidation();

            let (cached, missing) = cache.range(&store, query, entity_path, component);
            results.missing_virtual.extend(missing);
            if !cached.is_empty() {
                results.add(component, cached);
            }
        }

        results
    }
}
69
/// Results of a cached range query, grouped by component.
#[derive(Debug, PartialEq)]
pub struct RangeResults {
    /// The query that produced these results.
    pub query: RangeQuery,

    /// Chunks relevant to the query whose data was not resident in the store
    /// ("virtual" chunks). If non-empty, the results are partial.
    pub missing_virtual: Vec<ChunkId>,

    /// The resulting chunks, per component.
    pub components: IntMap<ComponentIdentifier, Vec<Chunk>>,
}
103
104impl RangeResults {
105 pub fn is_partial(&self) -> bool {
114 !self.missing_virtual.is_empty()
115 }
116
117 pub fn is_empty(&self) -> bool {
121 let Self {
122 query: _,
123 missing_virtual,
124 components,
125 } = self;
126 missing_virtual.is_empty() && components.values().all(|chunks| chunks.is_empty())
127 }
128
129 #[inline]
131 pub fn get(&self, component: ComponentIdentifier) -> Option<&[Chunk]> {
132 self.components
133 .get(&component)
134 .map(|chunks| chunks.as_slice())
135 }
136
137 #[inline]
141 pub fn get_required(&self, component: ComponentIdentifier) -> crate::Result<&[Chunk]> {
142 self.components.get(&component).map_or_else(
143 || Err(QueryError::PrimaryNotFound(component)),
144 |chunks| Ok(chunks.as_slice()),
145 )
146 }
147}
148
149impl RangeResults {
150 #[inline]
151 fn new(query: RangeQuery) -> Self {
152 Self {
153 query,
154 missing_virtual: Default::default(),
155 components: Default::default(),
156 }
157 }
158
159 #[inline]
160 fn add(&mut self, component: ComponentIdentifier, chunks: Vec<Chunk>) {
161 self.components.insert(component, chunks);
162 }
163}
164
/// Caches the chunks relevant to the range queries of a single
/// `(entity, timeline, component)` key.
pub struct RangeCache {
    /// Identifies the `(entity, timeline, component)` this cache is responsible for.
    pub cache_key: QueryCacheKey,

    /// All the chunks cached so far, indexed by their [`ChunkId`], already
    /// pre-processed (densified and sorted on the cache's timeline).
    pub chunks: HashMap<ChunkId, RangeCachedChunk>,

    /// Chunks that have been invalidated and must be evicted before the next
    /// query; see [`RangeCache::handle_pending_invalidation`].
    pub pending_invalidations: BTreeSet<ChunkId>,
}
185
186impl RangeCache {
187 #[inline]
188 pub fn new(cache_key: QueryCacheKey) -> Self {
189 Self {
190 cache_key,
191 chunks: HashMap::default(),
192 pending_invalidations: BTreeSet::default(),
193 }
194 }
195
196 #[inline]
200 pub fn time_range(&self) -> AbsoluteTimeRange {
201 self.chunks
202 .values()
203 .filter_map(|cached| {
204 cached
205 .chunk
206 .timelines()
207 .get(&self.cache_key.timeline_name)
208 .map(|time_column| time_column.time_range())
209 })
210 .fold(AbsoluteTimeRange::EMPTY, |mut acc, time_range| {
211 acc.set_min(TimeInt::min(acc.min(), time_range.min()));
212 acc.set_max(TimeInt::max(acc.max(), time_range.max()));
213 acc
214 })
215 }
216}
217
218impl std::fmt::Debug for RangeCache {
219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 let Self {
221 cache_key: _,
222 chunks,
223 pending_invalidations: _,
224 } = self;
225
226 let mut strings: Vec<String> = Vec::new();
227
228 strings.push(format!(
229 "{:?} ({})",
230 self.time_range(),
231 re_format::format_bytes(chunks.total_size_bytes() as _),
232 ));
233
234 if strings.is_empty() {
235 return f.write_str("<empty>");
236 }
237
238 f.write_str(&strings.join("\n").replace("\n\n", "\n"))
239 }
240}
241
/// A single pre-processed chunk, ready to be sliced by range queries.
pub struct RangeCachedChunk {
    /// The cached chunk, densified and sorted on the cache's timeline.
    pub chunk: Chunk,

    /// Whether the pre-processing (densification and/or time-sorting) had to
    /// reallocate the chunk. When `false`, the cache reports zero heap size for
    /// this chunk (see the `SizeBytes` impl below), so the memory is not
    /// double-counted with the store's copy.
    pub reallocated: bool,
}
253
254impl SizeBytes for RangeCachedChunk {
255 #[inline]
256 fn heap_size_bytes(&self) -> u64 {
257 let Self { chunk, reallocated } = self;
258
259 if *reallocated {
260 Chunk::heap_size_bytes(chunk)
263 } else {
264 0
267 }
268 }
269}
270
271impl SizeBytes for RangeCache {
272 #[inline]
273 fn heap_size_bytes(&self) -> u64 {
274 let Self {
275 cache_key,
276 chunks,
277 pending_invalidations,
278 } = self;
279
280 cache_key.heap_size_bytes()
281 + chunks.heap_size_bytes()
282 + pending_invalidations.heap_size_bytes()
283 }
284}
285
impl RangeCache {
    /// Queries the cache — populating it from the store on misses — for all chunks
    /// relevant to the given range query.
    ///
    /// Returns the pre-processed chunks, each sliced down to the queried range,
    /// together with the ids of relevant chunks whose data was not resident in the
    /// store ("virtual" chunks).
    fn range(
        &mut self,
        store: &ChunkStore,
        query: &RangeQuery,
        entity_path: &EntityPath,
        component: ComponentIdentifier,
    ) -> (Vec<Chunk>, Vec<ChunkId>) {
        re_tracing::profile_scope!("range", format!("{query:?}"));

        // The cache is keyed on a single timeline; querying it on another one is a bug.
        re_log::debug_assert_eq!(query.timeline(), &self.cache_key.timeline_name);

        let results =
            store.range_relevant_chunks(ChunkTrackingMode::Report, query, entity_path, component);
        // Populate the cache: every relevant chunk not already cached is densified
        // and time-sorted once, so later queries can slice it cheaply.
        for raw_chunk in &results.chunks {
            self.chunks.entry(raw_chunk.id()).or_insert_with(|| {
                let (chunk, densified) = raw_chunk.densified(component);

                let chunk = chunk.sorted_by_timeline_if_unsorted(&self.cache_key.timeline_name);

                // Either pre-processing step may have produced a new allocation;
                // `reallocated` drives memory accounting (see `SizeBytes` below).
                let reallocated =
                    densified || !raw_chunk.is_timeline_sorted(&self.cache_key.timeline_name);

                RangeCachedChunk { chunk, reallocated }
            });
        }

        // Answer the query from the cache: look each relevant chunk back up and
        // slice it down to the queried range, dropping empty slices.
        let chunks = results
            .chunks
            .into_iter()
            .filter_map(|raw_chunk| self.chunks.get(&raw_chunk.id()))
            .map(|cached_sorted_chunk| {
                re_log::debug_assert!(
                    cached_sorted_chunk
                        .chunk
                        .is_timeline_sorted(query.timeline())
                );

                let chunk = &cached_sorted_chunk.chunk;

                chunk.range(query, component)
            })
            .filter(|chunk| !chunk.is_empty())
            .collect();

        (chunks, results.missing_virtual)
    }

    /// Evicts all chunks queued in `pending_invalidations`, then clears the queue.
    ///
    /// Must be called before answering a query so stale chunks never leak into results.
    #[inline]
    pub fn handle_pending_invalidation(&mut self) {
        let Self {
            cache_key: _,
            chunks,
            pending_invalidations,
        } = self;

        chunks.retain(|chunk_id, _chunk| !pending_invalidations.contains(chunk_id));

        pending_invalidations.clear();
    }
}
379
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use re_chunk::{Chunk, ChunkId, RowId};
    use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
    use re_log_types::example_components::{MyPoint, MyPoints};
    use re_log_types::external::re_tuid::Tuid;
    use re_log_types::{EntityPath, TimePoint, Timeline};

    use super::*;

    /// Exercises partial results: inserting chunks, shallow-removing some of them
    /// (leaving them "virtual"), and re-inserting, checking `is_partial()` and the
    /// exact `RangeResults` at every step.
    #[test]
    #[expect(clippy::bool_assert_comparison)] fn partial_data_basics() {
        let store = ChunkStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
            ChunkStoreConfig::ALL_DISABLED,
        );
        let store = ChunkStoreHandle::new(store);

        let entity_path: EntityPath = "some_entity".into();

        let timeline_frame = Timeline::new_sequence("frame");
        let timepoint1 = TimePoint::from_iter([(timeline_frame, 1)]);
        let point1 = MyPoint::new(1.0, 1.0);

        // Deterministic chunk ids so the expected results below are stable.
        let mut next_chunk_id = next_chunk_id_generator(0x1337);

        // Three chunks with identical data but distinct ids.
        let chunk1 = create_chunk_with_point(
            next_chunk_id(),
            entity_path.clone(),
            timepoint1.clone(),
            point1,
        );
        let chunk2 = chunk1.clone_as(next_chunk_id(), RowId::new());
        let chunk3 = chunk2.clone_as(next_chunk_id(), RowId::new());

        let cache = QueryCache::new(store.clone());

        let component = MyPoints::descriptor_points().component;
        let query = RangeQuery::new(*timeline_frame.name(), AbsoluteTimeRange::new(0, 3));

        // Empty store -> empty results.
        {
            let results = cache.range(&query, &entity_path, [component]);
            assert!(results.is_empty());
        }

        store
            .write()
            .insert_chunk(&Arc::new(chunk1.clone()))
            .unwrap();
        store
            .write()
            .insert_chunk(&Arc::new(chunk2.clone()))
            .unwrap();
        store
            .write()
            .insert_chunk(&Arc::new(chunk3.clone()))
            .unwrap();

        // All three chunks resident -> complete results, not partial.
        {
            let results = cache.range(&query, &entity_path, [component]);
            let expected = {
                let mut results = RangeResults::new(query.clone());
                results.add(
                    component,
                    vec![chunk1.clone(), chunk2.clone(), chunk3.clone()],
                );
                results
            };
            assert_eq!(false, results.is_partial());
            assert_eq!(expected, results);
        }

        // Shallow-remove chunks 1 and 3: their data is gone but they remain known
        // to the store as "virtual" chunks.
        store.write().remove_chunks_shallow(
            vec![Arc::new(chunk1.clone()), Arc::new(chunk3.clone())],
            None,
        );

        // Only chunk2 resident -> partial results, chunks 1 and 3 reported missing.
        {
            let results = cache.range(&query, &entity_path, [component]);
            let expected = {
                let mut results = RangeResults::new(query.clone());
                results.add(component, vec![chunk2.clone()]);
                results.missing_virtual = vec![chunk1.id(), chunk3.id()];
                results
            };
            assert_eq!(true, results.is_partial());
            assert_eq!(expected, results);
        }

        // Remove the last resident chunk as well.
        store
            .write()
            .remove_chunks_shallow(vec![Arc::new(chunk2.clone())], None);

        // Nothing resident -> no data at all, everything reported missing.
        {
            let results = cache.range(&query, &entity_path, [component]);
            let expected = {
                let mut results = RangeResults::new(query.clone());
                results.missing_virtual = vec![chunk1.id(), chunk2.id(), chunk3.id()];
                results
            };
            assert_eq!(true, results.is_partial());
            assert_eq!(expected, results);
        }

        // Re-insert everything; the cache must recover full results.
        store
            .write()
            .insert_chunk(&Arc::new(chunk1.clone()))
            .unwrap();
        store
            .write()
            .insert_chunk(&Arc::new(chunk2.clone()))
            .unwrap();
        store
            .write()
            .insert_chunk(&Arc::new(chunk3.clone()))
            .unwrap();

        // Back to complete results.
        {
            let results = cache.range(&query, &entity_path, [component]);
            let expected = {
                let mut results = RangeResults::new(query.clone());
                results.add(
                    component,
                    vec![chunk1.clone(), chunk2.clone(), chunk3.clone()],
                );
                results
            };
            assert_eq!(false, results.is_partial());
            assert_eq!(expected, results);
        }
    }

    /// Returns a closure yielding a deterministic sequence of chunk ids seeded by `prefix`.
    fn next_chunk_id_generator(prefix: u64) -> impl FnMut() -> re_chunk::ChunkId {
        let mut chunk_id = re_chunk::ChunkId::from_tuid(Tuid::from_nanos_and_inc(prefix, 0));
        move || {
            chunk_id = chunk_id.next();
            chunk_id
        }
    }

    /// Builds a single-row chunk holding one `MyPoint` at the given timepoint.
    fn create_chunk_with_point(
        chunk_id: ChunkId,
        entity_path: EntityPath,
        timepoint: TimePoint,
        point: MyPoint,
    ) -> Chunk {
        Chunk::builder_with_id(chunk_id, entity_path)
            .with_component_batch(
                RowId::new(),
                timepoint,
                (MyPoints::descriptor_points(), &[point]),
            )
            .build()
            .unwrap()
    }
}