// re_dataframe/query.rs

1use std::collections::BTreeSet;
2use std::sync::OnceLock;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use arrow::array::{
6    ArrayRef as ArrowArrayRef, BooleanArray as ArrowBooleanArray,
7    PrimitiveArray as ArrowPrimitiveArray, RecordBatch as ArrowRecordBatch, RecordBatchOptions,
8};
9use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
10use arrow::datatypes::{
11    DataType as ArrowDataType, Fields as ArrowFields, Schema as ArrowSchema,
12    SchemaRef as ArrowSchemaRef,
13};
14use itertools::{Either, Itertools as _};
15use nohash_hasher::{IntMap, IntSet};
16use re_arrow_util::{ArrowArrayDowncastRef as _, into_arrow_ref};
17use re_chunk::external::arrow::array::ArrayRef;
18use re_chunk::{
19    Chunk, ComponentIdentifier, EntityPath, RangeQuery, RowId, TimeInt, TimelineName,
20    UnitChunkShared,
21};
22use re_chunk_store::{
23    ChunkStore, ColumnDescriptor, ComponentColumnDescriptor, Index, IndexColumnDescriptor,
24    IndexValue, QueryExpression, SparseFillStrategy,
25};
26use re_log::{debug_assert, debug_assert_eq, debug_panic};
27use re_log_types::AbsoluteTimeRange;
28use re_query::{QueryCache, StorageEngineLike};
29use re_sorbet::{
30    ChunkColumnDescriptors, ColumnSelector, RowIdColumnDescriptor, TimeColumnSelector,
31};
32use re_types_core::arrow_helpers::as_array_ref;
33use re_types_core::{Loggable as _, SerializedComponentColumn, archetypes};
34
35// ---
36
// TODO(cmc): (no specific order) (should we make issues for these?)
// * [x] basic thing working
// * [x] custom selection
// * [x] support for overlaps (slow)
// * [x] pagination (any solution, even a slow one)
// * [x] pov support
// * [x] latestat sparse-filling
// * [x] sampling support
// * [x] clears
// * [x] pagination (fast)
// * [x] take kernel duplicates all memory
// * [x] dedupe-latest without allocs/copies
// * [ ] allocate null arrays once
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
// * [ ] configurable cache bypass
53
54/// A handle to a dataframe query, ready to be executed.
55///
56/// Cheaply created via `QueryEngine::query`.
57///
58/// See [`QueryHandle::next_row`] or [`QueryHandle::into_iter`].
59pub struct QueryHandle<E: StorageEngineLike> {
60    /// Handle to the `QueryEngine`.
61    pub(crate) engine: E,
62
63    /// The original query expression used to instantiate this handle.
64    pub(crate) query: QueryExpression,
65
66    /// Internal private state. Lazily computed.
67    ///
68    /// It is important that handles stay cheap to create.
69    state: OnceLock<QueryHandleState>,
70}
71
72/// Internal private state. Lazily computed.
73struct QueryHandleState {
74    /// Describes the columns that make up this view.
75    ///
76    /// See [`QueryExpression::view_contents`].
77    view_contents: ChunkColumnDescriptors,
78
79    /// Describes the columns specifically selected to be returned from this view.
80    ///
81    /// All returned rows will have an Arrow schema that matches this selection.
82    ///
83    /// Columns that do not yield any data will still be present in the results, filled with null values.
84    ///
85    /// The extra `usize` is the index in [`QueryHandleState::view_contents`] that this selection
86    /// points to.
87    ///
88    /// See also [`QueryHandleState::arrow_schema`].
89    selected_contents: Vec<(usize, ColumnDescriptor)>,
90
91    /// This keeps track of the static data associated with each entry in `selected_contents`, if any.
92    ///
93    /// This is queried only once during init, and will override all cells that follow.
94    ///
95    /// `selected_contents`: [`QueryHandleState::selected_contents`]
96    selected_static_values: Vec<Option<UnitChunkShared>>,
97
98    /// The actual index filter in use, since the user-specified one is optional.
99    ///
100    /// This just defaults to `Index::default()` if the user hasn't specified any: the actual
101    /// value is irrelevant since this means we are only concerned with static data anyway.
102    filtered_index: Index,
103
104    /// The Arrow schema that corresponds to the `selected_contents`.
105    ///
106    /// All returned rows will have this schema.
107    arrow_schema: ArrowSchemaRef,
108
109    /// All the [`Chunk`]s included in the view contents.
110    ///
111    /// These are already sorted, densified, vertically sliced, and [latest-deduped] according
112    /// to the query.
113    ///
114    /// The atomic counter is used as a cursor which keeps track of our current position within
115    /// each individual chunk.
116    /// Because chunks are allowed to overlap, we might need to rebound between two or more chunks
117    /// during our iteration.
118    ///
119    /// This vector's entries correspond to those in [`QueryHandleState::view_contents`].
120    /// Note: time and column entries don't have chunks -- inner vectors will be empty.
121    ///
122    /// [latest-deduped]: [`Chunk::deduped_latest_on_index`]
123    //
124    // NOTE: Reminder: we have to query everything in the _view_, irrelevant of the current selection.
125    view_chunks: Vec<Vec<(AtomicU64, Chunk)>>,
126
127    /// Tracks the current row index: the position of the iterator. For [`QueryHandle::next_row`].
128    ///
129    /// This represents the number of rows that the caller has iterated on: it is completely
130    /// unrelated to the cursors used to track the current position in each individual chunk.
131    ///
132    /// The corresponding index value can be obtained using `unique_index_values[cur_row]`.
133    ///
134    /// `unique_index_values[cur_row]`: [`QueryHandleState::unique_index_values`]
135    cur_row: AtomicU64,
136
137    /// All unique index values that can possibly be returned by this query.
138    ///
139    /// Guaranteed ascendingly sorted and deduped.
140    ///
141    /// See also [`QueryHandleState::cur_row`].
142    unique_index_values: Vec<IndexValue>,
143}
144
145impl<E: StorageEngineLike> QueryHandle<E> {
146    pub(crate) fn new(engine: E, query: QueryExpression) -> Self {
147        Self {
148            engine,
149            query,
150            state: Default::default(),
151        }
152    }
153}
154
155impl<E: StorageEngineLike> QueryHandle<E> {
156    /// Lazily initialize internal private state.
157    ///
158    /// It is important that query handles stay cheap to create.
159    #[tracing::instrument(level = "debug", skip_all)]
160    fn init(&self) -> &QueryHandleState {
161        self.engine
162            .with(|store, cache| self.state.get_or_init(|| self.init_(store, cache)))
163    }
164
165    // NOTE: This is split in its own method otherwise it completely breaks `rustfmt`.
166    fn init_(&self, store: &ChunkStore, cache: &QueryCache) -> QueryHandleState {
167        re_tracing::profile_scope!("init");
168
169        // The timeline doesn't matter if we're running in static-only mode.
170        let filtered_index = self
171            .query
172            .filtered_index
173            .unwrap_or_else(|| TimelineName::new(""));
174
175        // 1. Compute the schema for the query.
176        let view_contents_schema = store.schema_for_query(&self.query);
177        let view_contents = view_contents_schema.indices_and_components();
178
179        // 2. Compute the schema of the selected contents.
180        //
181        // The caller might have selected columns that do not exist in the view: they should
182        // still appear in the results.
183        let selected_contents: Vec<(_, _)> = if let Some(selection) = self.query.selection.as_ref()
184        {
185            self.compute_user_selection(&view_contents, selection)
186        } else {
187            view_contents.clone().into_iter().enumerate().collect()
188        };
189
190        // 3. Compute the Arrow schema of the selected components.
191        //
192        // Every result returned using this `QueryHandle` will match this schema exactly.
193        let arrow_schema = ArrowSchemaRef::from(ArrowSchema::new_with_metadata(
194            selected_contents
195                .iter()
196                .map(|(_, descr)| descr.to_arrow_field(re_sorbet::BatchType::Dataframe))
197                .collect::<ArrowFields>(),
198            Default::default(),
199        ));
200
201        // 4. Perform the query and keep track of all the relevant chunks.
202        let query = {
203            let index_range = if self.query.filtered_index.is_none() {
204                AbsoluteTimeRange::EMPTY // static-only
205            } else if let Some(using_index_values) = self.query.using_index_values.as_ref() {
206                using_index_values
207                    .first()
208                    .and_then(|start| using_index_values.last().map(|end| (start, end)))
209                    .map_or(AbsoluteTimeRange::EMPTY, |(start, end)| {
210                        AbsoluteTimeRange::new(*start, *end)
211                    })
212            } else {
213                self.query
214                    .filtered_index_range
215                    .unwrap_or(AbsoluteTimeRange::EVERYTHING)
216            };
217
218            RangeQuery::new(filtered_index, index_range)
219                .keep_extra_timelines(true) // we want all the timelines we can get!
220                .keep_extra_components(false)
221        };
222        let (view_pov_chunks_idx, mut view_chunks) =
223            self.fetch_view_chunks(store, cache, &query, &view_contents);
224
225        // 5. Collect all relevant clear chunks and update the view accordingly.
226        //
227        // We'll turn the clears into actual empty arrays of the expected component type.
228        {
229            re_tracing::profile_scope!("clear_chunks");
230
231            let clear_chunks = self.fetch_clear_chunks(store, cache, &query, &view_contents);
232            for (view_idx, chunks) in view_chunks.iter_mut().enumerate() {
233                let Some(ColumnDescriptor::Component(descr)) = view_contents.get(view_idx) else {
234                    continue;
235                };
236
237                descr.sanity_check();
238
239                // NOTE: It would be tempting to concatenate all these individual clear chunks into one
240                // single big chunk, but that'd be a mistake: 1) it's costly to do so but more
241                // importantly 2) that would lead to likely very large chunk overlap, which is very bad
242                // for business.
243                if let Some(clear_chunks) = clear_chunks.get(&descr.entity_path) {
244                    chunks.extend(clear_chunks.iter().map(|chunk| {
245                        let child_datatype = match &descr.store_datatype {
246                            ArrowDataType::List(field) | ArrowDataType::LargeList(field) => {
247                                field.data_type().clone()
248                            }
249                            ArrowDataType::Dictionary(_, datatype) => (**datatype).clone(),
250                            datatype => datatype.clone(),
251                        };
252
253                        let mut chunk = chunk.clone();
254                        // Only way this could fail is if the number of rows did not match.
255                        #[expect(clippy::unwrap_used)]
256                        chunk
257                            .add_component(SerializedComponentColumn::new(
258                                re_arrow_util::new_list_array_of_empties(
259                                    &child_datatype,
260                                    chunk.num_rows(),
261                                ),
262                                re_types_core::ComponentDescriptor {
263                                    component_type: descr.component_type,
264                                    archetype: descr.archetype,
265                                    component: descr.component,
266                                },
267                            ))
268                            .unwrap();
269
270                        (AtomicU64::new(0), chunk)
271                    }));
272
273                    // The chunks were sorted that way before, and it needs to stay that way after.
274                    chunks.sort_by_key(|(_cursor, chunk)| {
275                        // NOTE: The chunk has been densified already: its global time range is the same as
276                        // the time range for the specific component of interest.
277                        chunk
278                            .timelines()
279                            .get(&filtered_index)
280                            .map(|time_column| time_column.time_range())
281                            .map_or(TimeInt::STATIC, |time_range| time_range.min())
282                    });
283                }
284            }
285        }
286
287        // 6. Collect all unique index values.
288        //
289        // Used to achieve ~O(log(n)) pagination.
290        let unique_index_values = if self.query.filtered_index.is_none() {
291            vec![TimeInt::STATIC]
292        } else if let Some(using_index_values) = self.query.using_index_values.as_ref() {
293            using_index_values
294                .iter()
295                .filter(|index_value| !index_value.is_static())
296                .copied()
297                .collect_vec()
298        } else {
299            re_tracing::profile_scope!("index_values");
300
301            let mut view_chunks = view_chunks.iter();
302            let view_chunks = if let Some(view_pov_chunks_idx) = view_pov_chunks_idx {
303                Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter())
304            } else {
305                Either::Right(view_chunks)
306            };
307
308            let mut all_unique_index_values: BTreeSet<TimeInt> = view_chunks
309                .flat_map(|chunks| {
310                    chunks.iter().filter_map(|(_cursor, chunk)| {
311                        chunk
312                            .timelines()
313                            .get(&filtered_index)
314                            .map(|time_column| time_column.times())
315                    })
316                })
317                .flatten()
318                .collect();
319
320            if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() {
321                all_unique_index_values.retain(|time| filtered_index_values.contains(time));
322            }
323
324            all_unique_index_values
325                .into_iter()
326                .filter(|index_value| !index_value.is_static())
327                .collect_vec()
328        };
329
330        let selected_static_values = {
331            re_tracing::profile_scope!("static_values");
332
333            selected_contents
334                .iter()
335                .map(|(_view_idx, descr)| match descr {
336                    ColumnDescriptor::RowId(_) | ColumnDescriptor::Time(_) => None,
337                    ColumnDescriptor::Component(descr) => {
338                        descr.sanity_check();
339
340                        let query =
341                            re_chunk::LatestAtQuery::new(TimelineName::new(""), TimeInt::STATIC);
342
343                        let results =
344                            cache.latest_at(&query, &descr.entity_path, [descr.component]);
345
346                        results.components.into_values().next()
347                    }
348                })
349                .collect_vec()
350        };
351
352        for (_, descr) in &selected_contents {
353            descr.sanity_check();
354        }
355
356        QueryHandleState {
357            view_contents: view_contents_schema,
358            selected_contents,
359            selected_static_values,
360            filtered_index,
361            arrow_schema,
362            view_chunks,
363            cur_row: AtomicU64::new(0),
364            unique_index_values,
365        }
366    }
367
368    #[tracing::instrument(level = "info", skip_all)]
369    #[expect(clippy::unused_self)]
370    fn compute_user_selection(
371        &self,
372        view_contents: &[ColumnDescriptor],
373        selection: &[ColumnSelector],
374    ) -> Vec<(usize, ColumnDescriptor)> {
375        selection
376            .iter()
377            .map(|column| match column {
378                ColumnSelector::RowId => view_contents
379                    .iter()
380                    .enumerate()
381                    .find_map(|(idx, view_column)| {
382                        if let ColumnDescriptor::RowId(descr) = view_column {
383                            Some((idx, ColumnDescriptor::RowId(descr.clone())))
384                        } else {
385                            None
386                        }
387                    })
388                    .unwrap_or_else(|| {
389                        (
390                            usize::MAX,
391                            ColumnDescriptor::RowId(RowIdColumnDescriptor::from_sorted(false)),
392                        )
393                    }),
394
395                ColumnSelector::Time(selected_column) => {
396                    let TimeColumnSelector {
397                        timeline: selected_timeline,
398                    } = selected_column;
399
400                    view_contents
401                        .iter()
402                        .enumerate()
403                        .filter_map(|(idx, view_column)| {
404                            if let ColumnDescriptor::Time(view_descr) = view_column {
405                                Some((idx, view_descr))
406                            } else {
407                                None
408                            }
409                        })
410                        .find(|(_idx, view_descr)| {
411                            *view_descr.timeline().name() == *selected_timeline
412                        })
413                        .map_or_else(
414                            || {
415                                (
416                                    usize::MAX,
417                                    ColumnDescriptor::Time(IndexColumnDescriptor::new_null(
418                                        *selected_timeline,
419                                    )),
420                                )
421                            },
422                            |(idx, view_descr)| (idx, ColumnDescriptor::Time(view_descr.clone())),
423                        )
424                }
425
426                ColumnSelector::Component(selected_column) => view_contents
427                    .iter()
428                    .enumerate()
429                    .filter_map(|(idx, view_column)| {
430                        if let ColumnDescriptor::Component(view_descr) = view_column {
431                            Some((idx, view_descr))
432                        } else {
433                            None
434                        }
435                    })
436                    .find(|(_idx, view_descr)| view_descr.matches(selected_column))
437                    .map_or_else(
438                        || {
439                            (
440                                usize::MAX,
441                                ColumnDescriptor::Component(ComponentColumnDescriptor {
442                                    entity_path: selected_column.entity_path.clone(),
443                                    archetype: None,
444                                    component: selected_column.component.as_str().into(),
445                                    component_type: None,
446                                    store_datatype: ArrowDataType::Null,
447                                    is_static: false,
448                                    is_tombstone: false,
449                                    is_semantically_empty: false,
450                                }),
451                            )
452                        },
453                        |(idx, view_descr)| (idx, ColumnDescriptor::Component(view_descr.clone())),
454                    ),
455            })
456            .collect_vec()
457    }
458
459    fn fetch_view_chunks(
460        &self,
461        store: &ChunkStore,
462        cache: &QueryCache,
463        query: &RangeQuery,
464        view_contents: &[ColumnDescriptor],
465    ) -> (Option<usize>, Vec<Vec<(AtomicU64, Chunk)>>) {
466        let mut view_pov_chunks_idx = self.query.filtered_is_not_null.as_ref().map(|_| usize::MAX);
467
468        let view_chunks = view_contents
469            .iter()
470            .enumerate()
471            .map(|(idx, selected_column)| match selected_column {
472                ColumnDescriptor::RowId(_) | ColumnDescriptor::Time(_) => Vec::new(),
473
474                ColumnDescriptor::Component(column) => {
475                    let chunks = self
476                        .fetch_chunks(store, cache, query, &column.entity_path, [column.component])
477                        .unwrap_or_default();
478
479                    if let Some(pov) = self.query.filtered_is_not_null.as_ref()
480                        && column.matches(pov)
481                    {
482                        view_pov_chunks_idx = Some(idx);
483                    }
484
485                    chunks
486                }
487            })
488            .collect();
489
490        (view_pov_chunks_idx, view_chunks)
491    }
492
493    /// Returns all potentially relevant clear [`Chunk`]s for each unique entity path in the view contents.
494    ///
495    /// These chunks take recursive clear semantics into account and are guaranteed to be properly densified.
496    /// The component data is stripped out, only the indices are left.
497    fn fetch_clear_chunks(
498        &self,
499        store: &ChunkStore,
500        cache: &QueryCache,
501        query: &RangeQuery,
502        view_contents: &[ColumnDescriptor],
503    ) -> IntMap<EntityPath, Vec<Chunk>> {
504        /// Returns all the ancestors of an [`EntityPath`].
505        ///
506        /// Doesn't return `entity_path` itself.
507        fn entity_path_ancestors(
508            entity_path: &EntityPath,
509        ) -> impl Iterator<Item = EntityPath> + use<> {
510            std::iter::from_fn({
511                let mut entity_path = entity_path.parent();
512                move || {
513                    let yielded = entity_path.clone()?;
514                    entity_path = yielded.parent();
515                    Some(yielded)
516                }
517            })
518        }
519
520        /// Given a [`Chunk`] containing a [`ClearIsRecursive`] column, returns a filtered version
521        /// of that chunk where only rows with `ClearIsRecursive=true` are left.
522        ///
523        /// Returns `None` if the chunk either doesn't contain a `ClearIsRecursive` column or if
524        /// the end result is an empty chunk.
525        fn chunk_filter_recursive_only(chunk: &Chunk) -> Option<Chunk> {
526            let list_array = chunk
527                .components()
528                .get_array(archetypes::Clear::descriptor_is_recursive().component)?;
529
530            let values = list_array
531                .values()
532                .downcast_array_ref::<ArrowBooleanArray>()?;
533
534            let indices = ArrowPrimitiveArray::from(
535                values
536                    .iter()
537                    .enumerate()
538                    .filter_map(|(index, is_recursive)| {
539                        // can't fail - we're iterating over a 32-bit container
540                        #[expect(clippy::cast_possible_wrap)]
541                        (is_recursive == Some(true)).then_some(index as i32)
542                    })
543                    .collect_vec(),
544            );
545
546            let chunk = chunk.taken(&indices);
547
548            (!chunk.is_empty()).then_some(chunk)
549        }
550
551        let components = [archetypes::Clear::descriptor_is_recursive().component];
552
553        // All unique entity paths present in the view contents.
554        let entity_paths: IntSet<EntityPath> = view_contents
555            .iter()
556            .filter_map(|col| col.entity_path().cloned())
557            .collect();
558
559        entity_paths
560            .iter()
561            .filter_map(|entity_path| {
562                // For the entity itself, any chunk that contains clear data is relevant, recursive or not.
563                // Just fetch everything we find.
564                let flat_chunks = self
565                    .fetch_chunks(store, cache, query, entity_path, components)
566                    .map(|chunks| {
567                        chunks
568                            .into_iter()
569                            .map(|(_cursor, chunk)| chunk)
570                            .collect_vec()
571                    })
572                    .unwrap_or_default();
573
574                let recursive_chunks =
575                    entity_path_ancestors(entity_path).flat_map(|ancestor_path| {
576                        self.fetch_chunks(store, cache, query, &ancestor_path, components)
577                            .into_iter() // option
578                            .flat_map(|chunks| chunks.into_iter().map(|(_cursor, chunk)| chunk))
579                            // NOTE: Ancestors' chunks are only relevant for the rows where `ClearIsRecursive=true`.
580                            .filter_map(|chunk| chunk_filter_recursive_only(&chunk))
581                    });
582
583                let chunks = flat_chunks
584                    .into_iter()
585                    .chain(recursive_chunks)
586                    // The component data is irrelevant.
587                    // We do not expose the actual tombstones to end-users, only their _effect_.
588                    .map(|chunk| chunk.components_removed())
589                    .collect_vec();
590
591                (!chunks.is_empty()).then(|| (entity_path.clone(), chunks))
592            })
593            .collect()
594    }
595
596    fn fetch_chunks(
597        &self,
598        _store: &ChunkStore,
599        cache: &QueryCache,
600        query: &RangeQuery,
601        entity_path: &EntityPath,
602        components: impl IntoIterator<Item = ComponentIdentifier>,
603    ) -> Option<Vec<(AtomicU64, Chunk)>> {
604        // NOTE: Keep in mind that the range APIs natively make sure that we will
605        // either get a bunch of relevant _static_ chunks, or a bunch of relevant
606        // _temporal_ chunks, but never both.
607        //
608        // TODO(cmc): Going through the cache is very useful in a Viewer context, but
609        // not so much in an SDK context. Make it configurable.
610        let results = cache.range(query, entity_path, components);
611
612        debug_assert!(
613            results.components.len() <= 1,
614            "cannot possibly get more than one component with this query"
615        );
616
617        results
618            .components
619            .into_iter()
620            .next()
621            .map(|(_component_descr, chunks)| {
622                chunks
623                    .into_iter()
624                    .map(|chunk| {
625                        // NOTE: Keep in mind that the range APIs would have already taken care
626                        // of A) sorting the chunk on the `filtered_index` (and row-id) and
627                        // B) densifying it according to the current `component_type`.
628                        // Both of these are mandatory requirements for the deduplication logic to
629                        // do what we want: keep the latest known value for `component_type` at all
630                        // remaining unique index values all while taking row-id ordering semantics
631                        // into account.
632                        debug_assert!(
633                            if let Some(index) = self.query.filtered_index.as_ref() {
634                                chunk.is_timeline_sorted(index)
635                            } else {
636                                chunk.is_sorted()
637                            },
638                            "the query cache should have already taken care of sorting (and densifying!) the chunk",
639                        );
640
641                        // TODO(cmc): That'd be more elegant, but right now there is no way to
642                        // avoid allocations and copies when using Arrow's `ListArray`.
643                        //
644                        // let chunk = chunk.deduped_latest_on_index(&query.timeline);
645
646                        (AtomicU64::default(), chunk)
647                    })
648                    .collect_vec()
649            })
650    }
651
652    /// The query used to instantiate this handle.
653    #[inline]
654    pub fn query(&self) -> &QueryExpression {
655        &self.query
656    }
657
658    /// Describes the columns that make up this view.
659    ///
660    /// See [`QueryExpression::view_contents`].
661    #[inline]
662    pub fn view_contents(&self) -> &ChunkColumnDescriptors {
663        &self.init().view_contents
664    }
665
666    /// Describes the columns that make up this selection.
667    ///
668    /// The extra `usize` is the index in [`Self::view_contents`] that this selection points to.
669    ///
670    /// See [`QueryExpression::selection`].
671    #[inline]
672    pub fn selected_contents(&self) -> &[(usize, ColumnDescriptor)] {
673        &self.init().selected_contents
674    }
675
676    /// All results returned by this handle will strictly follow this Arrow schema.
677    ///
678    /// Columns that do not yield any data will still be present in the results, filled with null values.
679    #[inline]
680    pub fn schema(&self) -> &ArrowSchemaRef {
681        &self.init().arrow_schema
682    }
683
684    /// Advance all internal cursors so that the next row yielded will correspond to `row_idx`.
685    ///
686    /// Does nothing if `row_idx` is out of bounds.
687    ///
688    /// ## Concurrency
689    ///
690    /// Cursors are implemented using atomic variables, which means calling any of the `seek_*`
691    /// while iteration is concurrently ongoing is memory-safe but logically undefined racy
692    /// behavior. Be careful.
693    ///
694    /// ## Performance
695    ///
696    /// This requires going through every chunk once, and for each chunk running a binary search if
697    /// the chunk's time range contains the `index_value`.
698    ///
699    /// I.e.: it's pretty cheap already.
700    #[tracing::instrument(level = "trace", skip_all)]
701    #[inline]
702    pub fn seek_to_row(&self, row_idx: usize) {
703        let state = self.init();
704
705        let Some(index_value) = state.unique_index_values.get(row_idx) else {
706            return;
707        };
708
709        state.cur_row.store(row_idx as _, Ordering::Relaxed);
710        self.seek_to_index_value(*index_value);
711    }
712
713    /// Advance all internal cursors so that the next row yielded will correspond to `index_value`.
714    ///
715    /// If `index_value` isn't present in the dataset, this seeks to the first index value
716    /// available past that point, if any.
717    ///
718    /// ## Concurrency
719    ///
720    /// Cursors are implemented using atomic variables, which means calling any of the `seek_*`
721    /// while iteration is concurrently ongoing is memory-safe but logically undefined racy
722    /// behavior. Be careful.
723    ///
724    /// ## Performance
725    ///
726    /// This requires going through every chunk once, and for each chunk running a binary search if
727    /// the chunk's time range contains the `index_value`.
728    ///
729    /// I.e.: it's pretty cheap already.
730    #[tracing::instrument(level = "debug", skip_all)]
731    fn seek_to_index_value(&self, index_value: IndexValue) {
732        re_tracing::profile_function!();
733
734        let state = self.init();
735
736        if index_value.is_static() {
737            for chunks in &state.view_chunks {
738                for (cursor, _chunk) in chunks {
739                    cursor.store(0, Ordering::Relaxed);
740                }
741            }
742            return;
743        }
744
745        for chunks in &state.view_chunks {
746            for (cursor, chunk) in chunks {
747                // NOTE: The chunk has been densified already: its global time range is the same as
748                // the time range for the specific component of interest.
749                let Some(time_column) = chunk.timelines().get(&state.filtered_index) else {
750                    continue;
751                };
752
753                let time_range = time_column.time_range();
754
755                let new_cursor = if index_value < time_range.min() {
756                    0
757                } else if index_value > time_range.max() {
758                    chunk.num_rows() as u64 /* yes, one past the end -- not a mistake */
759                } else {
760                    time_column
761                        .times_raw()
762                        .partition_point(|&time| time < index_value.as_i64())
763                        as u64
764                };
765
766                cursor.store(new_cursor, Ordering::Relaxed);
767            }
768        }
769    }
770
771    /// How many rows of data will be returned?
772    ///
773    /// The number of rows depends and only depends on the _view contents_.
774    /// The _selected contents_ has no influence on this value.
775    pub fn num_rows(&self) -> u64 {
776        self.init().unique_index_values.len() as _
777    }
778
779    /// Returns the row index of the last row whose index value is <= the given time,
780    /// or `None` if no such row exists.
781    pub fn row_index_at_or_before_time(&self, time: TimeInt) -> Option<u64> {
782        let state = self.init();
783        let idx = state.unique_index_values.partition_point(|t| *t <= time);
784        if idx == 0 {
785            None
786        } else {
787            Some((idx - 1) as u64)
788        }
789    }
790
791    /// Returns the next row's worth of data.
792    ///
793    /// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
794    /// Columns that do not yield any data will still be present in the results, filled with null values.
795    ///
796    /// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
797    /// the index, for each respective `ColumnDescriptor`.
798    /// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
799    ///
800    /// Example:
801    /// ```ignore
802    /// while let Some(row) = query_handle.next_row() {
803    ///     // …
804    /// }
805    /// ```
806    ///
807    /// ## Pagination
808    ///
809    /// Use [`Self::seek_to_row`]:
810    /// ```ignore
811    /// query_handle.seek_to_row(42);
812    /// for row in query_handle.into_iter().take(len) {
813    ///     // …
814    /// }
815    /// ```
816    #[inline]
817    pub fn next_row(&self) -> Option<Vec<ArrayRef>> {
818        self.engine
819            .with(|store, cache| self._next_row(store, cache))
820    }
821
    /// Asynchronously returns the next row's worth of data.
    ///
    /// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
    /// Columns that do not yield any data will still be present in the results, filled with null values.
    ///
    /// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
    /// the index, for each respective `ColumnDescriptor`.
    /// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
    ///
    /// Example:
    /// ```ignore
    /// while let Some(row) = query_handle.next_row_async().await {
    ///     // …
    /// }
    /// ```
    ///
    /// ## Implementation notes
    ///
    /// The row is computed eagerly, at call time, through a non-blocking `try_with` on the
    /// engine. The returned future does not borrow `self` (see the `use<E>` bound): it only
    /// owns that precomputed result and a clone of the engine.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn next_row_async(
        &self,
    ) -> impl std::future::Future<Output = Option<Vec<ArrayRef>>> + use<E>
    where
        E: 'static + Send + Clone,
    {
        // Outer `Option`: did we manage to access the engine at all?
        // Inner `Option`: the row itself (`None` once the rows are exhausted).
        let res: Option<Option<_>> = self
            .engine
            .try_with(|store, cache| self._next_row(store, cache));

        let engine = self.engine.clone();
        std::future::poll_fn(move |cx| {
            if let Some(row) = &res {
                std::task::Poll::Ready(row.clone())
            } else {
                // The lock is already held by a writer, we have to yield control back to the async
                // runtime, for now.
                // Before we do so, we need to schedule a callback that will be in charge of waking up
                // the async task once we can possibly make progress once again.

                // NOTE(review): `res` is computed once, before the future is built, and is never
                // refreshed from within this closure. If the initial `try_with` failed, every
                // subsequent poll appears to end up in this branch again (spawning another
                // waker task and returning `Pending`) -- confirm whether callers are expected
                // to handle this, e.g. by dropping the future and calling again.

                // Commenting out this code should make the `async_barebones` test deadlock.
                rayon::spawn({
                    let engine = engine.clone();
                    let waker = cx.waker().clone();
                    move || {
                        // Blocking `with`: the wake-up only happens once the engine could be
                        // accessed from this background task.
                        engine.with(|_store, _cache| {
                            // This is of course optimistic -- we might end up right back here on
                            // next tick. That's fine.
                            waker.wake();
                        });
                    }
                });

                std::task::Poll::Pending
            }
        })
    }
875
    /// Computes the next row's worth of data, using the given `store` and `cache`.
    ///
    /// This is what [`Self::next_row`] and [`Self::next_row_async`] run from within the
    /// engine's `with`/`try_with` callbacks.
    ///
    /// Returns `None` once the row counter has moved past the last unique index value.
    /// Otherwise returns one array per selected column, each of them of length 1.
    ///
    /// Overview:
    /// 1. Claim the next row index from the shared `cur_row` counter, and look up the
    ///    corresponding index value.
    /// 2. For every view column, advance the cursors of its chunks up to that index value, and
    ///    keep the matching chunk with the greatest `RowId`.
    /// 3. Override the above with static values, where there are any.
    /// 4. With [`SparseFillStrategy::LatestAtGlobal`], fill the remaining gaps with latest-at
    ///    queries against `cache`.
    /// 5. Compute, for every timeline, the maximum time value across the cells found.
    /// 6. Slice the relevant cells and assemble one array per selected column, falling back to
    ///    null arrays where there's no data.
    #[tracing::instrument(level = "debug", skip_all)]
    pub fn _next_row(&self, store: &ChunkStore, cache: &QueryCache) -> Option<Vec<ArrowArrayRef>> {
        re_tracing::profile_function!();

        /// Temporary state used to resolve the streaming join for the current iteration.
        #[derive(Debug)]
        struct StreamingJoinStateEntry<'a> {
            /// Which `Chunk` is this?
            chunk: &'a Chunk,

            /// How far are we into this `Chunk`?
            cursor: u64,

            /// What's the `RowId` at the current cursor?
            row_id: RowId,
        }

        /// Temporary state used to resolve the streaming join for the current iteration.
        ///
        /// Possibly retrofilled, see [`QueryExpression::sparse_fill_strategy`].
        #[derive(Debug)]
        enum StreamingJoinState<'a> {
            /// Incoming data for the current iteration.
            StreamingJoinState(StreamingJoinStateEntry<'a>),

            /// Data retrofilled through an extra query.
            ///
            /// See [`QueryExpression::sparse_fill_strategy`].
            Retrofilled(UnitChunkShared),
        }

        // Although that's a synchronous lock, we probably don't need to worry about it until
        // there is proof to the contrary: we are in a specific `QueryHandle` after all, there's
        // really no good reason to be contending here in the first place.
        let state = self.state.get_or_init(move || self.init_(store, cache));

        // Claim the next row. Note that the counter is bumped unconditionally, i.e. even when
        // we are already past the last row and are about to return `None`.
        let row_idx = state.cur_row.fetch_add(1, Ordering::Relaxed);
        let cur_index_value = state.unique_index_values.get(row_idx as usize)?;

        // First, we need to find, among all the chunks available for the current view contents,
        // what is their index value for the current row?
        //
        // NOTE: Non-component columns don't have a streaming state, hence the optional layer.
        let mut view_streaming_state: Vec<Option<StreamingJoinStateEntry<'_>>> =
            // NOTE: cannot use vec![], it has limitations with non-cloneable options.
            // vec![None; state.view_chunks.len()];
            std::iter::repeat(())
                .map(|_| None)
                .take(state.view_chunks.len())
                .collect();
        for (view_column_idx, view_chunks) in state.view_chunks.iter().enumerate() {
            let streaming_state = &mut view_streaming_state[view_column_idx];

            'overlaps: for (cur_cursor, cur_chunk) in view_chunks {
                // TODO(cmc): This can easily be optimized by looking ahead and breaking as soon as chunks
                // stop overlapping.

                // NOTE: Too soon to increment the cursor, we cannot know yet which chunks will or
                // will not be part of the current row.
                let mut cur_cursor_value = cur_cursor.load(Ordering::Relaxed);

                // Chunks without a column for the filtered index are treated as having no
                // times at all, which makes the walk below skip them right away.
                let cur_index_times_empty: &[i64] = &[];
                let cur_index_times = cur_chunk
                    .timelines()
                    .get(&state.filtered_index)
                    .map_or(cur_index_times_empty, |time_column| time_column.times_raw());
                let cur_index_row_ids = cur_chunk.row_ids_slice();

                // Walk the chunk forward, starting at its cursor, until we either:
                // * find the current index value (=> this chunk contributes to the row),
                // * overshoot it or run out of rows (=> move on to the next chunk).
                let (index_value, cur_row_id) = 'walk: loop {
                    let (Some(mut index_value), Some(mut cur_row_id)) = (
                        cur_index_times
                            .get(cur_cursor_value as usize)
                            .copied()
                            .map(TimeInt::new_temporal),
                        cur_index_row_ids.get(cur_cursor_value as usize).copied(),
                    ) else {
                        continue 'overlaps;
                    };

                    if index_value == *cur_index_value {
                        // TODO(cmc): Because of Arrow's `ListArray` limitations, we inline the
                        // "deduped_latest_on_index" logic here directly, which prevents a lot of
                        // unnecessary allocations and copies.
                        //
                        // I.e.: as long as the following row shares the same index value, keep
                        // moving forward, so that we end up on the last row of that run.
                        while let (Some(next_index_value), Some(next_row_id)) = (
                            cur_index_times
                                .get(cur_cursor_value as usize + 1)
                                .copied()
                                .map(TimeInt::new_temporal),
                            cur_index_row_ids
                                .get(cur_cursor_value as usize + 1)
                                .copied(),
                        ) {
                            if next_index_value == *cur_index_value {
                                index_value = next_index_value;
                                cur_row_id = next_row_id;
                                cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
                            } else {
                                break;
                            }
                        }

                        break 'walk (index_value, cur_row_id);
                    }

                    // This chunk is already past the current index value: it has nothing to
                    // contribute to this row. Its cursor is left where it is.
                    if index_value > *cur_index_value {
                        continue 'overlaps;
                    }

                    // Still behind the current index value: skip this row for good.
                    cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
                };

                debug_assert_eq!(index_value, *cur_index_value);

                if let Some(streaming_state) = streaming_state.as_mut() {
                    let StreamingJoinStateEntry {
                        chunk,
                        cursor,
                        row_id,
                    } = streaming_state;

                    // Several chunks have data at the current index value: the one with the
                    // greatest `RowId` wins.
                    if cur_row_id > *row_id {
                        *chunk = cur_chunk;
                        *cursor = cur_cursor_value;
                        *row_id = cur_row_id;
                    }
                } else {
                    *streaming_state = Some(StreamingJoinStateEntry {
                        chunk: cur_chunk,
                        cursor: cur_cursor_value,
                        row_id: cur_row_id,
                    });
                }
            }
        }

        let mut view_streaming_state = view_streaming_state
            .into_iter()
            .map(|streaming_state| streaming_state.map(StreamingJoinState::StreamingJoinState))
            .collect_vec();

        // Static always wins, no matter what.
        for (selected_idx, static_state) in state.selected_static_values.iter().enumerate() {
            if let static_state @ Some(_) =
                static_state.clone().map(StreamingJoinState::Retrofilled)
            {
                // Static values are indexed by selected column, while the streaming state is
                // indexed by view column: map one to the other.
                let Some(view_idx) = state
                    .selected_contents
                    .get(selected_idx)
                    .map(|(view_idx, _)| *view_idx)
                else {
                    debug_panic!("selected_idx out of bounds");
                    continue;
                };

                let Some(streaming_state) = view_streaming_state.get_mut(view_idx) else {
                    debug_panic!("view_idx out of bounds");
                    continue;
                };

                *streaming_state = static_state;
            }
        }

        match self.query.sparse_fill_strategy {
            SparseFillStrategy::None => {}

            SparseFillStrategy::LatestAtGlobal => {
                // Everything that yielded `null` for the current iteration.
                let null_streaming_states = view_streaming_state
                    .iter_mut()
                    .enumerate()
                    .filter(|(_view_idx, streaming_state)| streaming_state.is_none());

                for (view_idx, streaming_state) in null_streaming_states {
                    let Some(ColumnDescriptor::Component(descr)) =
                        state.view_contents.get_index_or_component(view_idx)
                    else {
                        continue;
                    };

                    // NOTE: While it would be very tempting to resolve the latest-at state
                    // of the entire view contents at `filtered_index_range.start - 1` once
                    // during `QueryHandle` initialization, and then bootstrap off of that, that
                    // would effectively close the door to efficient pagination forever, since
                    // we'd have to iterate over all the pages to compute the right latest-at
                    // value at t+n (i.e. no more random access).
                    // Therefore, it is better to simply do this the "dumb" way.
                    //
                    // TODO(cmc): Still, as always, this can be made faster and smarter at
                    // the cost of some extra complexity (e.g. caching the result across
                    // consecutive nulls etc). Later.

                    let query =
                        re_chunk::LatestAtQuery::new(state.filtered_index, *cur_index_value);

                    // NOTE(review): the `entity_path.clone()` below is only ever borrowed, it
                    // looks like `&descr.entity_path` would do -- confirm and simplify.
                    let results =
                        cache.latest_at(&query, &descr.entity_path.clone(), [descr.component]);

                    *streaming_state = results
                        .components
                        .into_values()
                        .next()
                        .map(|unit| StreamingJoinState::Retrofilled(unit.clone()));
                }
            }
        }

        // We are stitching a bunch of unrelated cells together in order to create the final row
        // that is being returned.
        //
        // For this reason, we can only guarantee that the index being explicitly queried for
        // (`QueryExpression::filtered_index`) will match for all these cells.
        //
        // When it comes to other indices that the caller might have asked for, it is possible that
        // these different cells won't share the same values (e.g. two cells were found at
        // `log_time=100`, but one of them has `frame=3` and the other `frame=5`, for whatever
        // reason).
        // In order to deal with this, we keep track of the maximum value for every possible index
        // within the returned set of cells, and return that.
        //
        // TODO(cmc): In the future, it would be nice to make that either configurable, e.g.:
        // * return the minimum value instead of the max
        // * return the exact value for each component (that would be a _lot_ of columns!)
        // * etc
        let mut max_value_per_index: IntMap<TimelineName, (TimeInt, ArrowScalarBuffer<i64>)> =
            IntMap::default();
        {
            view_streaming_state
                .iter()
                .flatten()
                .flat_map(|streaming_state| {
                    match streaming_state {
                        StreamingJoinState::StreamingJoinState(s) => s.chunk.timelines(),
                        StreamingJoinState::Retrofilled(unit) => unit.timelines(),
                    }
                    .values()
                    // NOTE: Cannot fail, just want to stay away from unwraps.
                    .filter_map(move |time_column| {
                        // Retrofilled data always reads its one value at position 0.
                        let cursor = match streaming_state {
                            StreamingJoinState::StreamingJoinState(s) => s.cursor as usize,
                            StreamingJoinState::Retrofilled(_) => 0,
                        };
                        time_column
                            .times_raw()
                            .get(cursor)
                            .copied()
                            .map(TimeInt::new_temporal)
                            .map(|time| {
                                (
                                    *time_column.timeline(),
                                    (time, time_column.times_buffer().slice(cursor, 1)),
                                )
                            })
                    })
                })
                .for_each(|(timeline, (time, time_sliced))| {
                    max_value_per_index
                        .entry(*timeline.name())
                        .and_modify(|(max_time, max_time_sliced)| {
                            if time > *max_time {
                                *max_time = time;
                                *max_time_sliced = time_sliced.clone();
                            }
                        })
                        .or_insert((time, time_sliced));
                });

            if !cur_index_value.is_static() {
                // The current index value (if temporal) should be the one returned for the
                // queried index, no matter what.
                max_value_per_index.insert(
                    state.filtered_index,
                    (
                        *cur_index_value,
                        ArrowScalarBuffer::from(vec![cur_index_value.as_i64()]),
                    ),
                );
            }
        }

        // NOTE: Non-component entries have no data to slice, hence the optional layer.
        //
        // TODO(cmc): no point in slicing arrays that are not selected.
        let view_sliced_arrays: Vec<Option<_>> = view_streaming_state
            .iter()
            .enumerate()
            .map(|(view_idx, streaming_state)| {
                // NOTE: Reminder: the only reason the streaming state could be `None` here is
                // because this column does not have data for the current index value (i.e. `null`).
                let streaming_state = streaming_state.as_ref()?;
                let list_array = match streaming_state {
                    StreamingJoinState::StreamingJoinState(s) => {
                        debug_assert!(
                            s.chunk.components().iter().count() <= 1,
                            "cannot possibly get more than one component with this query"
                        );

                        // Grab the single cell sitting at the cursor.
                        s.chunk
                            .components()
                            .list_arrays()
                            .next()
                            .map(|list_array| list_array.slice(s.cursor as usize, 1))

                    }

                    StreamingJoinState::Retrofilled(unit) => {
                        // Rebuild the component descriptor of this view column, so that we know
                        // which component to fetch from the retrofilled unit chunk.
                        let component_desc = state.view_contents.get_index_or_component(view_idx).and_then(|col| if let ColumnDescriptor::Component(descr) = col {
                            if let Some(component_type) = descr.component_type  { component_type.sanity_check(); }
                            Some(re_types_core::ComponentDescriptor {
                                component_type: descr.component_type,
                                archetype: descr.archetype,
                                component: descr.component,
                            })
                        } else {
                            None
                        })?;
                        unit.components().get_array(component_desc.component).cloned()
                    }
                };


                debug_assert!(
                    list_array.is_some(),
                    "This must exist or the chunk wouldn't have been sliced/retrofilled to start with."
                );

                // NOTE: This cannot possibly return None, see assert above.
                list_array
            })
            .collect();

        // TODO(cmc): It would likely be worth it to allocate all these possible
        // null-arrays ahead of time, and just return a pointer to those in the failure
        // case here.
        let selected_arrays = state
            .selected_contents
            .iter()
            .map(|(view_idx, column)| match column {
                // NOTE(review): this reads the current cursor of the very first chunk of the
                // very first view column. That cursor may presumably sit one past the end of
                // the chunk (see `seek_to_index_value`), in which case the slice below would
                // be out of bounds -- confirm, see also the TODO below.
                ColumnDescriptor::RowId(_) => state
                    .view_chunks
                    .first()
                    .and_then(|vec| vec.first()) // TODO(#9922): verify that using the row:ids from the first chunk always makes sense
                    .map(|(row_idx, chunk)| {
                        as_array_ref(
                            chunk
                                .row_ids_array()
                                .slice(row_idx.load(Ordering::Acquire) as _, 1),
                        )
                    })
                    .unwrap_or_else(|| arrow::array::new_null_array(&RowId::arrow_datatype(), 1)),

                // Time columns yield the maximum value computed above, or null if none of the
                // cells of this row had a value for that timeline.
                ColumnDescriptor::Time(descr) => max_value_per_index
                    .get(descr.timeline().name())
                    .map_or_else(
                        || arrow::array::new_null_array(&column.arrow_datatype(), 1),
                        |(_time, time_sliced)| {
                            descr.timeline().typ().make_arrow_array(time_sliced.clone())
                        },
                    ),

                // Component columns yield the cell sliced above, or null if there is none.
                ColumnDescriptor::Component(_descr) => view_sliced_arrays
                    .get(*view_idx)
                    .cloned()
                    .flatten()
                    .map(into_arrow_ref)
                    .unwrap_or_else(|| arrow::array::new_null_array(&column.arrow_datatype(), 1)),
            })
            .collect_vec();

        debug_assert_eq!(state.arrow_schema.fields.len(), selected_arrays.len());

        Some(selected_arrays)
    }
1249
1250    /// Calls [`Self::next_row`] and wraps the result in a [`ArrowRecordBatch`].
1251    ///
1252    /// Only use this if you absolutely need a [`ArrowRecordBatch`] as this adds a
1253    /// some overhead for schema validation.
1254    ///
1255    /// See [`Self::next_row`] for more information.
1256    #[inline]
1257    pub fn next_row_batch(&self) -> Option<ArrowRecordBatch> {
1258        let row = self.next_row()?;
1259        match ArrowRecordBatch::try_new_with_options(
1260            self.schema().clone(),
1261            row,
1262            // Explicitly setting row-count to one means it works even when there are no columns (e.g. due to heavy filtering)
1263            &RecordBatchOptions::new().with_row_count(Some(1)),
1264        ) {
1265            Ok(batch) => Some(batch),
1266            Err(err) => {
1267                if cfg!(debug_assertions) {
1268                    panic!("Failed to create record batch: {err}");
1269                } else {
1270                    re_log::error_once!("Failed to create record batch: {err}");
1271                    None
1272                }
1273            }
1274        }
1275    }
1276
1277    #[inline]
1278    #[cfg(not(target_arch = "wasm32"))]
1279    pub async fn next_row_batch_async(&self) -> Option<ArrowRecordBatch>
1280    where
1281        E: 'static + Send + Clone,
1282    {
1283        let row = self.next_row_async().await?;
1284        let row_count = row.first().map(|a| a.len()).unwrap_or(0);
1285
1286        // If we managed to get a row, then the state must be initialized already.
1287        #[expect(clippy::unwrap_used)]
1288        let schema = self.state.get().unwrap().arrow_schema.clone();
1289
1290        ArrowRecordBatch::try_new_with_options(
1291            schema,
1292            row,
1293            &RecordBatchOptions::default().with_row_count(Some(row_count)),
1294        )
1295        .ok()
1296    }
1297}
1298
1299impl<E: StorageEngineLike> QueryHandle<E> {
1300    /// Returns an iterator backed by [`Self::next_row`].
1301    pub fn iter(&self) -> impl Iterator<Item = Vec<ArrowArrayRef>> + '_ {
1302        std::iter::from_fn(move || self.next_row())
1303    }
1304
1305    /// Returns an iterator backed by [`Self::next_row`].
1306    #[expect(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
1307    pub fn into_iter(self) -> impl Iterator<Item = Vec<ArrowArrayRef>> {
1308        std::iter::from_fn(move || self.next_row())
1309    }
1310
1311    /// Returns an iterator backed by [`Self::next_row_batch`].
1312    pub fn batch_iter(&self) -> impl Iterator<Item = ArrowRecordBatch> + '_ {
1313        std::iter::from_fn(move || self.next_row_batch())
1314    }
1315
1316    /// Returns an iterator backed by [`Self::next_row_batch`].
1317    pub fn into_batch_iter(self) -> impl Iterator<Item = ArrowRecordBatch> {
1318        std::iter::from_fn(move || self.next_row_batch())
1319    }
1320}
1321
1322// ---
1323
1324#[cfg(test)]
1325#[expect(clippy::iter_on_single_items)]
1326mod tests {
1327    use std::sync::Arc;
1328
1329    use arrow::array::{StringArray, UInt32Array};
1330    use arrow::compute::concat_batches;
1331    use insta::assert_snapshot;
1332    use re_arrow_util::format_record_batch;
1333    use re_chunk::{Chunk, ChunkId, ComponentIdentifier, RowId, TimePoint};
1334    use re_chunk_store::{
1335        AbsoluteTimeRange, ChunkStore, ChunkStoreConfig, ChunkStoreHandle, QueryExpression, TimeInt,
1336    };
1337    use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints};
1338    use re_log_types::{EntityPath, Timeline, build_frame_nr, build_log_time};
1339    use re_query::StorageEngine;
1340    use re_sdk_types::{AnyValues, AsComponents as _, ComponentDescriptor};
1341    use re_sorbet::ComponentColumnSelector;
1342    use re_types_core::components;
1343
1344    use super::*;
1345    use crate::{QueryCache, QueryEngine};
1346
1347    /// Implement `Display` for `ArrowRecordBatch`
1348    struct DisplayRB(ArrowRecordBatch);
1349
1350    impl std::fmt::Display for DisplayRB {
1351        #[inline]
1352        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1353            let width = 200;
1354            re_arrow_util::format_record_batch_with_width(&self.0, Some(width), f.sign_minus())
1355                .fmt(f)
1356        }
1357    }
1358
1359    // NOTE: The best way to understand what these tests are doing is to run them in verbose mode,
1360    // e.g. `cargo t -p re_dataframe -- --show-output barebones`.
1361    // Each test will print the state of the store, the query being run, and the results that were
1362    // returned in the usual human-friendly format.
1363    // From there it is generally straightforward to infer what's going on.
1364
1365    // TODO(cmc): at least one basic test for every feature in `QueryExpression`.
1366    // In no particular order:
1367    // * [x] filtered_index
1368    // * [x] filtered_index_range
1369    // * [x] filtered_index_values
1370    // * [x] view_contents
1371    // * [x] selection
1372    // * [x] filtered_is_not_null
1373    // * [x] sparse_fill_strategy
1374    // * [x] using_index_values
1375    //
1376    // In addition to those, some much needed extras:
1377    // * [x] num_rows
1378    // * [x] clears
1379    // * [ ] timelines returned with selection=none
1380    // * [x] pagination
1381
1382    // TODO(cmc): At some point I'd like to stress multi-entity queries too, but that feels less
1383    // urgent considering how things are implemented (each entity lives in its own index, so it's
1384    // really just more of the same).
1385
1386    /// All features disabled.
1387    #[test]
1388    fn barebones() -> anyhow::Result<()> {
1389        re_log::setup_logging();
1390
1391        let store = ChunkStoreHandle::new(create_nasty_store()?);
1392        eprintln!("{store}");
1393        let query_cache = QueryCache::new_handle(store.clone());
1394        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1395
1396        let filtered_index = Some(TimelineName::new("frame_nr"));
1397
1398        // static
1399        {
1400            let query = QueryExpression::default();
1401            eprintln!("{query:#?}:");
1402
1403            let query_handle = query_engine.query(query.clone());
1404            assert_eq!(
1405                query_engine.query(query.clone()).into_iter().count() as u64,
1406                query_handle.num_rows()
1407            );
1408            let dataframe = concat_batches(
1409                query_handle.schema(),
1410                &query_handle.batch_iter().collect_vec(),
1411            )?;
1412            eprintln!("{}", format_record_batch(&dataframe.clone()));
1413
1414            assert_snapshot!(DisplayRB(dataframe));
1415        }
1416
1417        // temporal
1418        {
1419            let query = QueryExpression {
1420                filtered_index,
1421                ..Default::default()
1422            };
1423            eprintln!("{query:#?}:");
1424
1425            let query_handle = query_engine.query(query.clone());
1426            assert_eq!(
1427                query_engine.query(query.clone()).into_iter().count() as u64,
1428                query_handle.num_rows()
1429            );
1430            let dataframe = concat_batches(
1431                query_handle.schema(),
1432                &query_handle.batch_iter().collect_vec(),
1433            )?;
1434            eprintln!("{}", format_record_batch(&dataframe.clone()));
1435
1436            assert_snapshot!(DisplayRB(dataframe));
1437        }
1438
1439        Ok(())
1440    }
1441
1442    #[test]
1443    fn sparse_fill_strategy_latestatglobal() -> anyhow::Result<()> {
1444        re_log::setup_logging();
1445
1446        let store = ChunkStoreHandle::new(create_nasty_store()?);
1447        eprintln!("{store}");
1448        let query_cache = QueryCache::new_handle(store.clone());
1449        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1450
1451        let filtered_index = Some(TimelineName::new("frame_nr"));
1452        let query = QueryExpression {
1453            filtered_index,
1454            sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
1455            ..Default::default()
1456        };
1457        eprintln!("{query:#?}:");
1458
1459        let query_handle = query_engine.query(query.clone());
1460        assert_eq!(
1461            query_engine.query(query.clone()).into_iter().count() as u64,
1462            query_handle.num_rows()
1463        );
1464        let dataframe = concat_batches(
1465            query_handle.schema(),
1466            &query_handle.batch_iter().collect_vec(),
1467        )?;
1468        eprintln!("{}", format_record_batch(&dataframe.clone()));
1469
1470        assert_snapshot!(DisplayRB(dataframe));
1471
1472        Ok(())
1473    }
1474
1475    #[test]
1476    fn filtered_index_range() -> anyhow::Result<()> {
1477        re_log::setup_logging();
1478
1479        let store = ChunkStoreHandle::new(create_nasty_store()?);
1480        eprintln!("{store}");
1481        let query_cache = QueryCache::new_handle(store.clone());
1482        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1483
1484        let filtered_index = Some(TimelineName::new("frame_nr"));
1485        let query = QueryExpression {
1486            filtered_index,
1487            filtered_index_range: Some(AbsoluteTimeRange::new(30, 60)),
1488            ..Default::default()
1489        };
1490        eprintln!("{query:#?}:");
1491
1492        let query_handle = query_engine.query(query.clone());
1493        assert_eq!(
1494            query_engine.query(query.clone()).into_iter().count() as u64,
1495            query_handle.num_rows()
1496        );
1497        let dataframe = concat_batches(
1498            query_handle.schema(),
1499            &query_handle.batch_iter().collect_vec(),
1500        )?;
1501        eprintln!("{}", format_record_batch(&dataframe.clone()));
1502
1503        assert_snapshot!(DisplayRB(dataframe));
1504
1505        Ok(())
1506    }
1507
1508    #[test]
1509    fn filtered_index_values() -> anyhow::Result<()> {
1510        re_log::setup_logging();
1511
1512        let store = ChunkStoreHandle::new(create_nasty_store()?);
1513        eprintln!("{store}");
1514        let query_cache = QueryCache::new_handle(store.clone());
1515        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1516
1517        let filtered_index = Some(TimelineName::new("frame_nr"));
1518        let query = QueryExpression {
1519            filtered_index,
1520            filtered_index_values: Some(
1521                [0, 30, 60, 90]
1522                    .into_iter()
1523                    .map(TimeInt::new_temporal)
1524                    .chain(std::iter::once(TimeInt::STATIC))
1525                    .collect(),
1526            ),
1527            ..Default::default()
1528        };
1529        eprintln!("{query:#?}:");
1530
1531        let query_handle = query_engine.query(query.clone());
1532        assert_eq!(
1533            query_engine.query(query.clone()).into_iter().count() as u64,
1534            query_handle.num_rows()
1535        );
1536        let dataframe = concat_batches(
1537            query_handle.schema(),
1538            &query_handle.batch_iter().collect_vec(),
1539        )?;
1540        eprintln!("{}", format_record_batch(&dataframe.clone()));
1541
1542        assert_snapshot!(DisplayRB(dataframe));
1543
1544        Ok(())
1545    }
1546
1547    #[test]
1548    fn using_index_values() -> anyhow::Result<()> {
1549        re_log::setup_logging();
1550
1551        let store = ChunkStoreHandle::new(create_nasty_store()?);
1552        eprintln!("{store}");
1553        let query_cache = QueryCache::new_handle(store.clone());
1554        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1555
1556        let filtered_index = Some(TimelineName::new("frame_nr"));
1557
1558        // vanilla
1559        {
1560            let query = QueryExpression {
1561                filtered_index,
1562                using_index_values: Some(
1563                    [0, 15, 30, 30, 45, 60, 75, 90]
1564                        .into_iter()
1565                        .map(TimeInt::new_temporal)
1566                        .chain(std::iter::once(TimeInt::STATIC))
1567                        .collect(),
1568                ),
1569                ..Default::default()
1570            };
1571            eprintln!("{query:#?}:");
1572
1573            let query_handle = query_engine.query(query.clone());
1574            assert_eq!(
1575                query_engine.query(query.clone()).into_iter().count() as u64,
1576                query_handle.num_rows()
1577            );
1578            let dataframe = concat_batches(
1579                query_handle.schema(),
1580                &query_handle.batch_iter().collect_vec(),
1581            )?;
1582            eprintln!("{}", format_record_batch(&dataframe.clone()));
1583
1584            assert_snapshot!(DisplayRB(dataframe));
1585        }
1586
1587        // sparse-filled
1588        {
1589            let query = QueryExpression {
1590                filtered_index,
1591                using_index_values: Some(
1592                    [0, 15, 30, 30, 45, 60, 75, 90]
1593                        .into_iter()
1594                        .map(TimeInt::new_temporal)
1595                        .chain(std::iter::once(TimeInt::STATIC))
1596                        .collect(),
1597                ),
1598                sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
1599                ..Default::default()
1600            };
1601            eprintln!("{query:#?}:");
1602
1603            let query_handle = query_engine.query(query.clone());
1604            assert_eq!(
1605                query_engine.query(query.clone()).into_iter().count() as u64,
1606                query_handle.num_rows()
1607            );
1608            let dataframe = concat_batches(
1609                query_handle.schema(),
1610                &query_handle.batch_iter().collect_vec(),
1611            )?;
1612            eprintln!("{}", format_record_batch(&dataframe.clone()));
1613
1614            assert_snapshot!(DisplayRB(dataframe));
1615        }
1616
1617        Ok(())
1618    }
1619
1620    #[test]
1621    fn filtered_is_not_null() -> anyhow::Result<()> {
1622        re_log::setup_logging();
1623
1624        let store = ChunkStoreHandle::new(create_nasty_store()?);
1625        eprintln!("{store}");
1626        let query_cache = QueryCache::new_handle(store.clone());
1627        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1628
1629        let filtered_index = Some(TimelineName::new("frame_nr"));
1630        let entity_path: EntityPath = "this/that".into();
1631
1632        // non-existing entity
1633        {
1634            let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();
1635
1636            let query = QueryExpression {
1637                filtered_index,
1638                filtered_is_not_null: Some(ComponentColumnSelector {
1639                    entity_path: "no/such/entity".into(),
1640                    component: component.to_string(),
1641                }),
1642                ..Default::default()
1643            };
1644            eprintln!("{query:#?}:");
1645
1646            let query_handle = query_engine.query(query.clone());
1647            assert_eq!(
1648                query_engine.query(query.clone()).into_iter().count() as u64,
1649                query_handle.num_rows()
1650            );
1651            let dataframe = concat_batches(
1652                query_handle.schema(),
1653                &query_handle.batch_iter().collect_vec(),
1654            )?;
1655            eprintln!("{}", format_record_batch(&dataframe.clone()));
1656
1657            assert_snapshot!(DisplayRB(dataframe));
1658        }
1659
1660        // non-existing component
1661        {
1662            let query = QueryExpression {
1663                filtered_index,
1664                filtered_is_not_null: Some(ComponentColumnSelector {
1665                    entity_path: entity_path.clone(),
1666                    component: "AFieldThatDoesntExist".to_owned(),
1667                }),
1668                ..Default::default()
1669            };
1670            eprintln!("{query:#?}:");
1671
1672            let query_handle = query_engine.query(query.clone());
1673            assert_eq!(
1674                query_engine.query(query.clone()).into_iter().count() as u64,
1675                query_handle.num_rows()
1676            );
1677            let dataframe = concat_batches(
1678                query_handle.schema(),
1679                &query_handle.batch_iter().collect_vec(),
1680            )?;
1681            eprintln!("{}", format_record_batch(&dataframe.clone()));
1682
1683            assert_snapshot!(DisplayRB(dataframe));
1684        }
1685
1686        // MyPoint
1687        {
1688            let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();
1689
1690            let query = QueryExpression {
1691                filtered_index,
1692                filtered_is_not_null: Some(ComponentColumnSelector {
1693                    entity_path: entity_path.clone(),
1694                    component: component.to_string(),
1695                }),
1696                ..Default::default()
1697            };
1698            eprintln!("{query:#?}:");
1699
1700            let query_handle = query_engine.query(query.clone());
1701            assert_eq!(
1702                query_engine.query(query.clone()).into_iter().count() as u64,
1703                query_handle.num_rows()
1704            );
1705            let dataframe = concat_batches(
1706                query_handle.schema(),
1707                &query_handle.batch_iter().collect_vec(),
1708            )?;
1709            eprintln!("{}", format_record_batch(&dataframe.clone()));
1710
1711            assert_snapshot!(DisplayRB(dataframe));
1712        }
1713
1714        // MyColor
1715        {
1716            let ComponentDescriptor { component, .. } = MyPoints::descriptor_colors();
1717
1718            let query = QueryExpression {
1719                filtered_index,
1720                filtered_is_not_null: Some(ComponentColumnSelector {
1721                    entity_path: entity_path.clone(),
1722                    component: component.to_string(),
1723                }),
1724                ..Default::default()
1725            };
1726            eprintln!("{query:#?}:");
1727
1728            let query_handle = query_engine.query(query.clone());
1729            assert_eq!(
1730                query_engine.query(query.clone()).into_iter().count() as u64,
1731                query_handle.num_rows()
1732            );
1733            let dataframe = concat_batches(
1734                query_handle.schema(),
1735                &query_handle.batch_iter().collect_vec(),
1736            )?;
1737            eprintln!("{}", format_record_batch(&dataframe.clone()));
1738
1739            assert_snapshot!(DisplayRB(dataframe));
1740        }
1741
1742        Ok(())
1743    }
1744
1745    #[test]
1746    fn view_contents() -> anyhow::Result<()> {
1747        re_log::setup_logging();
1748
1749        let store = ChunkStoreHandle::new(create_nasty_store()?);
1750        eprintln!("{store}");
1751        let query_cache = QueryCache::new_handle(store.clone());
1752        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1753
1754        let entity_path: EntityPath = "this/that".into();
1755        let filtered_index = Some(TimelineName::new("frame_nr"));
1756
1757        // empty view
1758        {
1759            let query = QueryExpression {
1760                filtered_index,
1761                view_contents: Some(
1762                    [(entity_path.clone(), Some(Default::default()))]
1763                        .into_iter()
1764                        .collect(),
1765                ),
1766                ..Default::default()
1767            };
1768            eprintln!("{query:#?}:");
1769
1770            let query_handle = query_engine.query(query.clone());
1771            assert_eq!(
1772                query_engine.query(query.clone()).into_iter().count() as u64,
1773                query_handle.num_rows()
1774            );
1775            let dataframe = concat_batches(
1776                query_handle.schema(),
1777                &query_handle.batch_iter().collect_vec(),
1778            )?;
1779            eprintln!("{}", format_record_batch(&dataframe.clone()));
1780
1781            assert_snapshot!(DisplayRB(dataframe));
1782        }
1783
1784        {
1785            let query = QueryExpression {
1786                filtered_index,
1787                view_contents: Some(
1788                    [(
1789                        entity_path.clone(),
1790                        Some(
1791                            [
1792                                MyPoints::descriptor_labels().component,
1793                                MyPoints::descriptor_colors().component,
1794                                ComponentIdentifier::new("AColumnThatDoesntEvenExist"),
1795                            ]
1796                            .into_iter()
1797                            .collect(),
1798                        ),
1799                    )]
1800                    .into_iter()
1801                    .collect(),
1802                ),
1803                ..Default::default()
1804            };
1805            eprintln!("{query:#?}:");
1806
1807            let query_handle = query_engine.query(query.clone());
1808            assert_eq!(
1809                query_engine.query(query.clone()).into_iter().count() as u64,
1810                query_handle.num_rows()
1811            );
1812            let dataframe = concat_batches(
1813                query_handle.schema(),
1814                &query_handle.batch_iter().collect_vec(),
1815            )?;
1816            eprintln!("{}", format_record_batch(&dataframe.clone()));
1817
1818            assert_snapshot!(DisplayRB(dataframe));
1819        }
1820
1821        Ok(())
1822    }
1823
1824    #[test]
1825    fn selection() -> anyhow::Result<()> {
1826        re_log::setup_logging();
1827
1828        let store = ChunkStoreHandle::new(create_nasty_store()?);
1829        eprintln!("{store}");
1830        let query_cache = QueryCache::new_handle(store.clone());
1831        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1832
1833        let entity_path: EntityPath = "this/that".into();
1834        let filtered_index = TimelineName::new("frame_nr");
1835
1836        // empty selection
1837        {
1838            let query = QueryExpression {
1839                filtered_index: Some(filtered_index),
1840                selection: Some(vec![]),
1841                ..Default::default()
1842            };
1843            eprintln!("{query:#?}:");
1844
1845            let query_handle = query_engine.query(query.clone());
1846            assert_eq!(
1847                query_engine.query(query.clone()).into_iter().count() as u64,
1848                query_handle.num_rows()
1849            );
1850            let dataframe = concat_batches(
1851                query_handle.schema(),
1852                &query_handle.batch_iter().collect_vec(),
1853            )?;
1854            eprintln!("{}", format_record_batch(&dataframe.clone()));
1855
1856            assert_snapshot!(DisplayRB(dataframe));
1857        }
1858
1859        // only indices (+ duplication)
1860        {
1861            let query = QueryExpression {
1862                filtered_index: Some(filtered_index),
1863                selection: Some(vec![
1864                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1865                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1866                    ColumnSelector::Time(TimeColumnSelector::from("ATimeColumnThatDoesntExist")),
1867                ]),
1868                ..Default::default()
1869            };
1870            eprintln!("{query:#?}:");
1871
1872            let query_handle = query_engine.query(query.clone());
1873            assert_eq!(
1874                query_engine.query(query.clone()).into_iter().count() as u64,
1875                query_handle.num_rows()
1876            );
1877            let dataframe = concat_batches(
1878                query_handle.schema(),
1879                &query_handle.batch_iter().collect_vec(),
1880            )?;
1881            eprintln!("{}", format_record_batch(&dataframe.clone()));
1882
1883            assert_snapshot!(DisplayRB(dataframe));
1884        }
1885
1886        // duplication and non-existing
1887        {
1888            let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();
1889
1890            let query = QueryExpression {
1891                filtered_index: Some(filtered_index),
1892                selection: Some(vec![
1893                    // Duplication
1894                    ColumnSelector::Component(ComponentColumnSelector {
1895                        entity_path: entity_path.clone(),
1896                        component: component.to_string(),
1897                    }),
1898                    ColumnSelector::Component(ComponentColumnSelector {
1899                        entity_path: entity_path.clone(),
1900                        component: component.to_string(),
1901                    }),
1902                    // Non-existing entity
1903                    ColumnSelector::Component(ComponentColumnSelector {
1904                        entity_path: "non_existing_entity".into(),
1905                        component: component.to_string(),
1906                    }),
1907                    // Non-existing components
1908                    ColumnSelector::Component(ComponentColumnSelector {
1909                        entity_path: entity_path.clone(),
1910                        component: "MyPoints:AFieldThatDoesntExist".into(),
1911                    }),
1912                    ColumnSelector::Component(ComponentColumnSelector {
1913                        entity_path: entity_path.clone(),
1914                        component: "AFieldThatDoesntExist".into(),
1915                    }),
1916                    ColumnSelector::Component(ComponentColumnSelector {
1917                        entity_path: entity_path.clone(),
1918                        component: "AArchetypeNameThatDoesNotExist:positions".into(),
1919                    }),
1920                ]),
1921                ..Default::default()
1922            };
1923            eprintln!("{query:#?}:");
1924
1925            let query_handle = query_engine.query(query.clone());
1926            assert_eq!(
1927                query_engine.query(query.clone()).into_iter().count() as u64,
1928                query_handle.num_rows()
1929            );
1930            let dataframe = concat_batches(
1931                query_handle.schema(),
1932                &query_handle.batch_iter().collect_vec(),
1933            )?;
1934            eprintln!("{}", format_record_batch(&dataframe.clone()));
1935
1936            assert_snapshot!(DisplayRB(dataframe));
1937        }
1938
1939        // static
1940        {
1941            let ComponentDescriptor { component, .. } = MyPoints::descriptor_labels();
1942
1943            let query = QueryExpression {
1944                filtered_index: Some(filtered_index),
1945                selection: Some(vec![
1946                    // NOTE: This will force a crash if the selected indexes vs. view indexes are
1947                    // improperly handled.
1948                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1949                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1950                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1951                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1952                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1953                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1954                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1955                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1956                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1957                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
1958                    //
1959                    ColumnSelector::Component(ComponentColumnSelector {
1960                        entity_path: entity_path.clone(),
1961                        component: component.to_string(),
1962                    }),
1963                ]),
1964                ..Default::default()
1965            };
1966            eprintln!("{query:#?}:");
1967
1968            let query_handle = query_engine.query(query.clone());
1969            assert_eq!(
1970                query_engine.query(query.clone()).into_iter().count() as u64,
1971                query_handle.num_rows()
1972            );
1973            let dataframe = concat_batches(
1974                query_handle.schema(),
1975                &query_handle.batch_iter().collect_vec(),
1976            )?;
1977            eprintln!("{}", format_record_batch(&dataframe.clone()));
1978
1979            assert_snapshot!(DisplayRB(dataframe));
1980        }
1981
1982        Ok(())
1983    }
1984
1985    #[test]
1986    fn view_contents_and_selection() -> anyhow::Result<()> {
1987        re_log::setup_logging();
1988
1989        let store = ChunkStoreHandle::new(create_nasty_store()?);
1990        eprintln!("{store}");
1991        let query_cache = QueryCache::new_handle(store.clone());
1992        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
1993
1994        let entity_path: EntityPath = "this/that".into();
1995        let filtered_index = TimelineName::new("frame_nr");
1996
1997        // only components
1998        {
1999            let query = QueryExpression {
2000                filtered_index: Some(filtered_index),
2001                view_contents: Some(
2002                    [(
2003                        entity_path.clone(),
2004                        Some(
2005                            [
2006                                MyPoints::descriptor_colors().component,
2007                                MyPoints::descriptor_labels().component,
2008                            ]
2009                            .into_iter()
2010                            .collect(),
2011                        ),
2012                    )]
2013                    .into_iter()
2014                    .collect(),
2015                ),
2016                selection: Some(vec![
2017                    ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
2018                    ColumnSelector::Time(TimeColumnSelector::from(*Timeline::log_time().name())),
2019                    ColumnSelector::Time(TimeColumnSelector::from(*Timeline::log_tick().name())),
2020                    //
2021                    ColumnSelector::Component(ComponentColumnSelector {
2022                        entity_path: entity_path.clone(),
2023                        component: MyPoints::descriptor_points().component.to_string(),
2024                    }),
2025                    ColumnSelector::Component(ComponentColumnSelector {
2026                        entity_path: entity_path.clone(),
2027                        component: MyPoints::descriptor_colors().component.to_string(),
2028                    }),
2029                    ColumnSelector::Component(ComponentColumnSelector {
2030                        entity_path: entity_path.clone(),
2031                        component: MyPoints::descriptor_labels().component.to_string(),
2032                    }),
2033                ]),
2034                ..Default::default()
2035            };
2036            eprintln!("{query:#?}:");
2037
2038            let query_handle = query_engine.query(query.clone());
2039            assert_eq!(
2040                query_engine.query(query.clone()).into_iter().count() as u64,
2041                query_handle.num_rows()
2042            );
2043            let dataframe = concat_batches(
2044                query_handle.schema(),
2045                &query_handle.batch_iter().collect_vec(),
2046            )?;
2047            eprintln!("{}", format_record_batch(&dataframe.clone()));
2048
2049            assert_snapshot!(DisplayRB(dataframe));
2050        }
2051
2052        Ok(())
2053    }
2054
2055    #[test]
2056    fn clears() -> anyhow::Result<()> {
2057        re_log::setup_logging();
2058
2059        let store = ChunkStoreHandle::new(create_nasty_store()?);
2060        extend_nasty_store_with_clears(&mut store.write())?;
2061        eprintln!("{store}");
2062
2063        let query_cache = QueryCache::new_handle(store.clone());
2064        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
2065
2066        let filtered_index = Some(TimelineName::new("frame_nr"));
2067        let entity_path = EntityPath::from("this/that");
2068
2069        // barebones
2070        {
2071            let query = QueryExpression {
2072                filtered_index,
2073                view_contents: Some([(entity_path.clone(), None)].into_iter().collect()),
2074                ..Default::default()
2075            };
2076            eprintln!("{query:#?}:");
2077
2078            let query_handle = query_engine.query(query.clone());
2079            assert_eq!(
2080                query_engine.query(query.clone()).into_iter().count() as u64,
2081                query_handle.num_rows()
2082            );
2083            let dataframe = concat_batches(
2084                query_handle.schema(),
2085                &query_handle.batch_iter().collect_vec(),
2086            )?;
2087            eprintln!("{}", format_record_batch(&dataframe.clone()));
2088
2089            assert_snapshot!(DisplayRB(dataframe));
2090        }
2091
2092        // sparse-filled
2093        {
2094            let query = QueryExpression {
2095                filtered_index,
2096                view_contents: Some([(entity_path.clone(), None)].into_iter().collect()),
2097                sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
2098                ..Default::default()
2099            };
2100            eprintln!("{query:#?}:");
2101
2102            let query_handle = query_engine.query(query.clone());
2103            assert_eq!(
2104                query_engine.query(query.clone()).into_iter().count() as u64,
2105                query_handle.num_rows()
2106            );
2107            let dataframe = concat_batches(
2108                query_handle.schema(),
2109                &query_handle.batch_iter().collect_vec(),
2110            )?;
2111            eprintln!("{}", format_record_batch(&dataframe.clone()));
2112
2113            // TODO(#7650): Those null values for `MyColor` on 10 and 20 look completely insane, but then again
2114            // static clear semantics in general are pretty unhinged right now, especially when
2115            // ranges are involved.
2116
2117            assert_snapshot!(DisplayRB(dataframe));
2118        }
2119
2120        Ok(())
2121    }
2122
2123    #[test]
2124    fn pagination() -> anyhow::Result<()> {
2125        re_log::setup_logging();
2126
2127        let store = ChunkStoreHandle::new(create_nasty_store()?);
2128        eprintln!("{store}");
2129        let query_cache = QueryCache::new_handle(store.clone());
2130        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
2131
2132        let filtered_index = Some(TimelineName::new("frame_nr"));
2133        let entity_path = EntityPath::from("this/that");
2134
2135        // basic
2136        {
2137            let query = QueryExpression {
2138                filtered_index,
2139                ..Default::default()
2140            };
2141            eprintln!("{query:#?}:");
2142
2143            let query_handle = query_engine.query(query.clone());
2144            assert_eq!(
2145                query_engine.query(query.clone()).into_iter().count() as u64,
2146                query_handle.num_rows(),
2147            );
2148
2149            let expected_rows = query_handle.batch_iter().collect_vec();
2150
2151            for _ in 0..3 {
2152                for i in 0..expected_rows.len() {
2153                    query_handle.seek_to_row(i);
2154
2155                    let expected = concat_batches(
2156                        query_handle.schema(),
2157                        &expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
2158                    )?;
2159                    let got = concat_batches(
2160                        query_handle.schema(),
2161                        &query_handle.batch_iter().take(3).collect_vec(),
2162                    )?;
2163
2164                    let expected = format!("{:#?}", expected.columns());
2165                    let got = format!("{:#?}", got.columns());
2166
2167                    similar_asserts::assert_eq!(expected, got);
2168                }
2169            }
2170        }
2171
2172        // with pov
2173        {
2174            let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();
2175            let query = QueryExpression {
2176                filtered_index,
2177                filtered_is_not_null: Some(ComponentColumnSelector {
2178                    entity_path: entity_path.clone(),
2179                    component: component.to_string(),
2180                }),
2181                ..Default::default()
2182            };
2183            eprintln!("{query:#?}:");
2184
2185            let query_handle = query_engine.query(query.clone());
2186            assert_eq!(
2187                query_engine.query(query.clone()).into_iter().count() as u64,
2188                query_handle.num_rows(),
2189            );
2190
2191            let expected_rows = query_handle.batch_iter().collect_vec();
2192
2193            for _ in 0..3 {
2194                for i in 0..expected_rows.len() {
2195                    query_handle.seek_to_row(i);
2196
2197                    let expected = concat_batches(
2198                        query_handle.schema(),
2199                        &expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
2200                    )?;
2201                    let got = concat_batches(
2202                        query_handle.schema(),
2203                        &query_handle.batch_iter().take(3).collect_vec(),
2204                    )?;
2205
2206                    let expected = format!("{:#?}", expected.columns());
2207                    let got = format!("{:#?}", got.columns());
2208
2209                    similar_asserts::assert_eq!(expected, got);
2210                }
2211            }
2212        }
2213
2214        // with sampling
2215        {
2216            let query = QueryExpression {
2217                filtered_index,
2218                using_index_values: Some(
2219                    [0, 15, 30, 30, 45, 60, 75, 90]
2220                        .into_iter()
2221                        .map(TimeInt::new_temporal)
2222                        .chain(std::iter::once(TimeInt::STATIC))
2223                        .collect(),
2224                ),
2225                ..Default::default()
2226            };
2227            eprintln!("{query:#?}:");
2228
2229            let query_handle = query_engine.query(query.clone());
2230            assert_eq!(
2231                query_engine.query(query.clone()).into_iter().count() as u64,
2232                query_handle.num_rows(),
2233            );
2234
2235            let expected_rows = query_handle.batch_iter().collect_vec();
2236
2237            for _ in 0..3 {
2238                for i in 0..expected_rows.len() {
2239                    query_handle.seek_to_row(i);
2240
2241                    let expected = concat_batches(
2242                        query_handle.schema(),
2243                        &expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
2244                    )?;
2245                    let got = concat_batches(
2246                        query_handle.schema(),
2247                        &query_handle.batch_iter().take(3).collect_vec(),
2248                    )?;
2249
2250                    let expected = format!("{:#?}", expected.columns());
2251                    let got = format!("{:#?}", got.columns());
2252
2253                    similar_asserts::assert_eq!(expected, got);
2254                }
2255            }
2256        }
2257
2258        // with sparse-fill
2259        {
2260            let query = QueryExpression {
2261                filtered_index,
2262                sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
2263                ..Default::default()
2264            };
2265            eprintln!("{query:#?}:");
2266
2267            let query_handle = query_engine.query(query.clone());
2268            assert_eq!(
2269                query_engine.query(query.clone()).into_iter().count() as u64,
2270                query_handle.num_rows(),
2271            );
2272
2273            let expected_rows = query_handle.batch_iter().collect_vec();
2274
2275            for _ in 0..3 {
2276                for i in 0..expected_rows.len() {
2277                    query_handle.seek_to_row(i);
2278
2279                    let expected = concat_batches(
2280                        query_handle.schema(),
2281                        &expected_rows.iter().skip(i).take(3).cloned().collect_vec(),
2282                    )?;
2283                    let got = concat_batches(
2284                        query_handle.schema(),
2285                        &query_handle.batch_iter().take(3).collect_vec(),
2286                    )?;
2287
2288                    let expected = format!("{:#?}", expected.columns());
2289                    let got = format!("{:#?}", got.columns());
2290
2291                    similar_asserts::assert_eq!(expected, got);
2292                }
2293            }
2294        }
2295
2296        Ok(())
2297    }
2298
2299    #[test]
2300    fn query_static_any_values() -> anyhow::Result<()> {
2301        re_log::setup_logging();
2302
2303        let store = ChunkStore::new_handle(
2304            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
2305            ChunkStoreConfig::COMPACTION_DISABLED,
2306        );
2307
2308        let any_values = AnyValues::default()
2309            .with_component_from_data("yak", Arc::new(StringArray::from(vec!["yuk"])))
2310            .with_component_from_data("foo", Arc::new(StringArray::from(vec!["bar"])))
2311            .with_component_from_data("baz", Arc::new(UInt32Array::from(vec![42u32])));
2312
2313        let entity_path = EntityPath::from("test");
2314
2315        let chunk0 = Chunk::builder(entity_path.clone())
2316            .with_serialized_batches(
2317                RowId::new(),
2318                TimePoint::default(),
2319                any_values.as_serialized_batches(),
2320            )
2321            .build()?;
2322
2323        store.write().insert_chunk(&Arc::new(chunk0))?;
2324
2325        let engine = QueryEngine::from_store(store);
2326
2327        let query_expr = QueryExpression {
2328            view_contents: None,
2329            include_semantically_empty_columns: false,
2330            include_tombstone_columns: false,
2331            include_static_columns: re_chunk_store::StaticColumnSelection::Both,
2332            filtered_index: None,
2333            filtered_index_range: None,
2334            filtered_index_values: None,
2335            using_index_values: None,
2336            filtered_is_not_null: None,
2337            sparse_fill_strategy: re_chunk_store::SparseFillStrategy::None,
2338            selection: None,
2339        };
2340
2341        let query_handle = engine.query(query_expr);
2342
2343        let dataframe = concat_batches(
2344            query_handle.schema(),
2345            &query_handle.batch_iter().collect_vec(),
2346        )?;
2347        eprintln!("{}", format_record_batch(&dataframe.clone()));
2348
2349        assert_snapshot!(DisplayRB(dataframe));
2350
2351        Ok(())
2352    }
2353
2354    #[tokio::test]
2355    async fn async_barebones() -> anyhow::Result<()> {
2356        use tokio_stream::StreamExt as _;
2357
2358        re_log::setup_logging();
2359
2360        /// Wraps a [`QueryHandle`] in a [`Stream`].
2361        pub struct QueryHandleStream(pub QueryHandle<StorageEngine>);
2362
2363        impl tokio_stream::Stream for QueryHandleStream {
2364            type Item = ArrowRecordBatch;
2365
2366            #[inline]
2367            fn poll_next(
2368                self: std::pin::Pin<&mut Self>,
2369                cx: &mut std::task::Context<'_>,
2370            ) -> std::task::Poll<Option<Self::Item>> {
2371                let fut = self.0.next_row_batch_async();
2372                let fut = std::pin::pin!(fut);
2373
2374                use std::future::Future as _;
2375                fut.poll(cx)
2376            }
2377        }
2378
2379        let store = ChunkStoreHandle::new(create_nasty_store()?);
2380        eprintln!("{store}");
2381        let query_cache = QueryCache::new_handle(store.clone());
2382        let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
2383
2384        let engine_guard = query_engine.engine.write_arc();
2385
2386        let filtered_index = Some(TimelineName::new("frame_nr"));
2387
2388        // static
2389        let handle_static = tokio::spawn({
2390            let query_engine = query_engine.clone();
2391            async move {
2392                let query = QueryExpression::default();
2393                eprintln!("{query:#?}:");
2394
2395                let query_handle = query_engine.query(query.clone());
2396                assert_eq!(
2397                    QueryHandleStream(query_engine.query(query.clone()))
2398                        .collect::<Vec<_>>()
2399                        .await
2400                        .len() as u64,
2401                    query_handle.num_rows()
2402                );
2403                let dataframe = concat_batches(
2404                    query_handle.schema(),
2405                    &QueryHandleStream(query_engine.query(query.clone()))
2406                        .collect::<Vec<_>>()
2407                        .await,
2408                )?;
2409                eprintln!("{}", format_record_batch(&dataframe.clone()));
2410
2411                assert_snapshot!("async_barebones_static", DisplayRB(dataframe));
2412
2413                Ok::<_, anyhow::Error>(())
2414            }
2415        });
2416
2417        // temporal
2418        let handle_temporal = tokio::spawn({
2419            async move {
2420                let query = QueryExpression {
2421                    filtered_index,
2422                    ..Default::default()
2423                };
2424                eprintln!("{query:#?}:");
2425
2426                let query_handle = query_engine.query(query.clone());
2427                assert_eq!(
2428                    QueryHandleStream(query_engine.query(query.clone()))
2429                        .collect::<Vec<_>>()
2430                        .await
2431                        .len() as u64,
2432                    query_handle.num_rows()
2433                );
2434                let dataframe = concat_batches(
2435                    query_handle.schema(),
2436                    &QueryHandleStream(query_engine.query(query.clone()))
2437                        .collect::<Vec<_>>()
2438                        .await,
2439                )?;
2440                eprintln!("{}", format_record_batch(&dataframe.clone()));
2441
2442                assert_snapshot!("async_barebones_temporal", DisplayRB(dataframe));
2443
2444                Ok::<_, anyhow::Error>(())
2445            }
2446        });
2447
2448        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
2449
2450        let handle_queries = tokio::spawn(async move {
2451            let mut handle_static = std::pin::pin!(handle_static);
2452            let mut handle_temporal = std::pin::pin!(handle_temporal);
2453
2454            // Poll the query handles, just once.
2455            //
2456            // Because the storage engine is already held by a writer, this will put them in a pending state,
2457            // waiting to be woken up. If nothing wakes them up, then this will simply deadlock.
2458            {
2459                // Although it might look scary, all we're doing is crafting a noop waker manually,
2460                // because `std::task::Waker::noop` is unstable.
2461                //
2462                // We'll use this to build a noop async context, so that we can poll our promises
2463                // manually.
2464                const RAW_WAKER_NOOP: std::task::RawWaker = {
2465                    const VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
2466                        |_| RAW_WAKER_NOOP, // Cloning just returns a new no-op raw waker
2467                        |_| {},             // `wake` does nothing
2468                        |_| {},             // `wake_by_ref` does nothing
2469                        |_| {},             // Dropping does nothing as we don't allocate anything
2470                    );
2471                    std::task::RawWaker::new(std::ptr::null(), &VTABLE)
2472                };
2473
2474                #[expect(unsafe_code)]
2475                let mut cx = std::task::Context::from_waker(
2476                    // Safety: a Waker is just a privacy-preserving wrapper around a RawWaker.
2477                    unsafe {
2478                        &*std::ptr::from_ref::<std::task::RawWaker>(&RAW_WAKER_NOOP)
2479                            .cast::<std::task::Waker>()
2480                    },
2481                );
2482
2483                use std::future::Future as _;
2484                assert!(handle_static.as_mut().poll(&mut cx).is_pending());
2485                assert!(handle_temporal.as_mut().poll(&mut cx).is_pending());
2486            }
2487
2488            tx.send(()).unwrap();
2489
2490            handle_static.await??;
2491            handle_temporal.await??;
2492
2493            Ok::<_, anyhow::Error>(())
2494        });
2495
2496        rx.await?;
2497
2498        // Release the writer: the queries should now be able to stream to completion, provided
2499        // that _something_ wakes them up appropriately.
2500        drop(engine_guard);
2501
2502        handle_queries.await??;
2503
2504        Ok(())
2505    }
2506
2507    /// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps,
2508    /// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive clears, etc.
2509    fn create_nasty_store() -> anyhow::Result<ChunkStore> {
2510        let mut store = ChunkStore::new(
2511            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
2512            ChunkStoreConfig::COMPACTION_DISABLED,
2513        );
2514
2515        let entity_path = EntityPath::from("/this/that");
2516
2517        let frame1 = TimeInt::new_temporal(10);
2518        let frame2 = TimeInt::new_temporal(20);
2519        let frame3 = TimeInt::new_temporal(30);
2520        let frame4 = TimeInt::new_temporal(40);
2521        let frame5 = TimeInt::new_temporal(50);
2522        let frame6 = TimeInt::new_temporal(60);
2523        let frame7 = TimeInt::new_temporal(70);
2524
2525        let points1 = MyPoint::from_iter(0..1);
2526        let points2 = MyPoint::from_iter(1..2);
2527        let points3 = MyPoint::from_iter(2..3);
2528        let points4 = MyPoint::from_iter(3..4);
2529        let points5 = MyPoint::from_iter(4..5);
2530        let points6 = MyPoint::from_iter(5..6);
2531        let points7_1 = MyPoint::from_iter(6..7);
2532        let points7_2 = MyPoint::from_iter(7..8);
2533        let points7_3 = MyPoint::from_iter(8..9);
2534
2535        let colors3 = MyColor::from_iter(2..3);
2536        let colors4 = MyColor::from_iter(3..4);
2537        let colors5 = MyColor::from_iter(4..5);
2538        let colors7 = MyColor::from_iter(6..7);
2539
2540        let labels1 = vec![MyLabel("a".to_owned())];
2541        let labels2 = vec![MyLabel("b".to_owned())];
2542        let labels3 = vec![MyLabel("c".to_owned())];
2543
2544        let row_id1_1 = RowId::new();
2545        let row_id1_3 = RowId::new();
2546        let row_id1_5 = RowId::new();
2547        let row_id1_7_1 = RowId::new();
2548        let row_id1_7_2 = RowId::new();
2549        let row_id1_7_3 = RowId::new();
2550        let chunk1_1 = Chunk::builder(entity_path.clone())
2551            .with_sparse_component_batches(
2552                row_id1_1,
2553                [build_frame_nr(frame1), build_log_time(frame1.into())],
2554                [
2555                    (MyPoints::descriptor_points(), Some(&points1 as _)),
2556                    (MyPoints::descriptor_colors(), None),
2557                    (MyPoints::descriptor_labels(), Some(&labels1 as _)), // shadowed by static
2558                ],
2559            )
2560            .with_sparse_component_batches(
2561                row_id1_3,
2562                [build_frame_nr(frame3), build_log_time(frame3.into())],
2563                [
2564                    (MyPoints::descriptor_points(), Some(&points3 as _)),
2565                    (MyPoints::descriptor_colors(), Some(&colors3 as _)),
2566                ],
2567            )
2568            .with_sparse_component_batches(
2569                row_id1_5,
2570                [build_frame_nr(frame5), build_log_time(frame5.into())],
2571                [
2572                    (MyPoints::descriptor_points(), Some(&points5 as _)),
2573                    (MyPoints::descriptor_colors(), None),
2574                ],
2575            )
2576            .with_sparse_component_batches(
2577                row_id1_7_1,
2578                [build_frame_nr(frame7), build_log_time(frame7.into())],
2579                [(MyPoints::descriptor_points(), Some(&points7_1 as _))],
2580            )
2581            .with_sparse_component_batches(
2582                row_id1_7_2,
2583                [build_frame_nr(frame7), build_log_time(frame7.into())],
2584                [(MyPoints::descriptor_points(), Some(&points7_2 as _))],
2585            )
2586            .with_sparse_component_batches(
2587                row_id1_7_3,
2588                [build_frame_nr(frame7), build_log_time(frame7.into())],
2589                [(MyPoints::descriptor_points(), Some(&points7_3 as _))],
2590            )
2591            .build()?;
2592
2593        let chunk1_1 = Arc::new(chunk1_1);
2594        store.insert_chunk(&chunk1_1)?;
2595        let chunk1_2 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new()));
2596        store.insert_chunk(&chunk1_2)?; // x2 !
2597        let chunk1_3 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new()));
2598        store.insert_chunk(&chunk1_3)?; // x3 !!
2599
2600        let row_id2_2 = RowId::new();
2601        let row_id2_3 = RowId::new();
2602        let row_id2_4 = RowId::new();
2603        let chunk2 = Chunk::builder(entity_path.clone())
2604            .with_sparse_component_batches(
2605                row_id2_2,
2606                [build_frame_nr(frame2)],
2607                [(MyPoints::descriptor_points(), Some(&points2 as _))],
2608            )
2609            .with_sparse_component_batches(
2610                row_id2_3,
2611                [build_frame_nr(frame3)],
2612                [
2613                    (MyPoints::descriptor_points(), Some(&points3 as _)),
2614                    (MyPoints::descriptor_colors(), Some(&colors3 as _)),
2615                ],
2616            )
2617            .with_sparse_component_batches(
2618                row_id2_4,
2619                [build_frame_nr(frame4)],
2620                [(MyPoints::descriptor_points(), Some(&points4 as _))],
2621            )
2622            .build()?;
2623
2624        let chunk2 = Arc::new(chunk2);
2625        store.insert_chunk(&chunk2)?;
2626
2627        let row_id3_2 = RowId::new();
2628        let row_id3_4 = RowId::new();
2629        let row_id3_6 = RowId::new();
2630        let chunk3 = Chunk::builder(entity_path.clone())
2631            .with_sparse_component_batches(
2632                row_id3_2,
2633                [build_frame_nr(frame2)],
2634                [(MyPoints::descriptor_points(), Some(&points2 as _))],
2635            )
2636            .with_sparse_component_batches(
2637                row_id3_4,
2638                [build_frame_nr(frame4)],
2639                [(MyPoints::descriptor_points(), Some(&points4 as _))],
2640            )
2641            .with_sparse_component_batches(
2642                row_id3_6,
2643                [build_frame_nr(frame6)],
2644                [(MyPoints::descriptor_points(), Some(&points6 as _))],
2645            )
2646            .build()?;
2647
2648        let chunk3 = Arc::new(chunk3);
2649        store.insert_chunk(&chunk3)?;
2650
2651        let row_id4_4 = RowId::new();
2652        let row_id4_5 = RowId::new();
2653        let row_id4_7 = RowId::new();
2654        let chunk4 = Chunk::builder(entity_path.clone())
2655            .with_sparse_component_batches(
2656                row_id4_4,
2657                [build_frame_nr(frame4)],
2658                [(MyPoints::descriptor_colors(), Some(&colors4 as _))],
2659            )
2660            .with_sparse_component_batches(
2661                row_id4_5,
2662                [build_frame_nr(frame5)],
2663                [(MyPoints::descriptor_colors(), Some(&colors5 as _))],
2664            )
2665            .with_sparse_component_batches(
2666                row_id4_7,
2667                [build_frame_nr(frame7)],
2668                [(MyPoints::descriptor_colors(), Some(&colors7 as _))],
2669            )
2670            .build()?;
2671
2672        let chunk4 = Arc::new(chunk4);
2673        store.insert_chunk(&chunk4)?;
2674
2675        let row_id5_1 = RowId::new();
2676        let chunk5 = Chunk::builder(entity_path.clone())
2677            .with_sparse_component_batches(
2678                row_id5_1,
2679                TimePoint::default(),
2680                [(MyPoints::descriptor_labels(), Some(&labels2 as _))],
2681            )
2682            .build()?;
2683
2684        let chunk5 = Arc::new(chunk5);
2685        store.insert_chunk(&chunk5)?;
2686
2687        let row_id6_1 = RowId::new();
2688        let chunk6 = Chunk::builder(entity_path.clone())
2689            .with_sparse_component_batches(
2690                row_id6_1,
2691                TimePoint::default(),
2692                [(MyPoints::descriptor_labels(), Some(&labels3 as _))],
2693            )
2694            .build()?;
2695
2696        let chunk6 = Arc::new(chunk6);
2697        store.insert_chunk(&chunk6)?;
2698
2699        Ok(store)
2700    }
2701
2702    fn extend_nasty_store_with_clears(store: &mut ChunkStore) -> anyhow::Result<()> {
2703        let entity_path = EntityPath::from("/this/that");
2704        let entity_path_parent = EntityPath::from("/this");
2705        let entity_path_root = EntityPath::from("/");
2706
2707        let frame35 = TimeInt::new_temporal(35);
2708        let frame55 = TimeInt::new_temporal(55);
2709        let frame60 = TimeInt::new_temporal(60);
2710        let frame65 = TimeInt::new_temporal(65);
2711
2712        let clear_flat = components::ClearIsRecursive(false.into());
2713        let clear_recursive = components::ClearIsRecursive(true.into());
2714
2715        let row_id1_1 = RowId::new();
2716        let chunk1 = Chunk::builder(entity_path.clone())
2717            .with_sparse_component_batches(
2718                row_id1_1,
2719                TimePoint::default(),
2720                [(
2721                    archetypes::Clear::descriptor_is_recursive(),
2722                    Some(&clear_flat as _),
2723                )],
2724            )
2725            .build()?;
2726
2727        let chunk1 = Arc::new(chunk1);
2728        store.insert_chunk(&chunk1)?;
2729
2730        // NOTE: This tombstone will never have any visible effect.
2731        //
2732        // Tombstones still obey the same rules as other all other data, specifically: if a component
2733        // has been statically logged for an entity, it shadows any temporal data for that same
2734        // component on that same entity.
2735        //
2736        // In this specific case, `this/that` already has been logged a static clear, so further temporal
2737        // clears will be ignored.
2738        //
2739        // It's pretty weird, but then again static clear semantics in general are very weird.
2740        let row_id2_1 = RowId::new();
2741        let chunk2 = Chunk::builder(entity_path.clone())
2742            .with_sparse_component_batches(
2743                row_id2_1,
2744                [build_frame_nr(frame35), build_log_time(frame35.into())],
2745                [(
2746                    archetypes::Clear::descriptor_is_recursive(),
2747                    Some(&clear_recursive as _),
2748                )],
2749            )
2750            .build()?;
2751
2752        let chunk2 = Arc::new(chunk2);
2753        store.insert_chunk(&chunk2)?;
2754
2755        let row_id3_1 = RowId::new();
2756        let chunk3 = Chunk::builder(entity_path_root.clone())
2757            .with_sparse_component_batches(
2758                row_id3_1,
2759                [build_frame_nr(frame55), build_log_time(frame55.into())],
2760                [(
2761                    archetypes::Clear::descriptor_is_recursive(),
2762                    Some(&clear_flat as _),
2763                )],
2764            )
2765            .with_sparse_component_batches(
2766                row_id3_1,
2767                [build_frame_nr(frame60), build_log_time(frame60.into())],
2768                [(
2769                    archetypes::Clear::descriptor_is_recursive(),
2770                    Some(&clear_recursive as _),
2771                )],
2772            )
2773            .with_sparse_component_batches(
2774                row_id3_1,
2775                [build_frame_nr(frame65), build_log_time(frame65.into())],
2776                [(
2777                    archetypes::Clear::descriptor_is_recursive(),
2778                    Some(&clear_flat as _),
2779                )],
2780            )
2781            .build()?;
2782
2783        let chunk3 = Arc::new(chunk3);
2784        store.insert_chunk(&chunk3)?;
2785
2786        let row_id4_1 = RowId::new();
2787        let chunk4 = Chunk::builder(entity_path_parent.clone())
2788            .with_sparse_component_batches(
2789                row_id4_1,
2790                [build_frame_nr(frame60), build_log_time(frame60.into())],
2791                [(
2792                    archetypes::Clear::descriptor_is_recursive(),
2793                    Some(&clear_flat as _),
2794                )],
2795            )
2796            .build()?;
2797
2798        let chunk4 = Arc::new(chunk4);
2799        store.insert_chunk(&chunk4)?;
2800
2801        let row_id5_1 = RowId::new();
2802        let chunk5 = Chunk::builder(entity_path_parent.clone())
2803            .with_sparse_component_batches(
2804                row_id5_1,
2805                [build_frame_nr(frame65), build_log_time(frame65.into())],
2806                [(
2807                    archetypes::Clear::descriptor_is_recursive(),
2808                    Some(&clear_recursive as _),
2809                )],
2810            )
2811            .build()?;
2812
2813        let chunk5 = Arc::new(chunk5);
2814        store.insert_chunk(&chunk5)?;
2815
2816        Ok(())
2817    }
2818}