1use std::collections::BTreeSet;
2use std::sync::OnceLock;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use arrow::array::{
6 ArrayRef as ArrowArrayRef, BooleanArray as ArrowBooleanArray,
7 PrimitiveArray as ArrowPrimitiveArray, RecordBatch as ArrowRecordBatch, RecordBatchOptions,
8};
9use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
10use arrow::datatypes::{
11 DataType as ArrowDataType, Fields as ArrowFields, Schema as ArrowSchema,
12 SchemaRef as ArrowSchemaRef,
13};
14use itertools::{Either, Itertools as _};
15use nohash_hasher::{IntMap, IntSet};
16use re_arrow_util::{ArrowArrayDowncastRef as _, into_arrow_ref};
17use re_chunk::external::arrow::array::ArrayRef;
18use re_chunk::{
19 Chunk, ComponentIdentifier, EntityPath, RangeQuery, RowId, TimeInt, TimelineName,
20 UnitChunkShared,
21};
22use re_chunk_store::{
23 ChunkStore, ColumnDescriptor, ComponentColumnDescriptor, Index, IndexColumnDescriptor,
24 IndexValue, QueryExpression, SparseFillStrategy,
25};
26use re_log::{debug_assert, debug_assert_eq, debug_panic};
27use re_log_types::AbsoluteTimeRange;
28use re_query::{QueryCache, StorageEngineLike};
29use re_sorbet::{
30 ChunkColumnDescriptors, ColumnSelector, RowIdColumnDescriptor, TimeColumnSelector,
31};
32use re_types_core::arrow_helpers::as_array_ref;
33use re_types_core::{Loggable as _, SerializedComponentColumn, archetypes};
34
/// A handle to a dataframe query, bound to the storage engine it runs against.
///
/// Creating a handle is cheap: the derived state is only computed the first time it is
/// needed, and at most once.
pub struct QueryHandle<E: StorageEngineLike> {
    /// The storage engine that gives access to the chunk store and the query cache.
    pub(crate) engine: E,

    /// The query expression, exactly as passed to [`QueryHandle::new`].
    pub(crate) query: QueryExpression,

    /// Lazily-initialized internal state, filled in by `init` / `_next_row`.
    state: OnceLock<QueryHandleState>,
}
71
/// Internal state of a [`QueryHandle`], computed once by `init_`.
struct QueryHandleState {
    /// The schema returned by the store for this query: every column the query can see.
    view_contents: ChunkColumnDescriptors,

    /// The columns that end up in the output, each paired with its index into the view.
    ///
    /// Columns that were selected but do not exist in the view carry `usize::MAX` as index.
    selected_contents: Vec<(usize, ColumnDescriptor)>,

    /// Static data for each entry of `selected_contents` (same order, same length).
    ///
    /// Always `None` for row-id and time columns.
    selected_static_values: Vec<Option<UnitChunkShared>>,

    /// The timeline used as index. An empty-named timeline when the query specifies none.
    filtered_index: Index,

    /// The Arrow schema derived from `selected_contents`.
    arrow_schema: ArrowSchemaRef,

    /// For each view column, the chunks holding its data.
    ///
    /// Each chunk comes with a cursor: the offset of the row to look at next in that chunk.
    view_chunks: Vec<Vec<(AtomicU64, Chunk)>>,

    /// Index (into `unique_index_values`) of the next row to be returned.
    cur_row: AtomicU64,

    /// The index values of the dataframe: one output row per entry.
    unique_index_values: Vec<IndexValue>,
}
144
145impl<E: StorageEngineLike> QueryHandle<E> {
146 pub(crate) fn new(engine: E, query: QueryExpression) -> Self {
147 Self {
148 engine,
149 query,
150 state: Default::default(),
151 }
152 }
153}
154
155impl<E: StorageEngineLike> QueryHandle<E> {
156 #[tracing::instrument(level = "debug", skip_all)]
160 fn init(&self) -> &QueryHandleState {
161 self.engine
162 .with(|store, cache| self.state.get_or_init(|| self.init_(store, cache)))
163 }
164
165 fn init_(&self, store: &ChunkStore, cache: &QueryCache) -> QueryHandleState {
167 re_tracing::profile_scope!("init");
168
169 let filtered_index = self
171 .query
172 .filtered_index
173 .unwrap_or_else(|| TimelineName::new(""));
174
175 let view_contents_schema = store.schema_for_query(&self.query);
177 let view_contents = view_contents_schema.indices_and_components();
178
179 let selected_contents: Vec<(_, _)> = if let Some(selection) = self.query.selection.as_ref()
184 {
185 self.compute_user_selection(&view_contents, selection)
186 } else {
187 view_contents.clone().into_iter().enumerate().collect()
188 };
189
190 let arrow_schema = ArrowSchemaRef::from(ArrowSchema::new_with_metadata(
194 selected_contents
195 .iter()
196 .map(|(_, descr)| descr.to_arrow_field(re_sorbet::BatchType::Dataframe))
197 .collect::<ArrowFields>(),
198 Default::default(),
199 ));
200
201 let query = {
203 let index_range = if self.query.filtered_index.is_none() {
204 AbsoluteTimeRange::EMPTY } else if let Some(using_index_values) = self.query.using_index_values.as_ref() {
206 using_index_values
207 .first()
208 .and_then(|start| using_index_values.last().map(|end| (start, end)))
209 .map_or(AbsoluteTimeRange::EMPTY, |(start, end)| {
210 AbsoluteTimeRange::new(*start, *end)
211 })
212 } else {
213 self.query
214 .filtered_index_range
215 .unwrap_or(AbsoluteTimeRange::EVERYTHING)
216 };
217
218 RangeQuery::new(filtered_index, index_range)
219 .keep_extra_timelines(true) .keep_extra_components(false)
221 };
222 let (view_pov_chunks_idx, mut view_chunks) =
223 self.fetch_view_chunks(store, cache, &query, &view_contents);
224
225 {
229 re_tracing::profile_scope!("clear_chunks");
230
231 let clear_chunks = self.fetch_clear_chunks(store, cache, &query, &view_contents);
232 for (view_idx, chunks) in view_chunks.iter_mut().enumerate() {
233 let Some(ColumnDescriptor::Component(descr)) = view_contents.get(view_idx) else {
234 continue;
235 };
236
237 descr.sanity_check();
238
239 if let Some(clear_chunks) = clear_chunks.get(&descr.entity_path) {
244 chunks.extend(clear_chunks.iter().map(|chunk| {
245 let child_datatype = match &descr.store_datatype {
246 ArrowDataType::List(field) | ArrowDataType::LargeList(field) => {
247 field.data_type().clone()
248 }
249 ArrowDataType::Dictionary(_, datatype) => (**datatype).clone(),
250 datatype => datatype.clone(),
251 };
252
253 let mut chunk = chunk.clone();
254 #[expect(clippy::unwrap_used)]
256 chunk
257 .add_component(SerializedComponentColumn::new(
258 re_arrow_util::new_list_array_of_empties(
259 &child_datatype,
260 chunk.num_rows(),
261 ),
262 re_types_core::ComponentDescriptor {
263 component_type: descr.component_type,
264 archetype: descr.archetype,
265 component: descr.component,
266 },
267 ))
268 .unwrap();
269
270 (AtomicU64::new(0), chunk)
271 }));
272
273 chunks.sort_by_key(|(_cursor, chunk)| {
275 chunk
278 .timelines()
279 .get(&filtered_index)
280 .map(|time_column| time_column.time_range())
281 .map_or(TimeInt::STATIC, |time_range| time_range.min())
282 });
283 }
284 }
285 }
286
287 let unique_index_values = if self.query.filtered_index.is_none() {
291 vec![TimeInt::STATIC]
292 } else if let Some(using_index_values) = self.query.using_index_values.as_ref() {
293 using_index_values
294 .iter()
295 .filter(|index_value| !index_value.is_static())
296 .copied()
297 .collect_vec()
298 } else {
299 re_tracing::profile_scope!("index_values");
300
301 let mut view_chunks = view_chunks.iter();
302 let view_chunks = if let Some(view_pov_chunks_idx) = view_pov_chunks_idx {
303 Either::Left(view_chunks.nth(view_pov_chunks_idx).into_iter())
304 } else {
305 Either::Right(view_chunks)
306 };
307
308 let mut all_unique_index_values: BTreeSet<TimeInt> = view_chunks
309 .flat_map(|chunks| {
310 chunks.iter().filter_map(|(_cursor, chunk)| {
311 chunk
312 .timelines()
313 .get(&filtered_index)
314 .map(|time_column| time_column.times())
315 })
316 })
317 .flatten()
318 .collect();
319
320 if let Some(filtered_index_values) = self.query.filtered_index_values.as_ref() {
321 all_unique_index_values.retain(|time| filtered_index_values.contains(time));
322 }
323
324 all_unique_index_values
325 .into_iter()
326 .filter(|index_value| !index_value.is_static())
327 .collect_vec()
328 };
329
330 let selected_static_values = {
331 re_tracing::profile_scope!("static_values");
332
333 selected_contents
334 .iter()
335 .map(|(_view_idx, descr)| match descr {
336 ColumnDescriptor::RowId(_) | ColumnDescriptor::Time(_) => None,
337 ColumnDescriptor::Component(descr) => {
338 descr.sanity_check();
339
340 let query =
341 re_chunk::LatestAtQuery::new(TimelineName::new(""), TimeInt::STATIC);
342
343 let results =
344 cache.latest_at(&query, &descr.entity_path, [descr.component]);
345
346 results.components.into_values().next()
347 }
348 })
349 .collect_vec()
350 };
351
352 for (_, descr) in &selected_contents {
353 descr.sanity_check();
354 }
355
356 QueryHandleState {
357 view_contents: view_contents_schema,
358 selected_contents,
359 selected_static_values,
360 filtered_index,
361 arrow_schema,
362 view_chunks,
363 cur_row: AtomicU64::new(0),
364 unique_index_values,
365 }
366 }
367
368 #[tracing::instrument(level = "info", skip_all)]
369 #[expect(clippy::unused_self)]
370 fn compute_user_selection(
371 &self,
372 view_contents: &[ColumnDescriptor],
373 selection: &[ColumnSelector],
374 ) -> Vec<(usize, ColumnDescriptor)> {
375 selection
376 .iter()
377 .map(|column| match column {
378 ColumnSelector::RowId => view_contents
379 .iter()
380 .enumerate()
381 .find_map(|(idx, view_column)| {
382 if let ColumnDescriptor::RowId(descr) = view_column {
383 Some((idx, ColumnDescriptor::RowId(descr.clone())))
384 } else {
385 None
386 }
387 })
388 .unwrap_or_else(|| {
389 (
390 usize::MAX,
391 ColumnDescriptor::RowId(RowIdColumnDescriptor::from_sorted(false)),
392 )
393 }),
394
395 ColumnSelector::Time(selected_column) => {
396 let TimeColumnSelector {
397 timeline: selected_timeline,
398 } = selected_column;
399
400 view_contents
401 .iter()
402 .enumerate()
403 .filter_map(|(idx, view_column)| {
404 if let ColumnDescriptor::Time(view_descr) = view_column {
405 Some((idx, view_descr))
406 } else {
407 None
408 }
409 })
410 .find(|(_idx, view_descr)| {
411 *view_descr.timeline().name() == *selected_timeline
412 })
413 .map_or_else(
414 || {
415 (
416 usize::MAX,
417 ColumnDescriptor::Time(IndexColumnDescriptor::new_null(
418 *selected_timeline,
419 )),
420 )
421 },
422 |(idx, view_descr)| (idx, ColumnDescriptor::Time(view_descr.clone())),
423 )
424 }
425
426 ColumnSelector::Component(selected_column) => view_contents
427 .iter()
428 .enumerate()
429 .filter_map(|(idx, view_column)| {
430 if let ColumnDescriptor::Component(view_descr) = view_column {
431 Some((idx, view_descr))
432 } else {
433 None
434 }
435 })
436 .find(|(_idx, view_descr)| view_descr.matches(selected_column))
437 .map_or_else(
438 || {
439 (
440 usize::MAX,
441 ColumnDescriptor::Component(ComponentColumnDescriptor {
442 entity_path: selected_column.entity_path.clone(),
443 archetype: None,
444 component: selected_column.component.as_str().into(),
445 component_type: None,
446 store_datatype: ArrowDataType::Null,
447 is_static: false,
448 is_tombstone: false,
449 is_semantically_empty: false,
450 }),
451 )
452 },
453 |(idx, view_descr)| (idx, ColumnDescriptor::Component(view_descr.clone())),
454 ),
455 })
456 .collect_vec()
457 }
458
459 fn fetch_view_chunks(
460 &self,
461 store: &ChunkStore,
462 cache: &QueryCache,
463 query: &RangeQuery,
464 view_contents: &[ColumnDescriptor],
465 ) -> (Option<usize>, Vec<Vec<(AtomicU64, Chunk)>>) {
466 let mut view_pov_chunks_idx = self.query.filtered_is_not_null.as_ref().map(|_| usize::MAX);
467
468 let view_chunks = view_contents
469 .iter()
470 .enumerate()
471 .map(|(idx, selected_column)| match selected_column {
472 ColumnDescriptor::RowId(_) | ColumnDescriptor::Time(_) => Vec::new(),
473
474 ColumnDescriptor::Component(column) => {
475 let chunks = self
476 .fetch_chunks(store, cache, query, &column.entity_path, [column.component])
477 .unwrap_or_default();
478
479 if let Some(pov) = self.query.filtered_is_not_null.as_ref()
480 && column.matches(pov)
481 {
482 view_pov_chunks_idx = Some(idx);
483 }
484
485 chunks
486 }
487 })
488 .collect();
489
490 (view_pov_chunks_idx, view_chunks)
491 }
492
493 fn fetch_clear_chunks(
498 &self,
499 store: &ChunkStore,
500 cache: &QueryCache,
501 query: &RangeQuery,
502 view_contents: &[ColumnDescriptor],
503 ) -> IntMap<EntityPath, Vec<Chunk>> {
504 fn entity_path_ancestors(
508 entity_path: &EntityPath,
509 ) -> impl Iterator<Item = EntityPath> + use<> {
510 std::iter::from_fn({
511 let mut entity_path = entity_path.parent();
512 move || {
513 let yielded = entity_path.clone()?;
514 entity_path = yielded.parent();
515 Some(yielded)
516 }
517 })
518 }
519
520 fn chunk_filter_recursive_only(chunk: &Chunk) -> Option<Chunk> {
526 let list_array = chunk
527 .components()
528 .get_array(archetypes::Clear::descriptor_is_recursive().component)?;
529
530 let values = list_array
531 .values()
532 .downcast_array_ref::<ArrowBooleanArray>()?;
533
534 let indices = ArrowPrimitiveArray::from(
535 values
536 .iter()
537 .enumerate()
538 .filter_map(|(index, is_recursive)| {
539 #[expect(clippy::cast_possible_wrap)]
541 (is_recursive == Some(true)).then_some(index as i32)
542 })
543 .collect_vec(),
544 );
545
546 let chunk = chunk.taken(&indices);
547
548 (!chunk.is_empty()).then_some(chunk)
549 }
550
551 let components = [archetypes::Clear::descriptor_is_recursive().component];
552
553 let entity_paths: IntSet<EntityPath> = view_contents
555 .iter()
556 .filter_map(|col| col.entity_path().cloned())
557 .collect();
558
559 entity_paths
560 .iter()
561 .filter_map(|entity_path| {
562 let flat_chunks = self
565 .fetch_chunks(store, cache, query, entity_path, components)
566 .map(|chunks| {
567 chunks
568 .into_iter()
569 .map(|(_cursor, chunk)| chunk)
570 .collect_vec()
571 })
572 .unwrap_or_default();
573
574 let recursive_chunks =
575 entity_path_ancestors(entity_path).flat_map(|ancestor_path| {
576 self.fetch_chunks(store, cache, query, &ancestor_path, components)
577 .into_iter() .flat_map(|chunks| chunks.into_iter().map(|(_cursor, chunk)| chunk))
579 .filter_map(|chunk| chunk_filter_recursive_only(&chunk))
581 });
582
583 let chunks = flat_chunks
584 .into_iter()
585 .chain(recursive_chunks)
586 .map(|chunk| chunk.components_removed())
589 .collect_vec();
590
591 (!chunks.is_empty()).then(|| (entity_path.clone(), chunks))
592 })
593 .collect()
594 }
595
596 fn fetch_chunks(
597 &self,
598 _store: &ChunkStore,
599 cache: &QueryCache,
600 query: &RangeQuery,
601 entity_path: &EntityPath,
602 components: impl IntoIterator<Item = ComponentIdentifier>,
603 ) -> Option<Vec<(AtomicU64, Chunk)>> {
604 let results = cache.range(query, entity_path, components);
611
612 debug_assert!(
613 results.components.len() <= 1,
614 "cannot possibly get more than one component with this query"
615 );
616
617 results
618 .components
619 .into_iter()
620 .next()
621 .map(|(_component_descr, chunks)| {
622 chunks
623 .into_iter()
624 .map(|chunk| {
625 debug_assert!(
633 if let Some(index) = self.query.filtered_index.as_ref() {
634 chunk.is_timeline_sorted(index)
635 } else {
636 chunk.is_sorted()
637 },
638 "the query cache should have already taken care of sorting (and densifying!) the chunk",
639 );
640
641 (AtomicU64::default(), chunk)
647 })
648 .collect_vec()
649 })
650 }
651
652 #[inline]
654 pub fn query(&self) -> &QueryExpression {
655 &self.query
656 }
657
658 #[inline]
662 pub fn view_contents(&self) -> &ChunkColumnDescriptors {
663 &self.init().view_contents
664 }
665
666 #[inline]
672 pub fn selected_contents(&self) -> &[(usize, ColumnDescriptor)] {
673 &self.init().selected_contents
674 }
675
676 #[inline]
680 pub fn schema(&self) -> &ArrowSchemaRef {
681 &self.init().arrow_schema
682 }
683
684 #[tracing::instrument(level = "trace", skip_all)]
701 #[inline]
702 pub fn seek_to_row(&self, row_idx: usize) {
703 let state = self.init();
704
705 let Some(index_value) = state.unique_index_values.get(row_idx) else {
706 return;
707 };
708
709 state.cur_row.store(row_idx as _, Ordering::Relaxed);
710 self.seek_to_index_value(*index_value);
711 }
712
713 #[tracing::instrument(level = "debug", skip_all)]
731 fn seek_to_index_value(&self, index_value: IndexValue) {
732 re_tracing::profile_function!();
733
734 let state = self.init();
735
736 if index_value.is_static() {
737 for chunks in &state.view_chunks {
738 for (cursor, _chunk) in chunks {
739 cursor.store(0, Ordering::Relaxed);
740 }
741 }
742 return;
743 }
744
745 for chunks in &state.view_chunks {
746 for (cursor, chunk) in chunks {
747 let Some(time_column) = chunk.timelines().get(&state.filtered_index) else {
750 continue;
751 };
752
753 let time_range = time_column.time_range();
754
755 let new_cursor = if index_value < time_range.min() {
756 0
757 } else if index_value > time_range.max() {
758 chunk.num_rows() as u64 } else {
760 time_column
761 .times_raw()
762 .partition_point(|&time| time < index_value.as_i64())
763 as u64
764 };
765
766 cursor.store(new_cursor, Ordering::Relaxed);
767 }
768 }
769 }
770
771 pub fn num_rows(&self) -> u64 {
776 self.init().unique_index_values.len() as _
777 }
778
779 pub fn row_index_at_or_before_time(&self, time: TimeInt) -> Option<u64> {
782 let state = self.init();
783 let idx = state.unique_index_values.partition_point(|t| *t <= time);
784 if idx == 0 {
785 None
786 } else {
787 Some((idx - 1) as u64)
788 }
789 }
790
791 #[inline]
817 pub fn next_row(&self) -> Option<Vec<ArrayRef>> {
818 self.engine
819 .with(|store, cache| self._next_row(store, cache))
820 }
821
822 #[cfg(not(target_arch = "wasm32"))]
838 pub fn next_row_async(
839 &self,
840 ) -> impl std::future::Future<Output = Option<Vec<ArrayRef>>> + use<E>
841 where
842 E: 'static + Send + Clone,
843 {
844 let res: Option<Option<_>> = self
845 .engine
846 .try_with(|store, cache| self._next_row(store, cache));
847
848 let engine = self.engine.clone();
849 std::future::poll_fn(move |cx| {
850 if let Some(row) = &res {
851 std::task::Poll::Ready(row.clone())
852 } else {
853 rayon::spawn({
860 let engine = engine.clone();
861 let waker = cx.waker().clone();
862 move || {
863 engine.with(|_store, _cache| {
864 waker.wake();
867 });
868 }
869 });
870
871 std::task::Poll::Pending
872 }
873 })
874 }
875
876 #[tracing::instrument(level = "debug", skip_all)]
877 pub fn _next_row(&self, store: &ChunkStore, cache: &QueryCache) -> Option<Vec<ArrowArrayRef>> {
878 re_tracing::profile_function!();
879
880 #[derive(Debug)]
882 struct StreamingJoinStateEntry<'a> {
883 chunk: &'a Chunk,
885
886 cursor: u64,
888
889 row_id: RowId,
891 }
892
893 #[derive(Debug)]
897 enum StreamingJoinState<'a> {
898 StreamingJoinState(StreamingJoinStateEntry<'a>),
900
901 Retrofilled(UnitChunkShared),
905 }
906
907 let state = self.state.get_or_init(move || self.init_(store, cache));
911
912 let row_idx = state.cur_row.fetch_add(1, Ordering::Relaxed);
913 let cur_index_value = state.unique_index_values.get(row_idx as usize)?;
914
915 let mut view_streaming_state: Vec<Option<StreamingJoinStateEntry<'_>>> =
920 std::iter::repeat(())
923 .map(|_| None)
924 .take(state.view_chunks.len())
925 .collect();
926 for (view_column_idx, view_chunks) in state.view_chunks.iter().enumerate() {
927 let streaming_state = &mut view_streaming_state[view_column_idx];
928
929 'overlaps: for (cur_cursor, cur_chunk) in view_chunks {
930 let mut cur_cursor_value = cur_cursor.load(Ordering::Relaxed);
936
937 let cur_index_times_empty: &[i64] = &[];
938 let cur_index_times = cur_chunk
939 .timelines()
940 .get(&state.filtered_index)
941 .map_or(cur_index_times_empty, |time_column| time_column.times_raw());
942 let cur_index_row_ids = cur_chunk.row_ids_slice();
943
944 let (index_value, cur_row_id) = 'walk: loop {
945 let (Some(mut index_value), Some(mut cur_row_id)) = (
946 cur_index_times
947 .get(cur_cursor_value as usize)
948 .copied()
949 .map(TimeInt::new_temporal),
950 cur_index_row_ids.get(cur_cursor_value as usize).copied(),
951 ) else {
952 continue 'overlaps;
953 };
954
955 if index_value == *cur_index_value {
956 while let (Some(next_index_value), Some(next_row_id)) = (
960 cur_index_times
961 .get(cur_cursor_value as usize + 1)
962 .copied()
963 .map(TimeInt::new_temporal),
964 cur_index_row_ids
965 .get(cur_cursor_value as usize + 1)
966 .copied(),
967 ) {
968 if next_index_value == *cur_index_value {
969 index_value = next_index_value;
970 cur_row_id = next_row_id;
971 cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
972 } else {
973 break;
974 }
975 }
976
977 break 'walk (index_value, cur_row_id);
978 }
979
980 if index_value > *cur_index_value {
981 continue 'overlaps;
982 }
983
984 cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
985 };
986
987 debug_assert_eq!(index_value, *cur_index_value);
988
989 if let Some(streaming_state) = streaming_state.as_mut() {
990 let StreamingJoinStateEntry {
991 chunk,
992 cursor,
993 row_id,
994 } = streaming_state;
995
996 if cur_row_id > *row_id {
997 *chunk = cur_chunk;
998 *cursor = cur_cursor_value;
999 *row_id = cur_row_id;
1000 }
1001 } else {
1002 *streaming_state = Some(StreamingJoinStateEntry {
1003 chunk: cur_chunk,
1004 cursor: cur_cursor_value,
1005 row_id: cur_row_id,
1006 });
1007 }
1008 }
1009 }
1010
1011 let mut view_streaming_state = view_streaming_state
1012 .into_iter()
1013 .map(|streaming_state| streaming_state.map(StreamingJoinState::StreamingJoinState))
1014 .collect_vec();
1015
1016 for (selected_idx, static_state) in state.selected_static_values.iter().enumerate() {
1018 if let static_state @ Some(_) =
1019 static_state.clone().map(StreamingJoinState::Retrofilled)
1020 {
1021 let Some(view_idx) = state
1022 .selected_contents
1023 .get(selected_idx)
1024 .map(|(view_idx, _)| *view_idx)
1025 else {
1026 debug_panic!("selected_idx out of bounds");
1027 continue;
1028 };
1029
1030 let Some(streaming_state) = view_streaming_state.get_mut(view_idx) else {
1031 debug_panic!("view_idx out of bounds");
1032 continue;
1033 };
1034
1035 *streaming_state = static_state;
1036 }
1037 }
1038
1039 match self.query.sparse_fill_strategy {
1040 SparseFillStrategy::None => {}
1041
1042 SparseFillStrategy::LatestAtGlobal => {
1043 let null_streaming_states = view_streaming_state
1045 .iter_mut()
1046 .enumerate()
1047 .filter(|(_view_idx, streaming_state)| streaming_state.is_none());
1048
1049 for (view_idx, streaming_state) in null_streaming_states {
1050 let Some(ColumnDescriptor::Component(descr)) =
1051 state.view_contents.get_index_or_component(view_idx)
1052 else {
1053 continue;
1054 };
1055
1056 let query =
1069 re_chunk::LatestAtQuery::new(state.filtered_index, *cur_index_value);
1070
1071 let results =
1072 cache.latest_at(&query, &descr.entity_path.clone(), [descr.component]);
1073
1074 *streaming_state = results
1075 .components
1076 .into_values()
1077 .next()
1078 .map(|unit| StreamingJoinState::Retrofilled(unit.clone()));
1079 }
1080 }
1081 }
1082
1083 let mut max_value_per_index: IntMap<TimelineName, (TimeInt, ArrowScalarBuffer<i64>)> =
1101 IntMap::default();
1102 {
1103 view_streaming_state
1104 .iter()
1105 .flatten()
1106 .flat_map(|streaming_state| {
1107 match streaming_state {
1108 StreamingJoinState::StreamingJoinState(s) => s.chunk.timelines(),
1109 StreamingJoinState::Retrofilled(unit) => unit.timelines(),
1110 }
1111 .values()
1112 .filter_map(move |time_column| {
1114 let cursor = match streaming_state {
1115 StreamingJoinState::StreamingJoinState(s) => s.cursor as usize,
1116 StreamingJoinState::Retrofilled(_) => 0,
1117 };
1118 time_column
1119 .times_raw()
1120 .get(cursor)
1121 .copied()
1122 .map(TimeInt::new_temporal)
1123 .map(|time| {
1124 (
1125 *time_column.timeline(),
1126 (time, time_column.times_buffer().slice(cursor, 1)),
1127 )
1128 })
1129 })
1130 })
1131 .for_each(|(timeline, (time, time_sliced))| {
1132 max_value_per_index
1133 .entry(*timeline.name())
1134 .and_modify(|(max_time, max_time_sliced)| {
1135 if time > *max_time {
1136 *max_time = time;
1137 *max_time_sliced = time_sliced.clone();
1138 }
1139 })
1140 .or_insert((time, time_sliced));
1141 });
1142
1143 if !cur_index_value.is_static() {
1144 max_value_per_index.insert(
1147 state.filtered_index,
1148 (
1149 *cur_index_value,
1150 ArrowScalarBuffer::from(vec![cur_index_value.as_i64()]),
1151 ),
1152 );
1153 }
1154 }
1155
1156 let view_sliced_arrays: Vec<Option<_>> = view_streaming_state
1160 .iter()
1161 .enumerate()
1162 .map(|(view_idx, streaming_state)| {
1163 let streaming_state = streaming_state.as_ref()?;
1166 let list_array = match streaming_state {
1167 StreamingJoinState::StreamingJoinState(s) => {
1168 debug_assert!(
1169 s.chunk.components().iter().count() <= 1,
1170 "cannot possibly get more than one component with this query"
1171 );
1172
1173 s.chunk
1174 .components()
1175 .list_arrays()
1176 .next()
1177 .map(|list_array| list_array.slice(s.cursor as usize, 1))
1178
1179 }
1180
1181 StreamingJoinState::Retrofilled(unit) => {
1182 let component_desc = state.view_contents.get_index_or_component(view_idx).and_then(|col| if let ColumnDescriptor::Component(descr) = col {
1183 if let Some(component_type) = descr.component_type { component_type.sanity_check(); }
1184 Some(re_types_core::ComponentDescriptor {
1185 component_type: descr.component_type,
1186 archetype: descr.archetype,
1187 component: descr.component,
1188 })
1189 } else {
1190 None
1191 })?;
1192 unit.components().get_array(component_desc.component).cloned()
1193 }
1194 };
1195
1196
1197 debug_assert!(
1198 list_array.is_some(),
1199 "This must exist or the chunk wouldn't have been sliced/retrofilled to start with."
1200 );
1201
1202 list_array
1204 })
1205 .collect();
1206
1207 let selected_arrays = state
1211 .selected_contents
1212 .iter()
1213 .map(|(view_idx, column)| match column {
1214 ColumnDescriptor::RowId(_) => state
1215 .view_chunks
1216 .first()
1217 .and_then(|vec| vec.first()) .map(|(row_idx, chunk)| {
1219 as_array_ref(
1220 chunk
1221 .row_ids_array()
1222 .slice(row_idx.load(Ordering::Acquire) as _, 1),
1223 )
1224 })
1225 .unwrap_or_else(|| arrow::array::new_null_array(&RowId::arrow_datatype(), 1)),
1226
1227 ColumnDescriptor::Time(descr) => max_value_per_index
1228 .get(descr.timeline().name())
1229 .map_or_else(
1230 || arrow::array::new_null_array(&column.arrow_datatype(), 1),
1231 |(_time, time_sliced)| {
1232 descr.timeline().typ().make_arrow_array(time_sliced.clone())
1233 },
1234 ),
1235
1236 ColumnDescriptor::Component(_descr) => view_sliced_arrays
1237 .get(*view_idx)
1238 .cloned()
1239 .flatten()
1240 .map(into_arrow_ref)
1241 .unwrap_or_else(|| arrow::array::new_null_array(&column.arrow_datatype(), 1)),
1242 })
1243 .collect_vec();
1244
1245 debug_assert_eq!(state.arrow_schema.fields.len(), selected_arrays.len());
1246
1247 Some(selected_arrays)
1248 }
1249
1250 #[inline]
1257 pub fn next_row_batch(&self) -> Option<ArrowRecordBatch> {
1258 let row = self.next_row()?;
1259 match ArrowRecordBatch::try_new_with_options(
1260 self.schema().clone(),
1261 row,
1262 &RecordBatchOptions::new().with_row_count(Some(1)),
1264 ) {
1265 Ok(batch) => Some(batch),
1266 Err(err) => {
1267 if cfg!(debug_assertions) {
1268 panic!("Failed to create record batch: {err}");
1269 } else {
1270 re_log::error_once!("Failed to create record batch: {err}");
1271 None
1272 }
1273 }
1274 }
1275 }
1276
1277 #[inline]
1278 #[cfg(not(target_arch = "wasm32"))]
1279 pub async fn next_row_batch_async(&self) -> Option<ArrowRecordBatch>
1280 where
1281 E: 'static + Send + Clone,
1282 {
1283 let row = self.next_row_async().await?;
1284 let row_count = row.first().map(|a| a.len()).unwrap_or(0);
1285
1286 #[expect(clippy::unwrap_used)]
1288 let schema = self.state.get().unwrap().arrow_schema.clone();
1289
1290 ArrowRecordBatch::try_new_with_options(
1291 schema,
1292 row,
1293 &RecordBatchOptions::default().with_row_count(Some(row_count)),
1294 )
1295 .ok()
1296 }
1297}
1298
1299impl<E: StorageEngineLike> QueryHandle<E> {
1300 pub fn iter(&self) -> impl Iterator<Item = Vec<ArrowArrayRef>> + '_ {
1302 std::iter::from_fn(move || self.next_row())
1303 }
1304
1305 #[expect(clippy::should_implement_trait)] pub fn into_iter(self) -> impl Iterator<Item = Vec<ArrowArrayRef>> {
1308 std::iter::from_fn(move || self.next_row())
1309 }
1310
1311 pub fn batch_iter(&self) -> impl Iterator<Item = ArrowRecordBatch> + '_ {
1313 std::iter::from_fn(move || self.next_row_batch())
1314 }
1315
1316 pub fn into_batch_iter(self) -> impl Iterator<Item = ArrowRecordBatch> {
1318 std::iter::from_fn(move || self.next_row_batch())
1319 }
1320}
1321
1322#[cfg(test)]
1325#[expect(clippy::iter_on_single_items)]
1326mod tests {
1327 use std::sync::Arc;
1328
1329 use arrow::array::{StringArray, UInt32Array};
1330 use arrow::compute::concat_batches;
1331 use insta::assert_snapshot;
1332 use re_arrow_util::format_record_batch;
1333 use re_chunk::{Chunk, ChunkId, ComponentIdentifier, RowId, TimePoint};
1334 use re_chunk_store::{
1335 AbsoluteTimeRange, ChunkStore, ChunkStoreConfig, ChunkStoreHandle, QueryExpression, TimeInt,
1336 };
1337 use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints};
1338 use re_log_types::{EntityPath, Timeline, build_frame_nr, build_log_time};
1339 use re_query::StorageEngine;
1340 use re_sdk_types::{AnyValues, AsComponents as _, ComponentDescriptor};
1341 use re_sorbet::ComponentColumnSelector;
1342 use re_types_core::components;
1343
1344 use super::*;
1345 use crate::{QueryCache, QueryEngine};
1346
1347 struct DisplayRB(ArrowRecordBatch);
1349
1350 impl std::fmt::Display for DisplayRB {
1351 #[inline]
1352 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1353 let width = 200;
1354 re_arrow_util::format_record_batch_with_width(&self.0, Some(width), f.sign_minus())
1355 .fmt(f)
1356 }
1357 }
1358
/// Snapshots the dataframes produced by the default query and by a query that only sets the
/// filtered index, against the store built by `create_nasty_store`.
#[test]
fn barebones() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let cache = QueryCache::new_handle(store.clone());
    let engine = QueryEngine::new(store.clone(), cache.clone());

    let filtered_index = Some(TimelineName::new("frame_nr"));

    // Default query: no filtered index.
    {
        let query = QueryExpression::default();
        eprintln!("{query:#?}:");

        let handle = engine.query(query.clone());

        // Iterating must yield exactly as many rows as advertised.
        let iterated_rows = engine.query(query.clone()).into_iter().count() as u64;
        assert_eq!(iterated_rows, handle.num_rows());

        let schema = handle.schema();
        let batches = handle.batch_iter().collect_vec();
        let dataframe = concat_batches(schema, &batches)?;
        eprintln!("{}", format_record_batch(&dataframe.clone()));

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Same thing, indexed on `frame_nr`.
    {
        let query = QueryExpression {
            filtered_index,
            ..Default::default()
        };
        eprintln!("{query:#?}:");

        let handle = engine.query(query.clone());

        let iterated_rows = engine.query(query.clone()).into_iter().count() as u64;
        assert_eq!(iterated_rows, handle.num_rows());

        let schema = handle.schema();
        let batches = handle.batch_iter().collect_vec();
        let dataframe = concat_batches(schema, &batches)?;
        eprintln!("{}", format_record_batch(&dataframe.clone()));

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
1441
/// Snapshots the dataframe produced with `SparseFillStrategy::LatestAtGlobal`, indexed on
/// `frame_nr`.
#[test]
fn sparse_fill_strategy_latestatglobal() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let cache = QueryCache::new_handle(store.clone());
    let engine = QueryEngine::new(store.clone(), cache.clone());

    let filtered_index = Some(TimelineName::new("frame_nr"));
    let query = QueryExpression {
        filtered_index,
        sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
        ..Default::default()
    };
    eprintln!("{query:#?}:");

    let handle = engine.query(query.clone());

    // Iterating must yield exactly as many rows as advertised.
    let iterated_rows = engine.query(query.clone()).into_iter().count() as u64;
    assert_eq!(iterated_rows, handle.num_rows());

    let schema = handle.schema();
    let batches = handle.batch_iter().collect_vec();
    let dataframe = concat_batches(schema, &batches)?;
    eprintln!("{}", format_record_batch(&dataframe.clone()));

    assert_snapshot!(DisplayRB(dataframe));

    Ok(())
}
1474
/// Snapshots the dataframe produced when the `frame_nr` index is restricted to the range
/// built from 30 and 60.
#[test]
fn filtered_index_range() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let cache = QueryCache::new_handle(store.clone());
    let engine = QueryEngine::new(store.clone(), cache.clone());

    let filtered_index = Some(TimelineName::new("frame_nr"));
    let query = QueryExpression {
        filtered_index,
        filtered_index_range: Some(AbsoluteTimeRange::new(30, 60)),
        ..Default::default()
    };
    eprintln!("{query:#?}:");

    let handle = engine.query(query.clone());

    // Iterating must yield exactly as many rows as advertised.
    let iterated_rows = engine.query(query.clone()).into_iter().count() as u64;
    assert_eq!(iterated_rows, handle.num_rows());

    let schema = handle.schema();
    let batches = handle.batch_iter().collect_vec();
    let dataframe = concat_batches(schema, &batches)?;
    eprintln!("{}", format_record_batch(&dataframe.clone()));

    assert_snapshot!(DisplayRB(dataframe));

    Ok(())
}
1507
/// Snapshots the dataframe produced when the `frame_nr` index is filtered down to an explicit
/// set of values (0, 30, 60, 90, plus the static value).
#[test]
fn filtered_index_values() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let cache = QueryCache::new_handle(store.clone());
    let engine = QueryEngine::new(store.clone(), cache.clone());

    let filtered_index = Some(TimelineName::new("frame_nr"));
    let temporal_values = [0, 30, 60, 90].into_iter().map(TimeInt::new_temporal);
    let query = QueryExpression {
        filtered_index,
        filtered_index_values: Some(
            temporal_values
                .chain(std::iter::once(TimeInt::STATIC))
                .collect(),
        ),
        ..Default::default()
    };
    eprintln!("{query:#?}:");

    let handle = engine.query(query.clone());

    // Iterating must yield exactly as many rows as advertised.
    let iterated_rows = engine.query(query.clone()).into_iter().count() as u64;
    assert_eq!(iterated_rows, handle.num_rows());

    let schema = handle.schema();
    let batches = handle.batch_iter().collect_vec();
    let dataframe = concat_batches(schema, &batches)?;
    eprintln!("{}", format_record_batch(&dataframe.clone()));

    assert_snapshot!(DisplayRB(dataframe));

    Ok(())
}
1546
/// Snapshot tests for `QueryExpression::using_index_values`.
///
/// Runs the same set of explicit index values (including a duplicate and `TimeInt::STATIC`)
/// twice against the store built by `create_nasty_store`: once with the default sparse fill
/// strategy and once with `SparseFillStrategy::LatestAtGlobal`.
#[test]
fn using_index_values() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let filtered_index = Some(TimelineName::new("frame_nr"));

    // Shared by every case below: logs the query, checks that draining a fresh handle yields
    // exactly `num_rows()` items, then materializes and logs the full dataframe.
    //
    // The snapshot assertion intentionally stays in the test body so that the automatically
    // derived snapshot names keep following the test function.
    let run_query = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows()
        );
        let dataframe = concat_batches(
            query_handle.schema(),
            &query_handle.batch_iter().collect_vec(),
        )?;
        eprintln!("{}", format_record_batch(&dataframe));

        Ok::<_, anyhow::Error>(dataframe)
    };

    // Explicit index values, default sparse fill strategy.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            using_index_values: Some(
                [0, 15, 30, 30, 45, 60, 75, 90]
                    .into_iter()
                    .map(TimeInt::new_temporal)
                    .chain(std::iter::once(TimeInt::STATIC))
                    .collect(),
            ),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Same index values, with `LatestAtGlobal` sparse filling.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            using_index_values: Some(
                [0, 15, 30, 30, 45, 60, 75, 90]
                    .into_iter()
                    .map(TimeInt::new_temporal)
                    .chain(std::iter::once(TimeInt::STATIC))
                    .collect(),
            ),
            sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
1619
/// Snapshot tests for `QueryExpression::filtered_is_not_null`.
///
/// Runs four queries against the store built by `create_nasty_store`, each filtering on a
/// different component column selector: an entity path named as non-existent, a component
/// name named as non-existent, the `MyPoints` points column, and the `MyPoints` colors column.
#[test]
fn filtered_is_not_null() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let filtered_index = Some(TimelineName::new("frame_nr"));
    let entity_path: EntityPath = "this/that".into();

    // Shared by every case below: logs the query, checks that draining a fresh handle yields
    // exactly `num_rows()` items, then materializes and logs the full dataframe.
    //
    // The snapshot assertion intentionally stays in the test body so that the automatically
    // derived snapshot names keep following the test function.
    let run_query = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows()
        );
        let dataframe = concat_batches(
            query_handle.schema(),
            &query_handle.batch_iter().collect_vec(),
        )?;
        eprintln!("{}", format_record_batch(&dataframe));

        Ok::<_, anyhow::Error>(dataframe)
    };

    // Filter on a real component name, but under the entity path "no/such/entity".
    {
        let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();

        let dataframe = run_query(QueryExpression {
            filtered_index,
            filtered_is_not_null: Some(ComponentColumnSelector {
                entity_path: "no/such/entity".into(),
                component: component.to_string(),
            }),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Filter on the test entity, but with the component name "AFieldThatDoesntExist".
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            filtered_is_not_null: Some(ComponentColumnSelector {
                entity_path: entity_path.clone(),
                component: "AFieldThatDoesntExist".to_owned(),
            }),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Filter on the `MyPoints` points column of the test entity.
    {
        let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();

        let dataframe = run_query(QueryExpression {
            filtered_index,
            filtered_is_not_null: Some(ComponentColumnSelector {
                entity_path: entity_path.clone(),
                component: component.to_string(),
            }),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Filter on the `MyPoints` colors column of the test entity.
    {
        let ComponentDescriptor { component, .. } = MyPoints::descriptor_colors();

        let dataframe = run_query(QueryExpression {
            filtered_index,
            filtered_is_not_null: Some(ComponentColumnSelector {
                entity_path: entity_path.clone(),
                component: component.to_string(),
            }),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
1744
/// Snapshot tests for `QueryExpression::view_contents`.
///
/// Runs two queries against the store built by `create_nasty_store`: one where the test
/// entity is listed with a default-constructed component set, and one where it is listed with
/// the labels and colors components plus a component identifier named as non-existent.
#[test]
fn view_contents() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let entity_path: EntityPath = "this/that".into();
    let filtered_index = Some(TimelineName::new("frame_nr"));

    // Shared by every case below: logs the query, checks that draining a fresh handle yields
    // exactly `num_rows()` items, then materializes and logs the full dataframe.
    //
    // The snapshot assertion intentionally stays in the test body so that the automatically
    // derived snapshot names keep following the test function.
    let run_query = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows()
        );
        let dataframe = concat_batches(
            query_handle.schema(),
            &query_handle.batch_iter().collect_vec(),
        )?;
        eprintln!("{}", format_record_batch(&dataframe));

        Ok::<_, anyhow::Error>(dataframe)
    };

    // The entity is listed, but with a default-constructed component set.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            view_contents: Some(
                [(entity_path.clone(), Some(Default::default()))]
                    .into_iter()
                    .collect(),
            ),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // The entity is listed with two `MyPoints` components and one made-up identifier.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            view_contents: Some(
                [(
                    entity_path.clone(),
                    Some(
                        [
                            MyPoints::descriptor_labels().component,
                            MyPoints::descriptor_colors().component,
                            ComponentIdentifier::new("AColumnThatDoesntEvenExist"),
                        ]
                        .into_iter()
                        .collect(),
                    ),
                )]
                .into_iter()
                .collect(),
            ),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
1823
/// Snapshot tests for `QueryExpression::selection`.
///
/// Runs four queries against the store built by `create_nasty_store`: an empty selection,
/// a selection of time columns (with a duplicate and a made-up timeline name), a selection of
/// component columns (with a duplicate and several made-up entity/component names), and a
/// selection repeating the index column ten times followed by one component column.
#[test]
fn selection() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let entity_path: EntityPath = "this/that".into();
    let filtered_index = TimelineName::new("frame_nr");

    // Shared by every case below: logs the query, checks that draining a fresh handle yields
    // exactly `num_rows()` items, then materializes and logs the full dataframe.
    //
    // The snapshot assertion intentionally stays in the test body so that the automatically
    // derived snapshot names keep following the test function.
    let run_query = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows()
        );
        let dataframe = concat_batches(
            query_handle.schema(),
            &query_handle.batch_iter().collect_vec(),
        )?;
        eprintln!("{}", format_record_batch(&dataframe));

        Ok::<_, anyhow::Error>(dataframe)
    };

    // Empty selection.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index: Some(filtered_index),
            selection: Some(vec![]),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Time columns only: the index twice, then a made-up timeline name.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index: Some(filtered_index),
            selection: Some(vec![
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from("ATimeColumnThatDoesntExist")),
            ]),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Component columns only: a duplicated selector followed by made-up names.
    {
        let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();

        let dataframe = run_query(QueryExpression {
            filtered_index: Some(filtered_index),
            selection: Some(vec![
                // The points column of the test entity, selected twice.
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: component.to_string(),
                }),
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: component.to_string(),
                }),
                // Real component name under a made-up entity path.
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: "non_existing_entity".into(),
                    component: component.to_string(),
                }),
                // Made-up component names under the test entity.
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: "MyPoints:AFieldThatDoesntExist".into(),
                }),
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: "AFieldThatDoesntExist".into(),
                }),
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: "AArchetypeNameThatDoesNotExist:positions".into(),
                }),
            ]),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // The index column selected ten times in a row, followed by the labels column.
    {
        let ComponentDescriptor { component, .. } = MyPoints::descriptor_labels();

        let dataframe = run_query(QueryExpression {
            filtered_index: Some(filtered_index),
            selection: Some(vec![
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: component.to_string(),
                }),
            ]),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
1984
/// Snapshot test combining `view_contents` with `selection`.
///
/// The view lists only the colors and labels components of the test entity, while the
/// selection asks for three time columns (the index, `log_time`, `log_tick`) and the points,
/// colors and labels component columns — i.e. the selection includes a column (points) that
/// the view contents do not list.
#[test]
fn view_contents_and_selection() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let entity_path: EntityPath = "this/that".into();
    let filtered_index = TimelineName::new("frame_nr");

    {
        let query = QueryExpression {
            filtered_index: Some(filtered_index),
            view_contents: Some(
                [(
                    entity_path.clone(),
                    Some(
                        [
                            MyPoints::descriptor_colors().component,
                            MyPoints::descriptor_labels().component,
                        ]
                        .into_iter()
                        .collect(),
                    ),
                )]
                .into_iter()
                .collect(),
            ),
            selection: Some(vec![
                ColumnSelector::Time(TimeColumnSelector::from(filtered_index)),
                ColumnSelector::Time(TimeColumnSelector::from(*Timeline::log_time().name())),
                ColumnSelector::Time(TimeColumnSelector::from(*Timeline::log_tick().name())),
                // Not listed in `view_contents` above.
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: MyPoints::descriptor_points().component.to_string(),
                }),
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: MyPoints::descriptor_colors().component.to_string(),
                }),
                ColumnSelector::Component(ComponentColumnSelector {
                    entity_path: entity_path.clone(),
                    component: MyPoints::descriptor_labels().component.to_string(),
                }),
            ]),
            ..Default::default()
        };
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        // Draining a second, independent handle must yield exactly as many items as
        // `num_rows()` reports for the first one.
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows()
        );
        let dataframe = concat_batches(
            query_handle.schema(),
            &query_handle.batch_iter().collect_vec(),
        )?;
        eprintln!("{}", format_record_batch(&dataframe));

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
2054
/// Snapshot tests for a store that has been extended by `extend_nasty_store_with_clears`.
///
/// Queries the test entity (with no component restriction) twice: once with the default
/// sparse fill strategy and once with `SparseFillStrategy::LatestAtGlobal`.
#[test]
fn clears() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    extend_nasty_store_with_clears(&mut store.write())?;
    eprintln!("{store}");

    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let filtered_index = Some(TimelineName::new("frame_nr"));
    let entity_path = EntityPath::from("this/that");

    // Shared by every case below: logs the query, checks that draining a fresh handle yields
    // exactly `num_rows()` items, then materializes and logs the full dataframe.
    //
    // The snapshot assertion intentionally stays in the test body so that the automatically
    // derived snapshot names keep following the test function.
    let run_query = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows()
        );
        let dataframe = concat_batches(
            query_handle.schema(),
            &query_handle.batch_iter().collect_vec(),
        )?;
        eprintln!("{}", format_record_batch(&dataframe));

        Ok::<_, anyhow::Error>(dataframe)
    };

    // Default sparse fill strategy.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            view_contents: Some([(entity_path.clone(), None)].into_iter().collect()),
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    // Same view, with `LatestAtGlobal` sparse filling.
    {
        let dataframe = run_query(QueryExpression {
            filtered_index,
            view_contents: Some([(entity_path.clone(), None)].into_iter().collect()),
            sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
            ..Default::default()
        })?;

        assert_snapshot!(DisplayRB(dataframe));
    }

    Ok(())
}
2122
/// Checks that `seek_to_row` followed by `batch_iter` behaves like slicing the full result.
///
/// For each of four query shapes (plain, `filtered_is_not_null`, `using_index_values`,
/// `LatestAtGlobal` sparse filling), every row offset is seeked to in turn and the next page
/// of batches is compared against the same window taken from a full, up-front iteration.
#[test]
fn pagination() -> anyhow::Result<()> {
    /// Number of batches read after each seek.
    const PAGE_SIZE: usize = 3;
    /// Number of times the full sweep over all offsets is repeated on the same handle.
    const NUM_SWEEPS: usize = 3;

    re_log::setup_logging();

    let store = ChunkStoreHandle::new(create_nasty_store()?);
    eprintln!("{store}");
    let query_cache = QueryCache::new_handle(store.clone());
    // Both handles are on their last use here: move them instead of cloning.
    let query_engine = QueryEngine::new(store, query_cache);

    let filtered_index = Some(TimelineName::new("frame_nr"));
    let entity_path = EntityPath::from("this/that");

    // Runs the whole pagination check for one query: logs it, checks the row count, collects
    // the expected batches once, then seeks to every offset and compares pages.
    let check_pagination = |query: QueryExpression| {
        eprintln!("{query:#?}:");

        let query_handle = query_engine.query(query.clone());
        assert_eq!(
            query_engine.query(query).into_iter().count() as u64,
            query_handle.num_rows(),
        );

        let expected_rows = query_handle.batch_iter().collect_vec();

        for _ in 0..NUM_SWEEPS {
            for i in 0..expected_rows.len() {
                query_handle.seek_to_row(i);

                let expected = concat_batches(
                    query_handle.schema(),
                    &expected_rows
                        .iter()
                        .skip(i)
                        .take(PAGE_SIZE)
                        .cloned()
                        .collect_vec(),
                )?;
                let got = concat_batches(
                    query_handle.schema(),
                    &query_handle.batch_iter().take(PAGE_SIZE).collect_vec(),
                )?;

                // Compare the debug rendering of the columns so that failures produce a
                // readable textual diff.
                let expected = format!("{:#?}", expected.columns());
                let got = format!("{:#?}", got.columns());

                similar_asserts::assert_eq!(expected, got);
            }
        }

        Ok::<_, anyhow::Error>(())
    };

    // Plain query on the index.
    check_pagination(QueryExpression {
        filtered_index,
        ..Default::default()
    })?;

    // Only rows where the points column of the test entity is not null.
    {
        let ComponentDescriptor { component, .. } = MyPoints::descriptor_points();
        check_pagination(QueryExpression {
            filtered_index,
            filtered_is_not_null: Some(ComponentColumnSelector {
                entity_path: entity_path.clone(),
                component: component.to_string(),
            }),
            ..Default::default()
        })?;
    }

    // Explicit index values, including a duplicate and the static timestamp.
    check_pagination(QueryExpression {
        filtered_index,
        using_index_values: Some(
            [0, 15, 30, 30, 45, 60, 75, 90]
                .into_iter()
                .map(TimeInt::new_temporal)
                .chain(std::iter::once(TimeInt::STATIC))
                .collect(),
        ),
        ..Default::default()
    })?;

    // `LatestAtGlobal` sparse filling.
    check_pagination(QueryExpression {
        filtered_index,
        sparse_fill_strategy: SparseFillStrategy::LatestAtGlobal,
        ..Default::default()
    })?;

    Ok(())
}
2298
/// Snapshot test for querying `AnyValues` components without a filtered index.
///
/// Builds a fresh store (compaction disabled), inserts a single chunk for entity `test`
/// logged with a default `TimePoint` and carrying three `AnyValues` components (`yak`, `foo`,
/// `baz`), then runs a query with `filtered_index: None` and snapshots the dataframe.
#[test]
fn query_static_any_values() -> anyhow::Result<()> {
    re_log::setup_logging();

    let store = ChunkStore::new_handle(
        re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
        ChunkStoreConfig::COMPACTION_DISABLED,
    );

    let any_values = AnyValues::default()
        .with_component_from_data("yak", Arc::new(StringArray::from(vec!["yuk"])))
        .with_component_from_data("foo", Arc::new(StringArray::from(vec!["bar"])))
        .with_component_from_data("baz", Arc::new(UInt32Array::from(vec![42u32])));

    let entity_path = EntityPath::from("test");

    let chunk0 = Chunk::builder(entity_path.clone())
        .with_serialized_batches(
            RowId::new(),
            TimePoint::default(),
            any_values.as_serialized_batches(),
        )
        .build()?;

    store.write().insert_chunk(&Arc::new(chunk0))?;

    let engine = QueryEngine::from_store(store);

    // Every field is spelled out explicitly rather than relying on `..Default::default()`.
    let query_expr = QueryExpression {
        view_contents: None,
        include_semantically_empty_columns: false,
        include_tombstone_columns: false,
        include_static_columns: re_chunk_store::StaticColumnSelection::Both,
        filtered_index: None,
        filtered_index_range: None,
        filtered_index_values: None,
        using_index_values: None,
        filtered_is_not_null: None,
        sparse_fill_strategy: re_chunk_store::SparseFillStrategy::None,
        selection: None,
    };

    let query_handle = engine.query(query_expr);

    let dataframe = concat_batches(
        query_handle.schema(),
        &query_handle.batch_iter().collect_vec(),
    )?;
    // Borrowing is enough for formatting; no clone of the batch is needed.
    eprintln!("{}", format_record_batch(&dataframe));

    assert_snapshot!(DisplayRB(dataframe));

    Ok(())
}
2353
2354 #[tokio::test]
2355 async fn async_barebones() -> anyhow::Result<()> {
2356 use tokio_stream::StreamExt as _;
2357
2358 re_log::setup_logging();
2359
2360 pub struct QueryHandleStream(pub QueryHandle<StorageEngine>);
2362
2363 impl tokio_stream::Stream for QueryHandleStream {
2364 type Item = ArrowRecordBatch;
2365
2366 #[inline]
2367 fn poll_next(
2368 self: std::pin::Pin<&mut Self>,
2369 cx: &mut std::task::Context<'_>,
2370 ) -> std::task::Poll<Option<Self::Item>> {
2371 let fut = self.0.next_row_batch_async();
2372 let fut = std::pin::pin!(fut);
2373
2374 use std::future::Future as _;
2375 fut.poll(cx)
2376 }
2377 }
2378
2379 let store = ChunkStoreHandle::new(create_nasty_store()?);
2380 eprintln!("{store}");
2381 let query_cache = QueryCache::new_handle(store.clone());
2382 let query_engine = QueryEngine::new(store.clone(), query_cache.clone());
2383
2384 let engine_guard = query_engine.engine.write_arc();
2385
2386 let filtered_index = Some(TimelineName::new("frame_nr"));
2387
2388 let handle_static = tokio::spawn({
2390 let query_engine = query_engine.clone();
2391 async move {
2392 let query = QueryExpression::default();
2393 eprintln!("{query:#?}:");
2394
2395 let query_handle = query_engine.query(query.clone());
2396 assert_eq!(
2397 QueryHandleStream(query_engine.query(query.clone()))
2398 .collect::<Vec<_>>()
2399 .await
2400 .len() as u64,
2401 query_handle.num_rows()
2402 );
2403 let dataframe = concat_batches(
2404 query_handle.schema(),
2405 &QueryHandleStream(query_engine.query(query.clone()))
2406 .collect::<Vec<_>>()
2407 .await,
2408 )?;
2409 eprintln!("{}", format_record_batch(&dataframe.clone()));
2410
2411 assert_snapshot!("async_barebones_static", DisplayRB(dataframe));
2412
2413 Ok::<_, anyhow::Error>(())
2414 }
2415 });
2416
2417 let handle_temporal = tokio::spawn({
2419 async move {
2420 let query = QueryExpression {
2421 filtered_index,
2422 ..Default::default()
2423 };
2424 eprintln!("{query:#?}:");
2425
2426 let query_handle = query_engine.query(query.clone());
2427 assert_eq!(
2428 QueryHandleStream(query_engine.query(query.clone()))
2429 .collect::<Vec<_>>()
2430 .await
2431 .len() as u64,
2432 query_handle.num_rows()
2433 );
2434 let dataframe = concat_batches(
2435 query_handle.schema(),
2436 &QueryHandleStream(query_engine.query(query.clone()))
2437 .collect::<Vec<_>>()
2438 .await,
2439 )?;
2440 eprintln!("{}", format_record_batch(&dataframe.clone()));
2441
2442 assert_snapshot!("async_barebones_temporal", DisplayRB(dataframe));
2443
2444 Ok::<_, anyhow::Error>(())
2445 }
2446 });
2447
2448 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
2449
2450 let handle_queries = tokio::spawn(async move {
2451 let mut handle_static = std::pin::pin!(handle_static);
2452 let mut handle_temporal = std::pin::pin!(handle_temporal);
2453
2454 {
2459 const RAW_WAKER_NOOP: std::task::RawWaker = {
2465 const VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
2466 |_| RAW_WAKER_NOOP, |_| {}, |_| {}, |_| {}, );
2471 std::task::RawWaker::new(std::ptr::null(), &VTABLE)
2472 };
2473
2474 #[expect(unsafe_code)]
2475 let mut cx = std::task::Context::from_waker(
2476 unsafe {
2478 &*std::ptr::from_ref::<std::task::RawWaker>(&RAW_WAKER_NOOP)
2479 .cast::<std::task::Waker>()
2480 },
2481 );
2482
2483 use std::future::Future as _;
2484 assert!(handle_static.as_mut().poll(&mut cx).is_pending());
2485 assert!(handle_temporal.as_mut().poll(&mut cx).is_pending());
2486 }
2487
2488 tx.send(()).unwrap();
2489
2490 handle_static.await??;
2491 handle_temporal.await??;
2492
2493 Ok::<_, anyhow::Error>(())
2494 });
2495
2496 rx.await?;
2497
2498 drop(engine_guard);
2501
2502 handle_queries.await??;
2503
2504 Ok(())
2505 }
2506
2507 fn create_nasty_store() -> anyhow::Result<ChunkStore> {
2510 let mut store = ChunkStore::new(
2511 re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
2512 ChunkStoreConfig::COMPACTION_DISABLED,
2513 );
2514
2515 let entity_path = EntityPath::from("/this/that");
2516
2517 let frame1 = TimeInt::new_temporal(10);
2518 let frame2 = TimeInt::new_temporal(20);
2519 let frame3 = TimeInt::new_temporal(30);
2520 let frame4 = TimeInt::new_temporal(40);
2521 let frame5 = TimeInt::new_temporal(50);
2522 let frame6 = TimeInt::new_temporal(60);
2523 let frame7 = TimeInt::new_temporal(70);
2524
2525 let points1 = MyPoint::from_iter(0..1);
2526 let points2 = MyPoint::from_iter(1..2);
2527 let points3 = MyPoint::from_iter(2..3);
2528 let points4 = MyPoint::from_iter(3..4);
2529 let points5 = MyPoint::from_iter(4..5);
2530 let points6 = MyPoint::from_iter(5..6);
2531 let points7_1 = MyPoint::from_iter(6..7);
2532 let points7_2 = MyPoint::from_iter(7..8);
2533 let points7_3 = MyPoint::from_iter(8..9);
2534
2535 let colors3 = MyColor::from_iter(2..3);
2536 let colors4 = MyColor::from_iter(3..4);
2537 let colors5 = MyColor::from_iter(4..5);
2538 let colors7 = MyColor::from_iter(6..7);
2539
2540 let labels1 = vec![MyLabel("a".to_owned())];
2541 let labels2 = vec![MyLabel("b".to_owned())];
2542 let labels3 = vec![MyLabel("c".to_owned())];
2543
2544 let row_id1_1 = RowId::new();
2545 let row_id1_3 = RowId::new();
2546 let row_id1_5 = RowId::new();
2547 let row_id1_7_1 = RowId::new();
2548 let row_id1_7_2 = RowId::new();
2549 let row_id1_7_3 = RowId::new();
2550 let chunk1_1 = Chunk::builder(entity_path.clone())
2551 .with_sparse_component_batches(
2552 row_id1_1,
2553 [build_frame_nr(frame1), build_log_time(frame1.into())],
2554 [
2555 (MyPoints::descriptor_points(), Some(&points1 as _)),
2556 (MyPoints::descriptor_colors(), None),
2557 (MyPoints::descriptor_labels(), Some(&labels1 as _)), ],
2559 )
2560 .with_sparse_component_batches(
2561 row_id1_3,
2562 [build_frame_nr(frame3), build_log_time(frame3.into())],
2563 [
2564 (MyPoints::descriptor_points(), Some(&points3 as _)),
2565 (MyPoints::descriptor_colors(), Some(&colors3 as _)),
2566 ],
2567 )
2568 .with_sparse_component_batches(
2569 row_id1_5,
2570 [build_frame_nr(frame5), build_log_time(frame5.into())],
2571 [
2572 (MyPoints::descriptor_points(), Some(&points5 as _)),
2573 (MyPoints::descriptor_colors(), None),
2574 ],
2575 )
2576 .with_sparse_component_batches(
2577 row_id1_7_1,
2578 [build_frame_nr(frame7), build_log_time(frame7.into())],
2579 [(MyPoints::descriptor_points(), Some(&points7_1 as _))],
2580 )
2581 .with_sparse_component_batches(
2582 row_id1_7_2,
2583 [build_frame_nr(frame7), build_log_time(frame7.into())],
2584 [(MyPoints::descriptor_points(), Some(&points7_2 as _))],
2585 )
2586 .with_sparse_component_batches(
2587 row_id1_7_3,
2588 [build_frame_nr(frame7), build_log_time(frame7.into())],
2589 [(MyPoints::descriptor_points(), Some(&points7_3 as _))],
2590 )
2591 .build()?;
2592
2593 let chunk1_1 = Arc::new(chunk1_1);
2594 store.insert_chunk(&chunk1_1)?;
2595 let chunk1_2 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new()));
2596 store.insert_chunk(&chunk1_2)?; let chunk1_3 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new()));
2598 store.insert_chunk(&chunk1_3)?; let row_id2_2 = RowId::new();
2601 let row_id2_3 = RowId::new();
2602 let row_id2_4 = RowId::new();
2603 let chunk2 = Chunk::builder(entity_path.clone())
2604 .with_sparse_component_batches(
2605 row_id2_2,
2606 [build_frame_nr(frame2)],
2607 [(MyPoints::descriptor_points(), Some(&points2 as _))],
2608 )
2609 .with_sparse_component_batches(
2610 row_id2_3,
2611 [build_frame_nr(frame3)],
2612 [
2613 (MyPoints::descriptor_points(), Some(&points3 as _)),
2614 (MyPoints::descriptor_colors(), Some(&colors3 as _)),
2615 ],
2616 )
2617 .with_sparse_component_batches(
2618 row_id2_4,
2619 [build_frame_nr(frame4)],
2620 [(MyPoints::descriptor_points(), Some(&points4 as _))],
2621 )
2622 .build()?;
2623
2624 let chunk2 = Arc::new(chunk2);
2625 store.insert_chunk(&chunk2)?;
2626
2627 let row_id3_2 = RowId::new();
2628 let row_id3_4 = RowId::new();
2629 let row_id3_6 = RowId::new();
2630 let chunk3 = Chunk::builder(entity_path.clone())
2631 .with_sparse_component_batches(
2632 row_id3_2,
2633 [build_frame_nr(frame2)],
2634 [(MyPoints::descriptor_points(), Some(&points2 as _))],
2635 )
2636 .with_sparse_component_batches(
2637 row_id3_4,
2638 [build_frame_nr(frame4)],
2639 [(MyPoints::descriptor_points(), Some(&points4 as _))],
2640 )
2641 .with_sparse_component_batches(
2642 row_id3_6,
2643 [build_frame_nr(frame6)],
2644 [(MyPoints::descriptor_points(), Some(&points6 as _))],
2645 )
2646 .build()?;
2647
2648 let chunk3 = Arc::new(chunk3);
2649 store.insert_chunk(&chunk3)?;
2650
2651 let row_id4_4 = RowId::new();
2652 let row_id4_5 = RowId::new();
2653 let row_id4_7 = RowId::new();
2654 let chunk4 = Chunk::builder(entity_path.clone())
2655 .with_sparse_component_batches(
2656 row_id4_4,
2657 [build_frame_nr(frame4)],
2658 [(MyPoints::descriptor_colors(), Some(&colors4 as _))],
2659 )
2660 .with_sparse_component_batches(
2661 row_id4_5,
2662 [build_frame_nr(frame5)],
2663 [(MyPoints::descriptor_colors(), Some(&colors5 as _))],
2664 )
2665 .with_sparse_component_batches(
2666 row_id4_7,
2667 [build_frame_nr(frame7)],
2668 [(MyPoints::descriptor_colors(), Some(&colors7 as _))],
2669 )
2670 .build()?;
2671
2672 let chunk4 = Arc::new(chunk4);
2673 store.insert_chunk(&chunk4)?;
2674
2675 let row_id5_1 = RowId::new();
2676 let chunk5 = Chunk::builder(entity_path.clone())
2677 .with_sparse_component_batches(
2678 row_id5_1,
2679 TimePoint::default(),
2680 [(MyPoints::descriptor_labels(), Some(&labels2 as _))],
2681 )
2682 .build()?;
2683
2684 let chunk5 = Arc::new(chunk5);
2685 store.insert_chunk(&chunk5)?;
2686
2687 let row_id6_1 = RowId::new();
2688 let chunk6 = Chunk::builder(entity_path.clone())
2689 .with_sparse_component_batches(
2690 row_id6_1,
2691 TimePoint::default(),
2692 [(MyPoints::descriptor_labels(), Some(&labels3 as _))],
2693 )
2694 .build()?;
2695
2696 let chunk6 = Arc::new(chunk6);
2697 store.insert_chunk(&chunk6)?;
2698
2699 Ok(store)
2700 }
2701
2702 fn extend_nasty_store_with_clears(store: &mut ChunkStore) -> anyhow::Result<()> {
2703 let entity_path = EntityPath::from("/this/that");
2704 let entity_path_parent = EntityPath::from("/this");
2705 let entity_path_root = EntityPath::from("/");
2706
2707 let frame35 = TimeInt::new_temporal(35);
2708 let frame55 = TimeInt::new_temporal(55);
2709 let frame60 = TimeInt::new_temporal(60);
2710 let frame65 = TimeInt::new_temporal(65);
2711
2712 let clear_flat = components::ClearIsRecursive(false.into());
2713 let clear_recursive = components::ClearIsRecursive(true.into());
2714
2715 let row_id1_1 = RowId::new();
2716 let chunk1 = Chunk::builder(entity_path.clone())
2717 .with_sparse_component_batches(
2718 row_id1_1,
2719 TimePoint::default(),
2720 [(
2721 archetypes::Clear::descriptor_is_recursive(),
2722 Some(&clear_flat as _),
2723 )],
2724 )
2725 .build()?;
2726
2727 let chunk1 = Arc::new(chunk1);
2728 store.insert_chunk(&chunk1)?;
2729
2730 let row_id2_1 = RowId::new();
2741 let chunk2 = Chunk::builder(entity_path.clone())
2742 .with_sparse_component_batches(
2743 row_id2_1,
2744 [build_frame_nr(frame35), build_log_time(frame35.into())],
2745 [(
2746 archetypes::Clear::descriptor_is_recursive(),
2747 Some(&clear_recursive as _),
2748 )],
2749 )
2750 .build()?;
2751
2752 let chunk2 = Arc::new(chunk2);
2753 store.insert_chunk(&chunk2)?;
2754
2755 let row_id3_1 = RowId::new();
2756 let chunk3 = Chunk::builder(entity_path_root.clone())
2757 .with_sparse_component_batches(
2758 row_id3_1,
2759 [build_frame_nr(frame55), build_log_time(frame55.into())],
2760 [(
2761 archetypes::Clear::descriptor_is_recursive(),
2762 Some(&clear_flat as _),
2763 )],
2764 )
2765 .with_sparse_component_batches(
2766 row_id3_1,
2767 [build_frame_nr(frame60), build_log_time(frame60.into())],
2768 [(
2769 archetypes::Clear::descriptor_is_recursive(),
2770 Some(&clear_recursive as _),
2771 )],
2772 )
2773 .with_sparse_component_batches(
2774 row_id3_1,
2775 [build_frame_nr(frame65), build_log_time(frame65.into())],
2776 [(
2777 archetypes::Clear::descriptor_is_recursive(),
2778 Some(&clear_flat as _),
2779 )],
2780 )
2781 .build()?;
2782
2783 let chunk3 = Arc::new(chunk3);
2784 store.insert_chunk(&chunk3)?;
2785
2786 let row_id4_1 = RowId::new();
2787 let chunk4 = Chunk::builder(entity_path_parent.clone())
2788 .with_sparse_component_batches(
2789 row_id4_1,
2790 [build_frame_nr(frame60), build_log_time(frame60.into())],
2791 [(
2792 archetypes::Clear::descriptor_is_recursive(),
2793 Some(&clear_flat as _),
2794 )],
2795 )
2796 .build()?;
2797
2798 let chunk4 = Arc::new(chunk4);
2799 store.insert_chunk(&chunk4)?;
2800
2801 let row_id5_1 = RowId::new();
2802 let chunk5 = Chunk::builder(entity_path_parent.clone())
2803 .with_sparse_component_batches(
2804 row_id5_1,
2805 [build_frame_nr(frame65), build_log_time(frame65.into())],
2806 [(
2807 archetypes::Clear::descriptor_is_recursive(),
2808 Some(&clear_recursive as _),
2809 )],
2810 )
2811 .build()?;
2812
2813 let chunk5 = Arc::new(chunk5);
2814 store.insert_chunk(&chunk5)?;
2815
2816 Ok(())
2817 }
2818}