re_chunk_store/dataframe.rs
1//! All the APIs used specifically for `re_dataframe`.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::ops::{Deref, DerefMut};
5
6use arrow::array::ListArray as ArrowListArray;
7use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField};
8use itertools::Itertools as _;
9use re_chunk::{ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
10use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, Timeline};
11use re_sorbet::{
12 ChunkColumnDescriptors, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
13 IndexColumnDescriptor, TimeColumnSelector,
14};
15use tap::Tap as _;
16
17use crate::{ChunkStore, ColumnMetadata};
18
19// --- Queries v2 ---
20
21/// Specifies how null values should be filled in the returned dataframe.
22#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
23pub enum SparseFillStrategy {
24 /// No sparse filling. Nulls stay nulls.
25 #[default]
26 None,
27
28 /// Fill null values using global-scope latest-at semantics.
29 ///
30 /// The latest-at semantics are applied on the entire dataset as opposed to just the current
31 /// view contents: it is possible to end up with values from outside the view!
32 LatestAtGlobal,
33 //
34 // TODO(cmc): `LatestAtView`?
35}
36
37impl std::fmt::Display for SparseFillStrategy {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::None => f.write_str("none"),
41 Self::LatestAtGlobal => f.write_str("latest-at (global)"),
42 }
43 }
44}
45
46/// The view contents specify which subset of the database (i.e., which columns) the query runs on.
47///
48/// Contents are expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
49///
50/// Setting an entity's identifier to `None` means: everything.
51///
52// TODO(cmc): we need to be able to build that easily in a command-line context, otherwise it's just
53// very annoying. E.g. `--with /world/points:[positions, radius] --with /cam:[pinhole]`.
54#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
55pub struct ViewContentsSelector(pub BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>);
56
57impl ViewContentsSelector {
58 pub fn into_inner(self) -> BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>> {
59 self.0
60 }
61}
62
63impl Deref for ViewContentsSelector {
64 type Target = BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>;
65
66 #[inline]
67 fn deref(&self) -> &Self::Target {
68 &self.0
69 }
70}
71
72impl DerefMut for ViewContentsSelector {
73 #[inline]
74 fn deref_mut(&mut self) -> &mut Self::Target {
75 &mut self.0
76 }
77}
78
79impl FromIterator<(EntityPath, Option<BTreeSet<ComponentIdentifier>>)> for ViewContentsSelector {
80 fn from_iter<T: IntoIterator<Item = (EntityPath, Option<BTreeSet<ComponentIdentifier>>)>>(
81 iter: T,
82 ) -> Self {
83 Self(iter.into_iter().collect())
84 }
85}
86
87// TODO(cmc): Ultimately, this shouldn't be hardcoded to `Timeline`, but to a generic `I: Index`.
88// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
89pub type Index = TimelineName;
90
91// TODO(cmc): Ultimately, this shouldn't be hardcoded to `TimeInt`, but to a generic `I: Index`.
92// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
93pub type IndexValue = TimeInt;
94
95// TODO(cmc): Ultimately, this shouldn't be hardcoded to `AbsoluteTimeRange`, but to a generic `I: Index`.
96// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
97pub type IndexRange = AbsoluteTimeRange;
98
/// Decides which columns a query keeps based on whether they hold static data.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum StaticColumnSelection {
    /// Keep every column, whether static or not.
    #[default]
    Both,

    /// Keep only the static columns.
    StaticOnly,

    /// Keep only the non-static columns.
    NonStaticOnly,
}
112
113/// Describes a complete query for Rerun's dataframe API.
114///
115/// ## Terminology: view vs. selection vs. filtering vs. sampling
116///
117/// * The view contents specify which subset of the database (i.e., which columns) the query runs on,
118/// expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
119///
120/// * The filters filter out _rows_ of data from the view contents.
121/// A filter cannot possibly introduce new rows, it can only remove existing ones from the view contents.
122///
123/// * The samplers sample _rows_ of data from the view contents at user-specified values.
124/// Samplers don't necessarily return existing rows: they might introduce new ones if the sampled value
125/// isn't present in the view contents in the first place.
126///
127/// * The selection applies last and samples _columns_ of data from the filtered/sampled view contents.
128/// Selecting a column that isn't present in the view contents results in an empty column in the
129/// final dataframe (null array).
130///
131/// A very rough mental model, in SQL terms:
132/// ```text
133/// SELECT <Self::selection> FROM <Self::view_contents> WHERE <Self::filtered_*>
134/// ```
135//
136// TODO(cmc): ideally we'd like this to be the same type as the one used in the blueprint, possibly?
137#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
138pub struct QueryExpression {
139 /// The subset of the database that the query will run on: a set of [`EntityPath`]s and their
140 /// associated [`re_types_core::ComponentIdentifier`]s.
141 ///
142 /// Defaults to `None`, which means: everything.
143 ///
144 /// Example (pseudo-code):
145 /// ```text
146 /// view_contents = {
147 /// "world/points": [rr.Position3D, rr.Radius],
148 /// "metrics": [rr.Scalars]
149 /// }
150 /// ```
151 pub view_contents: Option<ViewContentsSelector>,
152
153 /// Whether the `view_contents` should ignore semantically empty columns.
154 ///
155 /// A semantically empty column is a column that either contains no data at all, or where all
156 /// values are either nulls or empty arrays (`[]`).
157 ///
158 /// `view_contents`: [`QueryExpression::view_contents`]
159 pub include_semantically_empty_columns: bool,
160
161 /// Whether the `view_contents` should ignore columns corresponding to `Clear`-related components.
162 ///
163 /// `view_contents`: [`QueryExpression::view_contents`]
164 /// `Clear`: [`re_types_core::archetypes::Clear`]
165 pub include_tombstone_columns: bool,
166
167 /// Whether the `view_contents` should include static columns.
168 ///
169 /// `view_contents`: [`QueryExpression::view_contents`]
170 pub include_static_columns: StaticColumnSelection,
171
172 /// The index used to filter out _rows_ from the view contents.
173 ///
174 /// Only rows where at least 1 column contains non-null data at that index will be kept in the
175 /// final dataset.
176 ///
177 /// If left unspecified, the results will only contain static data.
178 ///
179 /// Examples: `Some(TimelineName("frame"))`, `None` (only static data).
180 //
181 // TODO(cmc): this has to be a selector otherwise this is a horrible UX.
182 pub filtered_index: Option<Index>,
183
184 /// The range of index values used to filter out _rows_ from the view contents.
185 ///
186 /// Only rows where at least 1 of the view-contents contains non-null data within that range will be kept in
187 /// the final dataset.
188 ///
189 /// * This has no effect if `filtered_index` isn't set.
190 /// * This has no effect if [`QueryExpression::using_index_values`] is set.
191 ///
192 /// Example: `AbsoluteTimeRange(10, 20)`.
193 pub filtered_index_range: Option<IndexRange>,
194
195 /// The specific index values used to filter out _rows_ from the view contents.
196 ///
197 /// Only rows where at least 1 column contains non-null data at these specific values will be kept
198 /// in the final dataset.
199 ///
200 /// * This has no effect if `filtered_index` isn't set.
201 /// * This has no effect if [`QueryExpression::using_index_values`] is set.
202 /// * Using [`TimeInt::STATIC`] as index value has no effect.
203 ///
204 /// Example: `[TimeInt(12), TimeInt(14)]`.
205 pub filtered_index_values: Option<BTreeSet<IndexValue>>,
206
207 /// The specific index values used to sample _rows_ from the view contents.
208 ///
209 /// The final dataset will contain one row per sampled index value, regardless of whether data
210 /// existed for that index value in the view contents.
211 /// The semantics of the query are consistent with all other settings: the results will be
212 /// sorted on the `filtered_index`, and only contain unique index values.
213 ///
214 /// * This has no effect if `filtered_index` isn't set.
215 /// * If set, this overrides both [`QueryExpression::filtered_index_range`] and
216 /// [`QueryExpression::filtered_index_values`].
217 /// * Using [`TimeInt::STATIC`] as index value has no effect.
218 ///
219 /// Example: `[TimeInt(12), TimeInt(14)]`.
220 pub using_index_values: Option<BTreeSet<IndexValue>>,
221
222 /// The component column used to filter out _rows_ from the view contents.
223 ///
224 /// Only rows where this column contains non-null data be kept in the final dataset.
225 ///
226 /// Example: `ComponentColumnSelector("Points3D:positions")`.
227 //
228 // TODO(cmc): multi-pov support
229 pub filtered_is_not_null: Option<ComponentColumnSelector>,
230
231 /// Specifies how null values should be filled in the returned dataframe.
232 ///
233 /// Defaults to [`SparseFillStrategy::None`].
234 pub sparse_fill_strategy: SparseFillStrategy,
235
236 /// The specific _columns_ to sample from the final view contents.
237 ///
238 /// The order of the samples will be respected in the final result.
239 ///
240 /// Defaults to `None`, which means: everything.
241 ///
242 /// Example: `[ColumnSelector(Time("log_time")), ColumnSelector(Component("Points3D:position"))]`.
243 //
244 // TODO(cmc): the selection has to be on the QueryHandle, otherwise it's hell to use.
245 pub selection: Option<Vec<ColumnSelector>>,
246}
247
248impl QueryExpression {
249 pub fn is_static(&self) -> bool {
250 self.filtered_index.is_none()
251 }
252
253 pub fn min_latest_at(&self) -> Option<LatestAtQuery> {
254 let index = self.filtered_index?;
255
256 if let Some(using_index_values) = &self.using_index_values {
257 return Some(LatestAtQuery::new(
258 index,
259 using_index_values.first().copied()?,
260 ));
261 }
262
263 if let Some(filtered_index_values) = &self.filtered_index_values {
264 return Some(LatestAtQuery::new(
265 index,
266 filtered_index_values.first().copied()?,
267 ));
268 }
269
270 if let Some(filtered_index_range) = &self.filtered_index_range {
271 return Some(LatestAtQuery::new(index, filtered_index_range.min()));
272 }
273
274 None
275 }
276
277 pub fn max_range(&self) -> Option<RangeQuery> {
278 let index = self.filtered_index?;
279
280 if let Some(using_index_values) = &self.using_index_values {
281 return Some(RangeQuery::new(
282 index,
283 AbsoluteTimeRange::new(
284 using_index_values.first().copied()?,
285 using_index_values.last().copied()?,
286 ),
287 ));
288 }
289
290 if let Some(filtered_index_values) = &self.filtered_index_values {
291 return Some(RangeQuery::new(
292 index,
293 AbsoluteTimeRange::new(
294 filtered_index_values.first().copied()?,
295 filtered_index_values.last().copied()?,
296 ),
297 ));
298 }
299
300 if let Some(filtered_index_range) = &self.filtered_index_range {
301 return Some(RangeQuery::new(index, *filtered_index_range));
302 }
303
304 None
305 }
306}
307
308// ---
309
310impl ChunkStore {
311 /// Returns the full schema of the store.
312 ///
313 /// This will include a column descriptor for every timeline and every component on every
314 /// entity that has been written to the store so far.
315 ///
316 /// The order of the columns is guaranteed to be in a specific order:
317 /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
318 /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
319 pub fn schema(&self) -> ChunkColumnDescriptors {
320 re_tracing::profile_function!();
321
322 let indices = self
323 .timelines()
324 .values()
325 .map(|timeline| IndexColumnDescriptor::from(*timeline))
326 .collect();
327
328 let components = self
329 .per_column_metadata
330 .iter()
331 .flat_map(|(entity_path, per_identifier)| {
332 per_identifier
333 .values()
334 .map(move |(descr, _, datatype)| (entity_path, descr, datatype))
335 })
336 .filter_map(|(entity_path, component_descr, datatype)| {
337 let metadata =
338 self.lookup_column_metadata(entity_path, component_descr.component)?;
339
340 Some(((entity_path, component_descr), (metadata, datatype)))
341 })
342 .map(|((entity_path, component_descr), (metadata, datatype))| {
343 let ColumnMetadata {
344 is_static,
345 is_tombstone,
346 is_semantically_empty,
347 } = metadata;
348
349 if let Some(c) = component_descr.component_type {
350 c.sanity_check();
351 }
352
353 ComponentColumnDescriptor {
354 // NOTE: The data is always a at least a list, whether it's latest-at or range.
355 // It might be wrapped further in e.g. a dict, but at the very least
356 // it's a list.
357 store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR(
358 ArrowField::new("item", datatype.clone(), true).into(),
359 ),
360
361 entity_path: entity_path.clone(),
362 archetype: component_descr.archetype,
363 component: component_descr.component,
364 component_type: component_descr.component_type,
365 is_static,
366 is_tombstone,
367 is_semantically_empty,
368 }
369 })
370 .collect_vec()
371 .tap_mut(|components| components.sort());
372
373 ChunkColumnDescriptors {
374 row_id: self.row_id_descriptor(),
375 indices,
376 components,
377 }
378 .tap(|schema| schema.sanity_check())
379 }
380
381 #[expect(clippy::unused_self)]
382 pub fn row_id_descriptor(&self) -> re_sorbet::RowIdColumnDescriptor {
383 re_sorbet::RowIdColumnDescriptor::from_sorted(false)
384 }
385
386 /// Given a [`TimeColumnSelector`], returns the corresponding [`IndexColumnDescriptor`].
387 pub fn resolve_time_selector(&self, selector: &TimeColumnSelector) -> IndexColumnDescriptor {
388 let timelines = self.timelines();
389
390 let timeline = timelines
391 .get(&selector.timeline)
392 .copied()
393 .unwrap_or_else(|| {
394 re_log::warn_once!("Unknown timeline {selector:?}; assuming sequence timeline.");
395 Timeline::new_sequence(selector.timeline)
396 });
397
398 IndexColumnDescriptor::from(timeline)
399 }
400
401 /// Given a [`ComponentColumnSelector`], returns the corresponding [`ComponentColumnDescriptor`].
402 ///
403 /// If the component is not found in the store, a default descriptor is returned with a null datatype.
404 pub fn resolve_component_selector(
405 &self,
406 selector: &ComponentColumnSelector,
407 ) -> ComponentColumnDescriptor {
408 // Unfortunately, we can't return an error here, so we craft a default descriptor and
409 // add information to it that we find.
410
411 // TODO(#7699) This currently interns every string ever queried which could be wasteful, especially
412 // in long-running servers. In practice this probably doesn't matter.
413 let mut result = ComponentColumnDescriptor {
414 store_datatype: ArrowDatatype::Null,
415 component_type: None,
416 entity_path: selector.entity_path.clone(),
417 archetype: None,
418 component: selector.component.as_str().into(),
419 is_static: false,
420 is_tombstone: false,
421 is_semantically_empty: false,
422 };
423
424 let Some(per_identifier) = self.per_column_metadata.get(&selector.entity_path) else {
425 return result;
426 };
427
428 // We perform a scan over all component descriptors in the queried entity path.
429 let Some((component_descr, _, datatype)) =
430 per_identifier.get(&selector.component.as_str().into())
431 else {
432 return result;
433 };
434 result.store_datatype = datatype.clone();
435 result.archetype = component_descr.archetype;
436 result.component_type = component_descr.component_type;
437
438 if let Some(ColumnMetadata {
439 is_static,
440 is_tombstone,
441 is_semantically_empty,
442 }) = self.lookup_column_metadata(&selector.entity_path, component_descr.component)
443 {
444 result.is_static = is_static;
445 result.is_tombstone = is_tombstone;
446 result.is_semantically_empty = is_semantically_empty;
447 }
448
449 result
450 }
451
452 /// Returns the filtered schema for the given [`QueryExpression`].
453 ///
454 /// The order of the columns is guaranteed to be in a specific order:
455 /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
456 /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
457 pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
458 re_tracing::profile_function!();
459
460 let filter = Self::create_component_filter_from_query(query);
461
462 self.schema().filter_components(filter)
463 }
464
465 pub fn create_component_filter_from_query(
466 query: &QueryExpression,
467 ) -> impl Fn(&ComponentColumnDescriptor) -> bool {
468 let QueryExpression {
469 view_contents,
470 include_semantically_empty_columns,
471 include_tombstone_columns,
472 include_static_columns,
473 filtered_index: _,
474 filtered_index_range: _,
475 filtered_index_values: _,
476 using_index_values: _,
477 filtered_is_not_null: _,
478 sparse_fill_strategy: _,
479 selection: _,
480 } = query;
481
482 move |column: &ComponentColumnDescriptor| {
483 let is_part_of_view_contents = || {
484 view_contents.as_ref().is_none_or(|view_contents| {
485 view_contents
486 .get(&column.entity_path)
487 .is_some_and(|components| {
488 components
489 .as_ref()
490 .is_none_or(|components| components.contains(&column.component))
491 })
492 })
493 };
494
495 let passes_semantically_empty_check =
496 || *include_semantically_empty_columns || !column.is_semantically_empty;
497
498 let passes_tombstone_check = || *include_tombstone_columns || !column.is_tombstone;
499
500 let passes_static_check = || match include_static_columns {
501 StaticColumnSelection::Both => true,
502 StaticColumnSelection::StaticOnly => column.is_static,
503 StaticColumnSelection::NonStaticOnly => !column.is_static,
504 };
505
506 is_part_of_view_contents()
507 && passes_semantically_empty_check()
508 && passes_tombstone_check()
509 && passes_static_check()
510 }
511 }
512}