re_chunk_store/dataframe.rs
1//! All the APIs used specifically for `re_dataframe`.
2
3use std::{
4 collections::{BTreeMap, BTreeSet},
5 ops::{Deref, DerefMut},
6};
7
8use arrow::{
9 array::ListArray as ArrowListArray,
10 datatypes::{DataType as ArrowDatatype, Field as ArrowField},
11};
12use itertools::Itertools as _;
13
14use crate::{ChunkStore, ColumnMetadata};
15use re_chunk::{ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
16use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, Timeline};
17use re_sorbet::{
18 ChunkColumnDescriptors, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
19 IndexColumnDescriptor, TimeColumnSelector,
20};
21use tap::Tap as _;
22
23// --- Queries v2 ---
24
/// Strategy used to fill in the null values of the returned dataframe.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum SparseFillStrategy {
    /// Leave nulls untouched: no sparse filling takes place.
    #[default]
    None,

    /// Fill nulls with values resolved using latest-at semantics over the global scope.
    ///
    /// Because the latest-at lookup spans the whole dataset rather than only the current view
    /// contents, filled-in values may come from outside the view!
    LatestAtGlobal,
    //
    // TODO(cmc): `LatestAtView`?
}

impl std::fmt::Display for SparseFillStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::None => "none",
            Self::LatestAtGlobal => "latest-at (global)",
        };
        f.write_str(label)
    }
}
48}
49
50/// The view contents specify which subset of the database (i.e., which columns) the query runs on.
51///
52/// Contents are expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
53///
54/// Setting an entity's identifier to `None` means: everything.
55///
56// TODO(cmc): we need to be able to build that easily in a command-line context, otherwise it's just
57// very annoying. E.g. `--with /world/points:[positions, radius] --with /cam:[pinhole]`.
58#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
59pub struct ViewContentsSelector(pub BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>);
60
61impl ViewContentsSelector {
62 pub fn into_inner(self) -> BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>> {
63 self.0
64 }
65}
66
67impl Deref for ViewContentsSelector {
68 type Target = BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>;
69
70 #[inline]
71 fn deref(&self) -> &Self::Target {
72 &self.0
73 }
74}
75
76impl DerefMut for ViewContentsSelector {
77 #[inline]
78 fn deref_mut(&mut self) -> &mut Self::Target {
79 &mut self.0
80 }
81}
82
83impl FromIterator<(EntityPath, Option<BTreeSet<ComponentIdentifier>>)> for ViewContentsSelector {
84 fn from_iter<T: IntoIterator<Item = (EntityPath, Option<BTreeSet<ComponentIdentifier>>)>>(
85 iter: T,
86 ) -> Self {
87 Self(iter.into_iter().collect())
88 }
89}
90
91// TODO(cmc): Ultimately, this shouldn't be hardcoded to `Timeline`, but to a generic `I: Index`.
92// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
93pub type Index = TimelineName;
94
95// TODO(cmc): Ultimately, this shouldn't be hardcoded to `TimeInt`, but to a generic `I: Index`.
96// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
97pub type IndexValue = TimeInt;
98
99// TODO(cmc): Ultimately, this shouldn't be hardcoded to `AbsoluteTimeRange`, but to a generic `I: Index`.
100// `Index` in this case should also be implemented on tuples (`(I1, I2, ...)`).
101pub type IndexRange = AbsoluteTimeRange;
102
/// Controls which columns a query includes, based on whether they hold static data.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum StaticColumnSelection {
    /// Include static and non-static columns alike.
    #[default]
    Both,

    /// Include static columns exclusively.
    StaticOnly,

    /// Include non-static columns exclusively.
    NonStaticOnly,
}
116
117/// Describes a complete query for Rerun's dataframe API.
118///
119/// ## Terminology: view vs. selection vs. filtering vs. sampling
120///
121/// * The view contents specify which subset of the database (i.e., which columns) the query runs on,
122/// expressed as a set of [`EntityPath`]s and their associated [`re_types_core::ComponentIdentifier`]s.
123///
124/// * The filters filter out _rows_ of data from the view contents.
125/// A filter cannot possibly introduce new rows, it can only remove existing ones from the view contents.
126///
127/// * The samplers sample _rows_ of data from the view contents at user-specified values.
128/// Samplers don't necessarily return existing rows: they might introduce new ones if the sampled value
129/// isn't present in the view contents in the first place.
130///
131/// * The selection applies last and samples _columns_ of data from the filtered/sampled view contents.
132/// Selecting a column that isn't present in the view contents results in an empty column in the
133/// final dataframe (null array).
134///
135/// A very rough mental model, in SQL terms:
136/// ```text
137/// SELECT <Self::selection> FROM <Self::view_contents> WHERE <Self::filtered_*>
138/// ```
139//
140// TODO(cmc): ideally we'd like this to be the same type as the one used in the blueprint, possibly?
141#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
142pub struct QueryExpression {
143 /// The subset of the database that the query will run on: a set of [`EntityPath`]s and their
144 /// associated [`re_types_core::ComponentIdentifier`]s.
145 ///
146 /// Defaults to `None`, which means: everything.
147 ///
148 /// Example (pseudo-code):
149 /// ```text
150 /// view_contents = {
151 /// "world/points": [rr.Position3D, rr.Radius],
152 /// "metrics": [rr.Scalars]
153 /// }
154 /// ```
155 pub view_contents: Option<ViewContentsSelector>,
156
157 /// Whether the `view_contents` should ignore semantically empty columns.
158 ///
159 /// A semantically empty column is a column that either contains no data at all, or where all
160 /// values are either nulls or empty arrays (`[]`).
161 ///
162 /// `view_contents`: [`QueryExpression::view_contents`]
163 pub include_semantically_empty_columns: bool,
164
165 /// Whether the `view_contents` should ignore columns corresponding to `Clear`-related components.
166 ///
167 /// `view_contents`: [`QueryExpression::view_contents`]
168 /// `Clear`: [`re_types_core::archetypes::Clear`]
169 pub include_tombstone_columns: bool,
170
171 /// Whether the `view_contents` should include static columns.
172 ///
173 /// `view_contents`: [`QueryExpression::view_contents`]
174 pub include_static_columns: StaticColumnSelection,
175
176 /// The index used to filter out _rows_ from the view contents.
177 ///
178 /// Only rows where at least 1 column contains non-null data at that index will be kept in the
179 /// final dataset.
180 ///
181 /// If left unspecified, the results will only contain static data.
182 ///
183 /// Examples: `Some(TimelineName("frame"))`, `None` (only static data).
184 //
185 // TODO(cmc): this has to be a selector otherwise this is a horrible UX.
186 pub filtered_index: Option<Index>,
187
188 /// The range of index values used to filter out _rows_ from the view contents.
189 ///
190 /// Only rows where at least 1 of the view-contents contains non-null data within that range will be kept in
191 /// the final dataset.
192 ///
193 /// * This has no effect if `filtered_index` isn't set.
194 /// * This has no effect if [`QueryExpression::using_index_values`] is set.
195 ///
196 /// Example: `AbsoluteTimeRange(10, 20)`.
197 pub filtered_index_range: Option<IndexRange>,
198
199 /// The specific index values used to filter out _rows_ from the view contents.
200 ///
201 /// Only rows where at least 1 column contains non-null data at these specific values will be kept
202 /// in the final dataset.
203 ///
204 /// * This has no effect if `filtered_index` isn't set.
205 /// * This has no effect if [`QueryExpression::using_index_values`] is set.
206 /// * Using [`TimeInt::STATIC`] as index value has no effect.
207 ///
208 /// Example: `[TimeInt(12), TimeInt(14)]`.
209 pub filtered_index_values: Option<BTreeSet<IndexValue>>,
210
211 /// The specific index values used to sample _rows_ from the view contents.
212 ///
213 /// The final dataset will contain one row per sampled index value, regardless of whether data
214 /// existed for that index value in the view contents.
215 /// The semantics of the query are consistent with all other settings: the results will be
216 /// sorted on the `filtered_index`, and only contain unique index values.
217 ///
218 /// * This has no effect if `filtered_index` isn't set.
219 /// * If set, this overrides both [`QueryExpression::filtered_index_range`] and
220 /// [`QueryExpression::filtered_index_values`].
221 /// * Using [`TimeInt::STATIC`] as index value has no effect.
222 ///
223 /// Example: `[TimeInt(12), TimeInt(14)]`.
224 pub using_index_values: Option<BTreeSet<IndexValue>>,
225
226 /// The component column used to filter out _rows_ from the view contents.
227 ///
228 /// Only rows where this column contains non-null data be kept in the final dataset.
229 ///
230 /// Example: `ComponentColumnSelector("Points3D:positions")`.
231 //
232 // TODO(cmc): multi-pov support
233 pub filtered_is_not_null: Option<ComponentColumnSelector>,
234
235 /// Specifies how null values should be filled in the returned dataframe.
236 ///
237 /// Defaults to [`SparseFillStrategy::None`].
238 pub sparse_fill_strategy: SparseFillStrategy,
239
240 /// The specific _columns_ to sample from the final view contents.
241 ///
242 /// The order of the samples will be respected in the final result.
243 ///
244 /// Defaults to `None`, which means: everything.
245 ///
246 /// Example: `[ColumnSelector(Time("log_time")), ColumnSelector(Component("Points3D:position"))]`.
247 //
248 // TODO(cmc): the selection has to be on the QueryHandle, otherwise it's hell to use.
249 pub selection: Option<Vec<ColumnSelector>>,
250}
251
252impl QueryExpression {
253 pub fn is_static(&self) -> bool {
254 self.filtered_index.is_none()
255 }
256
257 pub fn min_latest_at(&self) -> Option<LatestAtQuery> {
258 let index = self.filtered_index?;
259
260 if let Some(using_index_values) = &self.using_index_values {
261 return Some(LatestAtQuery::new(
262 index,
263 using_index_values.first().copied()?,
264 ));
265 }
266
267 if let Some(filtered_index_values) = &self.filtered_index_values {
268 return Some(LatestAtQuery::new(
269 index,
270 filtered_index_values.first().copied()?,
271 ));
272 }
273
274 if let Some(filtered_index_range) = &self.filtered_index_range {
275 return Some(LatestAtQuery::new(index, filtered_index_range.min()));
276 }
277
278 None
279 }
280
281 pub fn max_range(&self) -> Option<RangeQuery> {
282 let index = self.filtered_index?;
283
284 if let Some(using_index_values) = &self.using_index_values {
285 return Some(RangeQuery::new(
286 index,
287 AbsoluteTimeRange::new(
288 using_index_values.first().copied()?,
289 using_index_values.last().copied()?,
290 ),
291 ));
292 }
293
294 if let Some(filtered_index_values) = &self.filtered_index_values {
295 return Some(RangeQuery::new(
296 index,
297 AbsoluteTimeRange::new(
298 filtered_index_values.first().copied()?,
299 filtered_index_values.last().copied()?,
300 ),
301 ));
302 }
303
304 if let Some(filtered_index_range) = &self.filtered_index_range {
305 return Some(RangeQuery::new(index, *filtered_index_range));
306 }
307
308 None
309 }
310}
311
312// ---
313
314impl ChunkStore {
315 /// Returns the full schema of the store.
316 ///
317 /// This will include a column descriptor for every timeline and every component on every
318 /// entity that has been written to the store so far.
319 ///
320 /// The order of the columns is guaranteed to be in a specific order:
321 /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
322 /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
323 pub fn schema(&self) -> ChunkColumnDescriptors {
324 re_tracing::profile_function!();
325
326 let indices = self
327 .timelines()
328 .values()
329 .map(|timeline| IndexColumnDescriptor::from(*timeline))
330 .collect();
331
332 let components = self
333 .per_column_metadata
334 .iter()
335 .flat_map(|(entity_path, per_identifier)| {
336 per_identifier
337 .values()
338 .map(move |(descr, _, datatype)| (entity_path, descr, datatype))
339 })
340 .filter_map(|(entity_path, component_descr, datatype)| {
341 let metadata =
342 self.lookup_column_metadata(entity_path, component_descr.component)?;
343
344 Some(((entity_path, component_descr), (metadata, datatype)))
345 })
346 .map(|((entity_path, component_descr), (metadata, datatype))| {
347 let ColumnMetadata {
348 is_static,
349 is_tombstone,
350 is_semantically_empty,
351 } = metadata;
352
353 if let Some(c) = component_descr.component_type {
354 c.sanity_check();
355 }
356
357 ComponentColumnDescriptor {
358 // NOTE: The data is always a at least a list, whether it's latest-at or range.
359 // It might be wrapped further in e.g. a dict, but at the very least
360 // it's a list.
361 store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR(
362 ArrowField::new("item", datatype.clone(), true).into(),
363 ),
364
365 entity_path: entity_path.clone(),
366 archetype: component_descr.archetype,
367 component: component_descr.component,
368 component_type: component_descr.component_type,
369 is_static,
370 is_tombstone,
371 is_semantically_empty,
372 }
373 })
374 .collect_vec()
375 .tap_mut(|components| components.sort());
376
377 ChunkColumnDescriptors {
378 row_id: self.row_id_descriptor(),
379 indices,
380 components,
381 }
382 .tap(|schema| schema.sanity_check())
383 }
384
385 #[expect(clippy::unused_self)]
386 pub fn row_id_descriptor(&self) -> re_sorbet::RowIdColumnDescriptor {
387 re_sorbet::RowIdColumnDescriptor::from_sorted(false)
388 }
389
390 /// Given a [`TimeColumnSelector`], returns the corresponding [`IndexColumnDescriptor`].
391 pub fn resolve_time_selector(&self, selector: &TimeColumnSelector) -> IndexColumnDescriptor {
392 let timelines = self.timelines();
393
394 let timeline = timelines
395 .get(&selector.timeline)
396 .copied()
397 .unwrap_or_else(|| {
398 re_log::warn_once!("Unknown timeline {selector:?}; assuming sequence timeline.");
399 Timeline::new_sequence(selector.timeline)
400 });
401
402 IndexColumnDescriptor::from(timeline)
403 }
404
405 /// Given a [`ComponentColumnSelector`], returns the corresponding [`ComponentColumnDescriptor`].
406 ///
407 /// If the component is not found in the store, a default descriptor is returned with a null datatype.
408 pub fn resolve_component_selector(
409 &self,
410 selector: &ComponentColumnSelector,
411 ) -> ComponentColumnDescriptor {
412 // Unfortunately, we can't return an error here, so we craft a default descriptor and
413 // add information to it that we find.
414
415 // TODO(#7699) This currently interns every string ever queried which could be wasteful, especially
416 // in long-running servers. In practice this probably doesn't matter.
417 let mut result = ComponentColumnDescriptor {
418 store_datatype: ArrowDatatype::Null,
419 component_type: None,
420 entity_path: selector.entity_path.clone(),
421 archetype: None,
422 component: selector.component.as_str().into(),
423 is_static: false,
424 is_tombstone: false,
425 is_semantically_empty: false,
426 };
427
428 let Some(per_identifier) = self.per_column_metadata.get(&selector.entity_path) else {
429 return result;
430 };
431
432 // We perform a scan over all component descriptors in the queried entity path.
433 let Some((component_descr, _, datatype)) =
434 per_identifier.get(&selector.component.as_str().into())
435 else {
436 return result;
437 };
438 result.store_datatype = datatype.clone();
439 result.archetype = component_descr.archetype;
440 result.component_type = component_descr.component_type;
441
442 if let Some(ColumnMetadata {
443 is_static,
444 is_tombstone,
445 is_semantically_empty,
446 }) = self.lookup_column_metadata(&selector.entity_path, component_descr.component)
447 {
448 result.is_static = is_static;
449 result.is_tombstone = is_tombstone;
450 result.is_semantically_empty = is_semantically_empty;
451 }
452
453 result
454 }
455
456 /// Returns the filtered schema for the given [`QueryExpression`].
457 ///
458 /// The order of the columns is guaranteed to be in a specific order:
459 /// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
460 /// * second, the component columns in lexical order (`Color`, `Radius, ...`).
461 pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
462 re_tracing::profile_function!();
463
464 let filter = Self::create_component_filter_from_query(query);
465
466 self.schema().filter_components(filter)
467 }
468
469 pub fn create_component_filter_from_query(
470 query: &QueryExpression,
471 ) -> impl Fn(&ComponentColumnDescriptor) -> bool {
472 let QueryExpression {
473 view_contents,
474 include_semantically_empty_columns,
475 include_tombstone_columns,
476 include_static_columns,
477 filtered_index: _,
478 filtered_index_range: _,
479 filtered_index_values: _,
480 using_index_values: _,
481 filtered_is_not_null: _,
482 sparse_fill_strategy: _,
483 selection: _,
484 } = query;
485
486 move |column: &ComponentColumnDescriptor| {
487 let is_part_of_view_contents = || {
488 view_contents.as_ref().is_none_or(|view_contents| {
489 view_contents
490 .get(&column.entity_path)
491 .is_some_and(|components| {
492 components
493 .as_ref()
494 .is_none_or(|components| components.contains(&column.component))
495 })
496 })
497 };
498
499 let passes_semantically_empty_check =
500 || *include_semantically_empty_columns || !column.is_semantically_empty;
501
502 let passes_tombstone_check = || *include_tombstone_columns || !column.is_tombstone;
503
504 let passes_static_check = || match include_static_columns {
505 StaticColumnSelection::Both => true,
506 StaticColumnSelection::StaticOnly => column.is_static,
507 StaticColumnSelection::NonStaticOnly => !column.is_static,
508 };
509
510 is_part_of_view_contents()
511 && passes_semantically_empty_check()
512 && passes_tombstone_check()
513 && passes_static_check()
514 }
515 }
516}