1use std::{collections::BTreeSet, sync::Arc};
2
3use ahash::HashMap;
4use nohash_hasher::IntMap;
5use parking_lot::RwLock;
6
7use re_byte_size::SizeBytes;
8use re_chunk::{Chunk, ChunkId, ComponentIdentifier};
9use re_chunk_store::{ChunkStore, RangeQuery, TimeInt};
10use re_log_types::{AbsoluteTimeRange, EntityPath};
11
12use crate::{QueryCache, QueryCacheKey, QueryError};
13
14impl QueryCache {
17 pub fn range(
23 &self,
24 query: &RangeQuery,
25 entity_path: &EntityPath,
26 components: impl IntoIterator<Item = ComponentIdentifier>,
27 ) -> RangeResults {
28 re_tracing::profile_function!(entity_path.to_string());
29
30 let store = self.store.read();
31
32 let mut results = RangeResults::new(query.clone());
33
34 let components = components.into_iter().filter(|component_identifier| {
38 store.entity_has_component_on_timeline(
39 query.timeline(),
40 entity_path,
41 *component_identifier,
42 )
43 });
44
45 for component in components {
46 let key = QueryCacheKey::new(entity_path.clone(), *query.timeline(), component);
47
48 let cache = Arc::clone(
49 self.range_per_cache_key
50 .write()
51 .entry(key.clone())
52 .or_insert_with(|| Arc::new(RwLock::new(RangeCache::new(key)))),
53 );
54
55 let mut cache = cache.write();
56
57 cache.handle_pending_invalidation();
58
59 let cached = cache.range(&store, query, entity_path, component);
60 if !cached.is_empty() {
61 results.add(component, cached);
62 }
63 }
64
65 results
66 }
67}
68
69#[derive(Debug)]
78pub struct RangeResults {
79 pub query: RangeQuery,
81
82 pub components: IntMap<ComponentIdentifier, Vec<Chunk>>,
84}
85
86impl RangeResults {
87 #[inline]
88 pub fn new(query: RangeQuery) -> Self {
89 Self {
90 query,
91 components: Default::default(),
92 }
93 }
94
95 #[inline]
97 pub fn get(&self, component: ComponentIdentifier) -> Option<&[Chunk]> {
98 self.components
99 .get(&component)
100 .map(|chunks| chunks.as_slice())
101 }
102
103 #[inline]
107 pub fn get_required(&self, component: ComponentIdentifier) -> crate::Result<&[Chunk]> {
108 self.components.get(&component).map_or_else(
109 || Err(QueryError::PrimaryNotFound(component)),
110 |chunks| Ok(chunks.as_slice()),
111 )
112 }
113}
114
115impl RangeResults {
116 #[doc(hidden)]
117 #[inline]
118 pub fn add(&mut self, component: ComponentIdentifier, chunks: Vec<Chunk>) {
119 self.components.insert(component, chunks);
120 }
121}
122
123pub struct RangeCache {
127 pub cache_key: QueryCacheKey,
129
130 pub chunks: HashMap<ChunkId, RangeCachedChunk>,
134
135 pub pending_invalidations: BTreeSet<ChunkId>,
142}
143
144impl RangeCache {
145 #[inline]
146 pub fn new(cache_key: QueryCacheKey) -> Self {
147 Self {
148 cache_key,
149 chunks: HashMap::default(),
150 pending_invalidations: BTreeSet::default(),
151 }
152 }
153
154 #[inline]
158 pub fn time_range(&self) -> AbsoluteTimeRange {
159 self.chunks
160 .values()
161 .filter_map(|cached| {
162 cached
163 .chunk
164 .timelines()
165 .get(&self.cache_key.timeline_name)
166 .map(|time_column| time_column.time_range())
167 })
168 .fold(AbsoluteTimeRange::EMPTY, |mut acc, time_range| {
169 acc.set_min(TimeInt::min(acc.min(), time_range.min()));
170 acc.set_max(TimeInt::max(acc.max(), time_range.max()));
171 acc
172 })
173 }
174}
175
176impl std::fmt::Debug for RangeCache {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 let Self {
179 cache_key: _,
180 chunks,
181 pending_invalidations: _,
182 } = self;
183
184 let mut strings: Vec<String> = Vec::new();
185
186 strings.push(format!(
187 "{:?} ({})",
188 self.time_range(),
189 re_format::format_bytes(chunks.total_size_bytes() as _),
190 ));
191
192 if strings.is_empty() {
193 return f.write_str("<empty>");
194 }
195
196 f.write_str(&strings.join("\n").replace("\n\n", "\n"))
197 }
198}
199
200pub struct RangeCachedChunk {
201 pub chunk: Chunk,
202
203 pub resorted: bool,
210}
211
212impl SizeBytes for RangeCachedChunk {
213 #[inline]
214 fn heap_size_bytes(&self) -> u64 {
215 let Self { chunk, resorted } = self;
216
217 if *resorted {
218 Chunk::heap_size_bytes(chunk)
221 } else {
222 0
225 }
226 }
227}
228
229impl SizeBytes for RangeCache {
230 #[inline]
231 fn heap_size_bytes(&self) -> u64 {
232 let Self {
233 cache_key,
234 chunks,
235 pending_invalidations,
236 } = self;
237
238 cache_key.heap_size_bytes()
239 + chunks.heap_size_bytes()
240 + pending_invalidations.heap_size_bytes()
241 }
242}
243
244impl RangeCache {
245 pub fn range(
247 &mut self,
248 store: &ChunkStore,
249 query: &RangeQuery,
250 entity_path: &EntityPath,
251 component: ComponentIdentifier,
252 ) -> Vec<Chunk> {
253 re_tracing::profile_scope!("range", format!("{query:?}"));
254
255 debug_assert_eq!(query.timeline(), &self.cache_key.timeline_name);
256
257 let raw_chunks = store.range_relevant_chunks(query, entity_path, component);
266 for raw_chunk in &raw_chunks {
267 self.chunks
268 .entry(raw_chunk.id())
269 .or_insert_with(|| RangeCachedChunk {
270 chunk: raw_chunk
272 .densified(component)
275 .sorted_by_timeline_if_unsorted(&self.cache_key.timeline_name),
277 resorted: !raw_chunk.is_timeline_sorted(&self.cache_key.timeline_name),
278 });
279 }
280
281 raw_chunks
287 .into_iter()
288 .filter_map(|raw_chunk| self.chunks.get(&raw_chunk.id()))
289 .map(|cached_sorted_chunk| {
290 debug_assert!(
291 cached_sorted_chunk
292 .chunk
293 .is_timeline_sorted(query.timeline())
294 );
295
296 let chunk = &cached_sorted_chunk.chunk;
297
298 chunk.range(query, component)
299 })
300 .filter(|chunk| !chunk.is_empty())
301 .collect()
302 }
303
304 #[inline]
305 pub fn handle_pending_invalidation(&mut self) {
306 let Self {
307 cache_key: _,
308 chunks,
309 pending_invalidations,
310 } = self;
311
312 chunks.retain(|chunk_id, _chunk| !pending_invalidations.contains(chunk_id));
313
314 pending_invalidations.clear();
315 }
316}