re_chunk_store/
dataframe.rs

1//! All the APIs used specifically for `re_dataframe`.
2
3use std::{
4    collections::{BTreeMap, BTreeSet},
5    ops::{Deref, DerefMut},
6};
7
8use arrow::{
9    array::ListArray as ArrowListArray,
10    datatypes::{DataType as ArrowDatatype, Field as ArrowField},
11};
12use itertools::Itertools as _;
13
14use crate::{ChunkStore, ColumnMetadata};
15use re_chunk::{ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
16use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, Timeline};
17use re_sorbet::{
18    ChunkColumnDescriptors, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
19    IndexColumnDescriptor, TimeColumnSelector,
20};
21use tap::Tap as _;
22
23// --- Queries v2 ---
24
/// Strategy used to fill null values in the dataframe returned by a query.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum SparseFillStrategy {
    /// Leave the data untouched: nulls stay nulls.
    #[default]
    None,

    /// Fill nulls by applying latest-at semantics at global scope.
    ///
    /// "Global" means the latest-at lookup runs against the entire dataset rather than just the
    /// current view contents, so filled-in values may originate from outside the view!
    LatestAtGlobal,
    //
    // TODO(cmc): `LatestAtView`?
}

impl std::fmt::Display for SparseFillStrategy {
    /// Writes the short, human-readable name of the strategy.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::None => "none",
            Self::LatestAtGlobal => "latest-at (global)",
        };

        f.write_str(label)
    }
}
49
50/// The view contents specify which subset of the database (i.e., which columns) the query runs on.
51///
52/// Contents are expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
53///
54/// Setting an entity's identifier to `None` means: everything.
55///
56// TODO(cmc): we need to be able to build that easily in a command-line context, otherwise it's just
57// very annoying. E.g. `--with /world/points:[positions, radius] --with /cam:[pinhole]`.
58#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
59pub struct ViewContentsSelector(pub BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>);
60
61impl ViewContentsSelector {
62    pub fn into_inner(self) -> BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>> {
63        self.0
64    }
65}
66
67impl Deref for ViewContentsSelector {
68    type Target = BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>;
69
70    #[inline]
71    fn deref(&self) -> &Self::Target {
72        &self.0
73    }
74}
75
76impl DerefMut for ViewContentsSelector {
77    #[inline]
78    fn deref_mut(&mut self) -> &mut Self::Target {
79        &mut self.0
80    }
81}
82
83impl FromIterator<(EntityPath, Option<BTreeSet<ComponentIdentifier>>)> for ViewContentsSelector {
84    fn from_iter<T: IntoIterator<Item = (EntityPath, Option<BTreeSet<ComponentIdentifier>>)>>(
85        iter: T,
86    ) -> Self {
87        Self(iter.into_iter().collect())
88    }
89}
90
// TODO(cmc): Ultimately, this shouldn't be hardcoded to `Timeline`, but to a generic `I: Index`.
//            `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
/// Identifies the index that a query is filtered on. Currently always a [`TimelineName`].
pub type Index = TimelineName;

// TODO(cmc): Ultimately, this shouldn't be hardcoded to `TimeInt`, but to a generic `I: Index`.
//            `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
/// A single value on an [`Index`]. Currently always a [`TimeInt`].
pub type IndexValue = TimeInt;

// TODO(cmc): Ultimately, this shouldn't be hardcoded to `AbsoluteTimeRange`, but to a generic `I: Index`.
//            `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
/// A range of values on an [`Index`]. Currently always an [`AbsoluteTimeRange`].
pub type IndexRange = AbsoluteTimeRange;
102
/// Specifies whether static columns should be included in the query.
///
/// Used by [`QueryExpression::include_static_columns`]; the selection is matched against the
/// `is_static` flag of each component column descriptor.
///
/// Defaults to [`StaticColumnSelection::Both`].
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum StaticColumnSelection {
    /// Both static and non-static columns should be included in the query.
    ///
    /// No column is rejected on the basis of its `is_static` flag.
    #[default]
    Both,

    /// Only static columns should be included in the query.
    ///
    /// Columns whose `is_static` flag is unset are rejected.
    StaticOnly,

    /// Only non-static columns should be included in the query.
    ///
    /// Columns whose `is_static` flag is set are rejected.
    NonStaticOnly,
}
116
/// Describes a complete query for Rerun's dataframe API.
///
/// ## Terminology: view vs. selection vs. filtering vs. sampling
///
/// * The view contents specify which subset of the database (i.e., which columns) the query runs on,
///   expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
///
/// * The filters filter out _rows_ of data from the view contents.
///   A filter cannot possibly introduce new rows, it can only remove existing ones from the view contents.
///
/// * The samplers sample _rows_ of data from the view contents at user-specified values.
///   Samplers don't necessarily return existing rows: they might introduce new ones if the sampled value
///   isn't present in the view contents in the first place.
///
/// * The selection applies last and samples _columns_ of data from the filtered/sampled view contents.
///   Selecting a column that isn't present in the view contents results in an empty column in the
///   final dataframe (null array).
///
/// A very rough mental model, in SQL terms:
/// ```text
/// SELECT <Self::selection> FROM <Self::view_contents> WHERE <Self::filtered_*>
/// ```
//
// TODO(cmc): ideally we'd like this to be the same type as the one used in the blueprint, possibly?
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueryExpression {
    /// The subset of the database that the query will run on: a set of [`EntityPath`]s and their
    /// associated [`re_types_core::ComponentIdentifier`]s.
    ///
    /// Defaults to `None`, which means: everything.
    ///
    /// Example (pseudo-code):
    /// ```text
    /// view_contents = {
    ///   "world/points": [rr.Position3D, rr.Radius],
    ///   "metrics": [rr.Scalars]
    /// }
    /// ```
    pub view_contents: Option<ViewContentsSelector>,

    /// Whether the `view_contents` should include semantically empty columns.
    ///
    /// A semantically empty column is a column that either contains no data at all, or where all
    /// values are either nulls or empty arrays (`[]`).
    ///
    /// Defaults to `false`: semantically empty columns are filtered out unless this is set.
    ///
    /// `view_contents`: [`QueryExpression::view_contents`]
    pub include_semantically_empty_columns: bool,

    /// Whether the `view_contents` should include columns corresponding to `Clear`-related components.
    ///
    /// Defaults to `false`: tombstone columns are filtered out unless this is set.
    ///
    /// `view_contents`: [`QueryExpression::view_contents`]
    /// `Clear`: [`re_types_core::archetypes::Clear`]
    pub include_tombstone_columns: bool,

    /// Whether the `view_contents` should include static columns.
    ///
    /// Defaults to [`StaticColumnSelection::Both`].
    ///
    /// `view_contents`: [`QueryExpression::view_contents`]
    pub include_static_columns: StaticColumnSelection,

    /// The index used to filter out _rows_ from the view contents.
    ///
    /// Only rows where at least 1 column contains non-null data at that index will be kept in the
    /// final dataset.
    ///
    /// If left unspecified, the results will only contain static data.
    ///
    /// Examples: `Some(TimelineName("frame"))`, `None` (only static data).
    //
    // TODO(cmc): this has to be a selector otherwise this is a horrible UX.
    pub filtered_index: Option<Index>,

    /// The range of index values used to filter out _rows_ from the view contents.
    ///
    /// Only rows where at least 1 of the view-contents contains non-null data within that range will be kept in
    /// the final dataset.
    ///
    /// * This has no effect if `filtered_index` isn't set.
    /// * This has no effect if [`QueryExpression::using_index_values`] is set.
    ///
    /// Example: `AbsoluteTimeRange(10, 20)`.
    pub filtered_index_range: Option<IndexRange>,

    /// The specific index values used to filter out _rows_ from the view contents.
    ///
    /// Only rows where at least 1 column contains non-null data at these specific values will be kept
    /// in the final dataset.
    ///
    /// * This has no effect if `filtered_index` isn't set.
    /// * This has no effect if [`QueryExpression::using_index_values`] is set.
    /// * Using [`TimeInt::STATIC`] as index value has no effect.
    ///
    /// Example: `[TimeInt(12), TimeInt(14)]`.
    pub filtered_index_values: Option<BTreeSet<IndexValue>>,

    /// The specific index values used to sample _rows_ from the view contents.
    ///
    /// The final dataset will contain one row per sampled index value, regardless of whether data
    /// existed for that index value in the view contents.
    /// The semantics of the query are consistent with all other settings: the results will be
    /// sorted on the `filtered_index`, and only contain unique index values.
    ///
    /// * This has no effect if `filtered_index` isn't set.
    /// * If set, this overrides both [`QueryExpression::filtered_index_range`] and
    ///   [`QueryExpression::filtered_index_values`].
    /// * Using [`TimeInt::STATIC`] as index value has no effect.
    ///
    /// Example: `[TimeInt(12), TimeInt(14)]`.
    pub using_index_values: Option<BTreeSet<IndexValue>>,

    /// The component column used to filter out _rows_ from the view contents.
    ///
    /// Only rows where this column contains non-null data will be kept in the final dataset.
    ///
    /// Example: `ComponentColumnSelector("Points3D:positions")`.
    //
    // TODO(cmc): multi-pov support
    pub filtered_is_not_null: Option<ComponentColumnSelector>,

    /// Specifies how null values should be filled in the returned dataframe.
    ///
    /// Defaults to [`SparseFillStrategy::None`].
    pub sparse_fill_strategy: SparseFillStrategy,

    /// The specific _columns_ to sample from the final view contents.
    ///
    /// The order of the samples will be respected in the final result.
    ///
    /// Defaults to `None`, which means: everything.
    ///
    /// Example: `[ColumnSelector(Time("log_time")), ColumnSelector(Component("Points3D:positions"))]`.
    //
    // TODO(cmc): the selection has to be on the QueryHandle, otherwise it's hell to use.
    pub selection: Option<Vec<ColumnSelector>>,
}
251
252impl QueryExpression {
253    pub fn is_static(&self) -> bool {
254        self.filtered_index.is_none()
255    }
256
257    pub fn min_latest_at(&self) -> Option<LatestAtQuery> {
258        let index = self.filtered_index?;
259
260        if let Some(using_index_values) = &self.using_index_values {
261            return Some(LatestAtQuery::new(
262                index,
263                using_index_values.first().copied()?,
264            ));
265        }
266
267        if let Some(filtered_index_values) = &self.filtered_index_values {
268            return Some(LatestAtQuery::new(
269                index,
270                filtered_index_values.first().copied()?,
271            ));
272        }
273
274        if let Some(filtered_index_range) = &self.filtered_index_range {
275            return Some(LatestAtQuery::new(index, filtered_index_range.min()));
276        }
277
278        None
279    }
280
281    pub fn max_range(&self) -> Option<RangeQuery> {
282        let index = self.filtered_index?;
283
284        if let Some(using_index_values) = &self.using_index_values {
285            return Some(RangeQuery::new(
286                index,
287                AbsoluteTimeRange::new(
288                    using_index_values.first().copied()?,
289                    using_index_values.last().copied()?,
290                ),
291            ));
292        }
293
294        if let Some(filtered_index_values) = &self.filtered_index_values {
295            return Some(RangeQuery::new(
296                index,
297                AbsoluteTimeRange::new(
298                    filtered_index_values.first().copied()?,
299                    filtered_index_values.last().copied()?,
300                ),
301            ));
302        }
303
304        if let Some(filtered_index_range) = &self.filtered_index_range {
305            return Some(RangeQuery::new(index, *filtered_index_range));
306        }
307
308        None
309    }
310}
311
312// ---
313
314impl ChunkStore {
315    /// Returns the full schema of the store.
316    ///
317    /// This will include a column descriptor for every timeline and every component on every
318    /// entity that has been written to the store so far.
319    ///
320    /// The order of the columns is guaranteed to be in a specific order:
321    /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
322    /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
323    pub fn schema(&self) -> ChunkColumnDescriptors {
324        re_tracing::profile_function!();
325
326        let indices = self
327            .timelines()
328            .values()
329            .map(|timeline| IndexColumnDescriptor::from(*timeline))
330            .collect();
331
332        let components = self
333            .per_column_metadata
334            .iter()
335            .flat_map(|(entity_path, per_identifier)| {
336                per_identifier
337                    .values()
338                    .map(move |(descr, _, datatype)| (entity_path, descr, datatype))
339            })
340            .filter_map(|(entity_path, component_descr, datatype)| {
341                let metadata =
342                    self.lookup_column_metadata(entity_path, component_descr.component)?;
343
344                Some(((entity_path, component_descr), (metadata, datatype)))
345            })
346            .map(|((entity_path, component_descr), (metadata, datatype))| {
347                let ColumnMetadata {
348                    is_static,
349                    is_tombstone,
350                    is_semantically_empty,
351                } = metadata;
352
353                if let Some(c) = component_descr.component_type {
354                    c.sanity_check();
355                }
356
357                ComponentColumnDescriptor {
358                    // NOTE: The data is always a at least a list, whether it's latest-at or range.
359                    // It might be wrapped further in e.g. a dict, but at the very least
360                    // it's a list.
361                    store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR(
362                        ArrowField::new("item", datatype.clone(), true).into(),
363                    ),
364
365                    entity_path: entity_path.clone(),
366                    archetype: component_descr.archetype,
367                    component: component_descr.component,
368                    component_type: component_descr.component_type,
369                    is_static,
370                    is_tombstone,
371                    is_semantically_empty,
372                }
373            })
374            .collect_vec()
375            .tap_mut(|components| components.sort());
376
377        ChunkColumnDescriptors {
378            row_id: self.row_id_descriptor(),
379            indices,
380            components,
381        }
382        .tap(|schema| schema.sanity_check())
383    }
384
385    #[expect(clippy::unused_self)]
386    pub fn row_id_descriptor(&self) -> re_sorbet::RowIdColumnDescriptor {
387        re_sorbet::RowIdColumnDescriptor::from_sorted(false)
388    }
389
390    /// Given a [`TimeColumnSelector`], returns the corresponding [`IndexColumnDescriptor`].
391    pub fn resolve_time_selector(&self, selector: &TimeColumnSelector) -> IndexColumnDescriptor {
392        let timelines = self.timelines();
393
394        let timeline = timelines
395            .get(&selector.timeline)
396            .copied()
397            .unwrap_or_else(|| {
398                re_log::warn_once!("Unknown timeline {selector:?}; assuming sequence timeline.");
399                Timeline::new_sequence(selector.timeline)
400            });
401
402        IndexColumnDescriptor::from(timeline)
403    }
404
405    /// Given a [`ComponentColumnSelector`], returns the corresponding [`ComponentColumnDescriptor`].
406    ///
407    /// If the component is not found in the store, a default descriptor is returned with a null datatype.
408    pub fn resolve_component_selector(
409        &self,
410        selector: &ComponentColumnSelector,
411    ) -> ComponentColumnDescriptor {
412        // Unfortunately, we can't return an error here, so we craft a default descriptor and
413        // add information to it that we find.
414
415        // TODO(#7699) This currently interns every string ever queried which could be wasteful, especially
416        // in long-running servers. In practice this probably doesn't matter.
417        let mut result = ComponentColumnDescriptor {
418            store_datatype: ArrowDatatype::Null,
419            component_type: None,
420            entity_path: selector.entity_path.clone(),
421            archetype: None,
422            component: selector.component.as_str().into(),
423            is_static: false,
424            is_tombstone: false,
425            is_semantically_empty: false,
426        };
427
428        let Some(per_identifier) = self.per_column_metadata.get(&selector.entity_path) else {
429            return result;
430        };
431
432        // We perform a scan over all component descriptors in the queried entity path.
433        let Some((component_descr, _, datatype)) =
434            per_identifier.get(&selector.component.as_str().into())
435        else {
436            return result;
437        };
438        result.store_datatype = datatype.clone();
439        result.archetype = component_descr.archetype;
440        result.component_type = component_descr.component_type;
441
442        if let Some(ColumnMetadata {
443            is_static,
444            is_tombstone,
445            is_semantically_empty,
446        }) = self.lookup_column_metadata(&selector.entity_path, component_descr.component)
447        {
448            result.is_static = is_static;
449            result.is_tombstone = is_tombstone;
450            result.is_semantically_empty = is_semantically_empty;
451        }
452
453        result
454    }
455
456    /// Returns the filtered schema for the given [`QueryExpression`].
457    ///
458    /// The order of the columns is guaranteed to be in a specific order:
459    /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
460    /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
461    pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
462        re_tracing::profile_function!();
463
464        let filter = Self::create_component_filter_from_query(query);
465
466        self.schema().filter_components(filter)
467    }
468
469    pub fn create_component_filter_from_query(
470        query: &QueryExpression,
471    ) -> impl Fn(&ComponentColumnDescriptor) -> bool {
472        let QueryExpression {
473            view_contents,
474            include_semantically_empty_columns,
475            include_tombstone_columns,
476            include_static_columns,
477            filtered_index: _,
478            filtered_index_range: _,
479            filtered_index_values: _,
480            using_index_values: _,
481            filtered_is_not_null: _,
482            sparse_fill_strategy: _,
483            selection: _,
484        } = query;
485
486        move |column: &ComponentColumnDescriptor| {
487            let is_part_of_view_contents = || {
488                view_contents.as_ref().is_none_or(|view_contents| {
489                    view_contents
490                        .get(&column.entity_path)
491                        .is_some_and(|components| {
492                            components
493                                .as_ref()
494                                .is_none_or(|components| components.contains(&column.component))
495                        })
496                })
497            };
498
499            let passes_semantically_empty_check =
500                || *include_semantically_empty_columns || !column.is_semantically_empty;
501
502            let passes_tombstone_check = || *include_tombstone_columns || !column.is_tombstone;
503
504            let passes_static_check = || match include_static_columns {
505                StaticColumnSelection::Both => true,
506                StaticColumnSelection::StaticOnly => column.is_static,
507                StaticColumnSelection::NonStaticOnly => !column.is_static,
508            };
509
510            is_part_of_view_contents()
511                && passes_semantically_empty_check()
512                && passes_tombstone_check()
513                && passes_static_check()
514        }
515    }
516}