re_chunk_store/
dataframe.rs

1//! All the APIs used specifically for `re_dataframe`.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::ops::{Deref, DerefMut};
5
6use arrow::array::ListArray as ArrowListArray;
7use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField};
8use itertools::Itertools as _;
9use re_chunk::{ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
10use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, Timeline};
11use re_sorbet::{
12    ChunkColumnDescriptors, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
13    IndexColumnDescriptor, TimeColumnSelector,
14};
15use tap::Tap as _;
16
17use crate::{ChunkStore, ColumnMetadata};
18
19// --- Queries v2 ---
20
/// Strategy used to fill null values in the dataframe returned by a query.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum SparseFillStrategy {
    /// Do not fill anything: null values are left as nulls.
    ///
    /// This is the default strategy.
    #[default]
    None,

    /// Fill null values using latest-at semantics, applied at global scope.
    ///
    /// The latest-at lookup runs against the entire dataset rather than just the current view
    /// contents, which means the filled-in values may come from outside the view!
    LatestAtGlobal,
    //
    // TODO(cmc): `LatestAtView`?
}
36
37impl std::fmt::Display for SparseFillStrategy {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::None => f.write_str("none"),
41            Self::LatestAtGlobal => f.write_str("latest-at (global)"),
42        }
43    }
44}
45
46/// The view contents specify which subset of the database (i.e., which columns) the query runs on.
47///
48/// Contents are expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
49///
50/// Setting an entity's identifier to `None` means: everything.
51///
52// TODO(cmc): we need to be able to build that easily in a command-line context, otherwise it's just
53// very annoying. E.g. `--with /world/points:[positions, radius] --with /cam:[pinhole]`.
54#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
55pub struct ViewContentsSelector(pub BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>);
56
57impl ViewContentsSelector {
58    pub fn into_inner(self) -> BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>> {
59        self.0
60    }
61}
62
63impl Deref for ViewContentsSelector {
64    type Target = BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>;
65
66    #[inline]
67    fn deref(&self) -> &Self::Target {
68        &self.0
69    }
70}
71
72impl DerefMut for ViewContentsSelector {
73    #[inline]
74    fn deref_mut(&mut self) -> &mut Self::Target {
75        &mut self.0
76    }
77}
78
79impl FromIterator<(EntityPath, Option<BTreeSet<ComponentIdentifier>>)> for ViewContentsSelector {
80    fn from_iter<T: IntoIterator<Item = (EntityPath, Option<BTreeSet<ComponentIdentifier>>)>>(
81        iter: T,
82    ) -> Self {
83        Self(iter.into_iter().collect())
84    }
85}
86
87// TODO(cmc): Ultimately, this shouldn't be hardcoded to `Timeline`, but to a generic `I: Index`.
88//            `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
89pub type Index = TimelineName;
90
91// TODO(cmc): Ultimately, this shouldn't be hardcoded to `TimeInt`, but to a generic `I: Index`.
92//            `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
93pub type IndexValue = TimeInt;
94
95// TODO(cmc): Ultimately, this shouldn't be hardcoded to `AbsoluteTimeRange`, but to a generic `I: Index`.
96//            `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
97pub type IndexRange = AbsoluteTimeRange;
98
/// Controls which columns a query includes, based on whether they are static.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum StaticColumnSelection {
    /// Include static and non-static columns alike.
    ///
    /// This is the default.
    #[default]
    Both,

    /// Include static columns only.
    StaticOnly,

    /// Include non-static columns only.
    NonStaticOnly,
}
112
113/// Describes a complete query for Rerun's dataframe API.
114///
115/// ## Terminology: view vs. selection vs. filtering vs. sampling
116///
117/// * The view contents specify which subset of the database (i.e., which columns) the query runs on,
118///   expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
119///
120/// * The filters filter out _rows_ of data from the view contents.
121///   A filter cannot possibly introduce new rows, it can only remove existing ones from the view contents.
122///
123/// * The samplers sample _rows_ of data from the view contents at user-specified values.
124///   Samplers don't necessarily return existing rows: they might introduce new ones if the sampled value
125///   isn't present in the view contents in the first place.
126///
127/// * The selection applies last and samples _columns_ of data from the filtered/sampled view contents.
128///   Selecting a column that isn't present in the view contents results in an empty column in the
129///   final dataframe (null array).
130///
131/// A very rough mental model, in SQL terms:
132/// ```text
133/// SELECT <Self::selection> FROM <Self::view_contents> WHERE <Self::filtered_*>
134/// ```
135//
136// TODO(cmc): ideally we'd like this to be the same type as the one used in the blueprint, possibly?
137#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
138pub struct QueryExpression {
139    /// The subset of the database that the query will run on: a set of [`EntityPath`]s and their
140    /// associated [`re_types_core::ComponentIdentifier`]s.
141    ///
142    /// Defaults to `None`, which means: everything.
143    ///
144    /// Example (pseudo-code):
145    /// ```text
146    /// view_contents = {
147    ///   "world/points": [rr.Position3D, rr.Radius],
148    ///   "metrics": [rr.Scalars]
149    /// }
150    /// ```
151    pub view_contents: Option<ViewContentsSelector>,
152
153    /// Whether the `view_contents` should ignore semantically empty columns.
154    ///
155    /// A semantically empty column is a column that either contains no data at all, or where all
156    /// values are either nulls or empty arrays (`[]`).
157    ///
158    /// `view_contents`: [`QueryExpression::view_contents`]
159    pub include_semantically_empty_columns: bool,
160
161    /// Whether the `view_contents` should ignore columns corresponding to `Clear`-related components.
162    ///
163    /// `view_contents`: [`QueryExpression::view_contents`]
164    /// `Clear`: [`re_types_core::archetypes::Clear`]
165    pub include_tombstone_columns: bool,
166
167    /// Whether the `view_contents` should include static columns.
168    ///
169    /// `view_contents`: [`QueryExpression::view_contents`]
170    pub include_static_columns: StaticColumnSelection,
171
172    /// The index used to filter out _rows_ from the view contents.
173    ///
174    /// Only rows where at least 1 column contains non-null data at that index will be kept in the
175    /// final dataset.
176    ///
177    /// If left unspecified, the results will only contain static data.
178    ///
179    /// Examples: `Some(TimelineName("frame"))`, `None` (only static data).
180    //
181    // TODO(cmc): this has to be a selector otherwise this is a horrible UX.
182    pub filtered_index: Option<Index>,
183
184    /// The range of index values used to filter out _rows_ from the view contents.
185    ///
186    /// Only rows where at least 1 of the view-contents contains non-null data within that range will be kept in
187    /// the final dataset.
188    ///
189    /// * This has no effect if `filtered_index` isn't set.
190    /// * This has no effect if [`QueryExpression::using_index_values`] is set.
191    ///
192    /// Example: `AbsoluteTimeRange(10, 20)`.
193    pub filtered_index_range: Option<IndexRange>,
194
195    /// The specific index values used to filter out _rows_ from the view contents.
196    ///
197    /// Only rows where at least 1 column contains non-null data at these specific values will be kept
198    /// in the final dataset.
199    ///
200    /// * This has no effect if `filtered_index` isn't set.
201    /// * This has no effect if [`QueryExpression::using_index_values`] is set.
202    /// * Using [`TimeInt::STATIC`] as index value has no effect.
203    ///
204    /// Example: `[TimeInt(12), TimeInt(14)]`.
205    pub filtered_index_values: Option<BTreeSet<IndexValue>>,
206
207    /// The specific index values used to sample _rows_ from the view contents.
208    ///
209    /// The final dataset will contain one row per sampled index value, regardless of whether data
210    /// existed for that index value in the view contents.
211    /// The semantics of the query are consistent with all other settings: the results will be
212    /// sorted on the `filtered_index`, and only contain unique index values.
213    ///
214    /// * This has no effect if `filtered_index` isn't set.
215    /// * If set, this overrides both [`QueryExpression::filtered_index_range`] and
216    ///   [`QueryExpression::filtered_index_values`].
217    /// * Using [`TimeInt::STATIC`] as index value has no effect.
218    ///
219    /// Example: `[TimeInt(12), TimeInt(14)]`.
220    pub using_index_values: Option<BTreeSet<IndexValue>>,
221
222    /// The component column used to filter out _rows_ from the view contents.
223    ///
224    /// Only rows where this column contains non-null data be kept in the final dataset.
225    ///
226    /// Example: `ComponentColumnSelector("Points3D:positions")`.
227    //
228    // TODO(cmc): multi-pov support
229    pub filtered_is_not_null: Option<ComponentColumnSelector>,
230
231    /// Specifies how null values should be filled in the returned dataframe.
232    ///
233    /// Defaults to [`SparseFillStrategy::None`].
234    pub sparse_fill_strategy: SparseFillStrategy,
235
236    /// The specific _columns_ to sample from the final view contents.
237    ///
238    /// The order of the samples will be respected in the final result.
239    ///
240    /// Defaults to `None`, which means: everything.
241    ///
242    /// Example: `[ColumnSelector(Time("log_time")), ColumnSelector(Component("Points3D:position"))]`.
243    //
244    // TODO(cmc): the selection has to be on the QueryHandle, otherwise it's hell to use.
245    pub selection: Option<Vec<ColumnSelector>>,
246}
247
248impl QueryExpression {
249    pub fn is_static(&self) -> bool {
250        self.filtered_index.is_none()
251    }
252
253    pub fn min_latest_at(&self) -> Option<LatestAtQuery> {
254        let index = self.filtered_index?;
255
256        if let Some(using_index_values) = &self.using_index_values {
257            return Some(LatestAtQuery::new(
258                index,
259                using_index_values.first().copied()?,
260            ));
261        }
262
263        if let Some(filtered_index_values) = &self.filtered_index_values {
264            return Some(LatestAtQuery::new(
265                index,
266                filtered_index_values.first().copied()?,
267            ));
268        }
269
270        if let Some(filtered_index_range) = &self.filtered_index_range {
271            return Some(LatestAtQuery::new(index, filtered_index_range.min()));
272        }
273
274        None
275    }
276
277    pub fn max_range(&self) -> Option<RangeQuery> {
278        let index = self.filtered_index?;
279
280        if let Some(using_index_values) = &self.using_index_values {
281            return Some(RangeQuery::new(
282                index,
283                AbsoluteTimeRange::new(
284                    using_index_values.first().copied()?,
285                    using_index_values.last().copied()?,
286                ),
287            ));
288        }
289
290        if let Some(filtered_index_values) = &self.filtered_index_values {
291            return Some(RangeQuery::new(
292                index,
293                AbsoluteTimeRange::new(
294                    filtered_index_values.first().copied()?,
295                    filtered_index_values.last().copied()?,
296                ),
297            ));
298        }
299
300        if let Some(filtered_index_range) = &self.filtered_index_range {
301            return Some(RangeQuery::new(index, *filtered_index_range));
302        }
303
304        None
305    }
306}
307
308// ---
309
310impl ChunkStore {
311    /// Returns the full schema of the store.
312    ///
313    /// This will include a column descriptor for every timeline and every component on every
314    /// entity that has been written to the store so far.
315    ///
316    /// The order of the columns is guaranteed to be in a specific order:
317    /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
318    /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
319    pub fn schema(&self) -> ChunkColumnDescriptors {
320        re_tracing::profile_function!();
321
322        let indices = self
323            .timelines()
324            .values()
325            .map(|timeline| IndexColumnDescriptor::from(*timeline))
326            .collect();
327
328        let components = self
329            .per_column_metadata
330            .iter()
331            .flat_map(|(entity_path, per_identifier)| {
332                per_identifier
333                    .values()
334                    .map(move |(descr, _, datatype)| (entity_path, descr, datatype))
335            })
336            .filter_map(|(entity_path, component_descr, datatype)| {
337                let metadata =
338                    self.lookup_column_metadata(entity_path, component_descr.component)?;
339
340                Some(((entity_path, component_descr), (metadata, datatype)))
341            })
342            .map(|((entity_path, component_descr), (metadata, datatype))| {
343                let ColumnMetadata {
344                    is_static,
345                    is_tombstone,
346                    is_semantically_empty,
347                } = metadata;
348
349                if let Some(c) = component_descr.component_type {
350                    c.sanity_check();
351                }
352
353                ComponentColumnDescriptor {
354                    // NOTE: The data is always a at least a list, whether it's latest-at or range.
355                    // It might be wrapped further in e.g. a dict, but at the very least
356                    // it's a list.
357                    store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR(
358                        ArrowField::new("item", datatype.clone(), true).into(),
359                    ),
360
361                    entity_path: entity_path.clone(),
362                    archetype: component_descr.archetype,
363                    component: component_descr.component,
364                    component_type: component_descr.component_type,
365                    is_static,
366                    is_tombstone,
367                    is_semantically_empty,
368                }
369            })
370            .collect_vec()
371            .tap_mut(|components| components.sort());
372
373        ChunkColumnDescriptors {
374            row_id: self.row_id_descriptor(),
375            indices,
376            components,
377        }
378        .tap(|schema| schema.sanity_check())
379    }
380
381    #[expect(clippy::unused_self)]
382    pub fn row_id_descriptor(&self) -> re_sorbet::RowIdColumnDescriptor {
383        re_sorbet::RowIdColumnDescriptor::from_sorted(false)
384    }
385
386    /// Given a [`TimeColumnSelector`], returns the corresponding [`IndexColumnDescriptor`].
387    pub fn resolve_time_selector(&self, selector: &TimeColumnSelector) -> IndexColumnDescriptor {
388        let timelines = self.timelines();
389
390        let timeline = timelines
391            .get(&selector.timeline)
392            .copied()
393            .unwrap_or_else(|| {
394                re_log::warn_once!("Unknown timeline {selector:?}; assuming sequence timeline.");
395                Timeline::new_sequence(selector.timeline)
396            });
397
398        IndexColumnDescriptor::from(timeline)
399    }
400
401    /// Given a [`ComponentColumnSelector`], returns the corresponding [`ComponentColumnDescriptor`].
402    ///
403    /// If the component is not found in the store, a default descriptor is returned with a null datatype.
404    pub fn resolve_component_selector(
405        &self,
406        selector: &ComponentColumnSelector,
407    ) -> ComponentColumnDescriptor {
408        // Unfortunately, we can't return an error here, so we craft a default descriptor and
409        // add information to it that we find.
410
411        // TODO(#7699) This currently interns every string ever queried which could be wasteful, especially
412        // in long-running servers. In practice this probably doesn't matter.
413        let mut result = ComponentColumnDescriptor {
414            store_datatype: ArrowDatatype::Null,
415            component_type: None,
416            entity_path: selector.entity_path.clone(),
417            archetype: None,
418            component: selector.component.as_str().into(),
419            is_static: false,
420            is_tombstone: false,
421            is_semantically_empty: false,
422        };
423
424        let Some(per_identifier) = self.per_column_metadata.get(&selector.entity_path) else {
425            return result;
426        };
427
428        // We perform a scan over all component descriptors in the queried entity path.
429        let Some((component_descr, _, datatype)) =
430            per_identifier.get(&selector.component.as_str().into())
431        else {
432            return result;
433        };
434        result.store_datatype = datatype.clone();
435        result.archetype = component_descr.archetype;
436        result.component_type = component_descr.component_type;
437
438        if let Some(ColumnMetadata {
439            is_static,
440            is_tombstone,
441            is_semantically_empty,
442        }) = self.lookup_column_metadata(&selector.entity_path, component_descr.component)
443        {
444            result.is_static = is_static;
445            result.is_tombstone = is_tombstone;
446            result.is_semantically_empty = is_semantically_empty;
447        }
448
449        result
450    }
451
452    /// Returns the filtered schema for the given [`QueryExpression`].
453    ///
454    /// The order of the columns is guaranteed to be in a specific order:
455    /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
456    /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
457    pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
458        re_tracing::profile_function!();
459
460        let filter = Self::create_component_filter_from_query(query);
461
462        self.schema().filter_components(filter)
463    }
464
465    pub fn create_component_filter_from_query(
466        query: &QueryExpression,
467    ) -> impl Fn(&ComponentColumnDescriptor) -> bool {
468        let QueryExpression {
469            view_contents,
470            include_semantically_empty_columns,
471            include_tombstone_columns,
472            include_static_columns,
473            filtered_index: _,
474            filtered_index_range: _,
475            filtered_index_values: _,
476            using_index_values: _,
477            filtered_is_not_null: _,
478            sparse_fill_strategy: _,
479            selection: _,
480        } = query;
481
482        move |column: &ComponentColumnDescriptor| {
483            let is_part_of_view_contents = || {
484                view_contents.as_ref().is_none_or(|view_contents| {
485                    view_contents
486                        .get(&column.entity_path)
487                        .is_some_and(|components| {
488                            components
489                                .as_ref()
490                                .is_none_or(|components| components.contains(&column.component))
491                        })
492                })
493            };
494
495            let passes_semantically_empty_check =
496                || *include_semantically_empty_columns || !column.is_semantically_empty;
497
498            let passes_tombstone_check = || *include_tombstone_columns || !column.is_tombstone;
499
500            let passes_static_check = || match include_static_columns {
501                StaticColumnSelection::Both => true,
502                StaticColumnSelection::StaticOnly => column.is_static,
503                StaticColumnSelection::NonStaticOnly => !column.is_static,
504            };
505
506            is_part_of_view_contents()
507                && passes_semantically_empty_check()
508                && passes_tombstone_check()
509                && passes_static_check()
510        }
511    }
512}