use std::{
collections::{BTreeMap, BTreeSet},
ops::{Deref, DerefMut},
};
use arrow::{
array::ListArray as ArrowListArray,
datatypes::{DataType as ArrowDatatype, Field as ArrowField},
};
use itertools::Itertools as _;
use re_chunk::{ComponentIdentifier, LatestAtQuery, RangeQuery, TimelineName};
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline};
use re_sorbet::{
ChunkColumnDescriptors, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
IndexColumnDescriptor, TimeColumnSelector,
};
use tap::Tap as _;
use crate::{ChunkStore, ColumnMetadata};
/// How empty cells of sparse columns are filled in query results.
///
/// NOTE(review): the fill logic itself lives outside this file; the per-variant descriptions
/// below are inferred from the variant names and `Display` labels — confirm against the engine.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum SparseFillStrategy {
    /// Presumably leaves empty cells untouched. Displayed as `none`; this is the default.
    #[default]
    None,

    /// Presumably fills empty cells with latest-at values over the whole store.
    /// Displayed as `latest-at (global)`.
    LatestAtGlobal,
}

impl std::fmt::Display for SparseFillStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the human-readable label first, then emit it verbatim.
        let label = match self {
            Self::None => "none",
            Self::LatestAtGlobal => "latest-at (global)",
        };
        f.write_str(label)
    }
}
/// Selects which entities — and optionally which of their components — make up a view.
///
/// As used by `ChunkStore::schema_for_query`:
/// - an entity path absent from the map is excluded;
/// - an entity mapped to `None` contributes all of its components;
/// - an entity mapped to `Some(set)` contributes only the components in `set`.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct ViewContentsSelector(pub BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>);

impl ViewContentsSelector {
    /// Consumes the selector and hands back the underlying map.
    pub fn into_inner(self) -> BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>> {
        let Self(map) = self;
        map
    }
}

impl Deref for ViewContentsSelector {
    type Target = BTreeMap<EntityPath, Option<BTreeSet<ComponentIdentifier>>>;

    /// Borrows the underlying map.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self(map) = self;
        map
    }
}

impl DerefMut for ViewContentsSelector {
    /// Mutably borrows the underlying map.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(map) = self;
        map
    }
}

impl FromIterator<(EntityPath, Option<BTreeSet<ComponentIdentifier>>)> for ViewContentsSelector {
    /// Builds a selector from `(entity, components)` pairs; later duplicates of a key win,
    /// exactly as with [`BTreeMap`]'s own `FromIterator`.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (EntityPath, Option<BTreeSet<ComponentIdentifier>>)>,
    {
        Self(BTreeMap::from_iter(iter))
    }
}
/// The timeline a query is indexed on, identified by its name.
pub type Index = TimelineName;

/// A single value along an [`Index`].
pub type IndexValue = TimeInt;

/// A range of [`IndexValue`]s along an [`Index`].
pub type IndexRange = ResolvedTimeRange;
/// Filters component columns by their `is_static` flag in [`ChunkStore::schema_for_query`].
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub enum StaticColumnSelection {
    /// Keep columns regardless of `is_static` (the default).
    #[default]
    Both,

    /// Keep only columns whose `is_static` flag is set.
    StaticOnly,

    /// Keep only columns whose `is_static` flag is not set.
    NonStaticOnly,
}
/// Describes a dataframe query against a [`ChunkStore`].
///
/// Only the `view_contents` and `include_*` fields affect [`ChunkStore::schema_for_query`].
/// The index-related fields drive [`QueryExpression::min_latest_at`] and
/// [`QueryExpression::max_range`].
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueryExpression {
    /// Restricts the columns to the given entities/components. `None` keeps every column.
    pub view_contents: Option<ViewContentsSelector>,

    /// When `false`, columns flagged `is_semantically_empty` are dropped from the query schema.
    pub include_semantically_empty_columns: bool,

    /// When `false`, columns flagged `is_tombstone` are dropped from the query schema.
    pub include_tombstone_columns: bool,

    /// Filters columns on their `is_static` flag.
    pub include_static_columns: StaticColumnSelection,

    /// The timeline the query is indexed on. `None` makes the query static
    /// ([`QueryExpression::is_static`]) and disables `min_latest_at`/`max_range`.
    pub filtered_index: Option<Index>,

    /// Lowest-priority source for `min_latest_at`/`max_range`, used only when neither
    /// `using_index_values` nor `filtered_index_values` is set.
    pub filtered_index_range: Option<IndexRange>,

    /// Used by `min_latest_at`/`max_range` when `using_index_values` is `None`.
    /// NOTE(review): presumably restricts result rows to these index values — confirm.
    pub filtered_index_values: Option<BTreeSet<IndexValue>>,

    /// Highest-priority source for `min_latest_at`/`max_range`.
    /// NOTE(review): presumably the exact index values to sample rows at — confirm.
    pub using_index_values: Option<BTreeSet<IndexValue>>,

    /// NOTE(review): presumably keeps only rows where this component column is non-null — confirm.
    pub filtered_is_not_null: Option<ComponentColumnSelector>,

    /// How empty cells are filled; see [`SparseFillStrategy`].
    pub sparse_fill_strategy: SparseFillStrategy,

    /// NOTE(review): presumably the explicit list of columns to return, `None` meaning all
    /// columns of the view — confirm.
    pub selection: Option<Vec<ColumnSelector>>,
}
impl QueryExpression {
    /// Returns `true` when the query has no index to filter on.
    pub fn is_static(&self) -> bool {
        self.filtered_index.is_none()
    }

    /// Builds a latest-at query at the smallest index value this query can touch.
    ///
    /// The source of that value is picked by priority: `using_index_values`, then
    /// `filtered_index_values`, then the start of `filtered_index_range`.
    ///
    /// Returns `None` if:
    /// - there is no `filtered_index`;
    /// - no source is set;
    /// - the highest-priority value set that is present is empty.
    pub fn min_latest_at(&self) -> Option<LatestAtQuery> {
        let index = self.filtered_index?;

        let min_value = match (
            &self.using_index_values,
            &self.filtered_index_values,
            &self.filtered_index_range,
        ) {
            // An empty set short-circuits to `None`; it does not fall back to lower priorities.
            (Some(values), _, _) | (None, Some(values), _) => values.first().copied()?,
            (None, None, Some(range)) => range.min(),
            (None, None, None) => return None,
        };

        Some(LatestAtQuery::new(index, min_value))
    }

    /// Builds a range query covering every index value this query can touch.
    ///
    /// Uses the same source priority as [`Self::min_latest_at`]. For a value set, the range
    /// spans its first to last element.
    ///
    /// Returns `None` under the same conditions as [`Self::min_latest_at`].
    pub fn max_range(&self) -> Option<RangeQuery> {
        let index = self.filtered_index?;

        let time_range = match (
            &self.using_index_values,
            &self.filtered_index_values,
            &self.filtered_index_range,
        ) {
            (Some(values), _, _) | (None, Some(values), _) => {
                ResolvedTimeRange::new(values.first().copied()?, values.last().copied()?)
            }
            (None, None, Some(range)) => *range,
            (None, None, None) => return None,
        };

        Some(RangeQuery::new(index, time_range))
    }
}
impl ChunkStore {
pub fn schema(&self) -> ChunkColumnDescriptors {
re_tracing::profile_function!();
let indices = self
.timelines()
.values()
.map(|timeline| IndexColumnDescriptor::from(*timeline))
.collect();
let components = self
.per_column_metadata
.iter()
.flat_map(|(entity_path, per_identifier)| {
per_identifier
.values()
.map(move |(descr, _, datatype)| (entity_path, descr, datatype))
})
.filter_map(|(entity_path, component_descr, datatype)| {
let metadata = self.lookup_column_metadata(entity_path, component_descr)?;
Some(((entity_path, component_descr), (metadata, datatype)))
})
.map(|((entity_path, component_descr), (metadata, datatype))| {
let ColumnMetadata {
is_static,
is_tombstone,
is_semantically_empty,
} = metadata;
if let Some(c) = component_descr.component_type {
c.sanity_check();
}
ComponentColumnDescriptor {
store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR(
ArrowField::new("item", datatype.clone(), true).into(),
),
entity_path: entity_path.clone(),
archetype: component_descr.archetype,
component: component_descr.component,
component_type: component_descr.component_type,
is_static,
is_tombstone,
is_semantically_empty,
}
})
.collect_vec()
.tap_mut(|components| components.sort());
ChunkColumnDescriptors {
row_id: self.row_id_descriptor(),
indices,
components,
}
.tap(|schema| schema.sanity_check())
}
#[expect(clippy::unused_self)]
pub fn row_id_descriptor(&self) -> re_sorbet::RowIdColumnDescriptor {
re_sorbet::RowIdColumnDescriptor::from_sorted(false)
}
pub fn resolve_time_selector(&self, selector: &TimeColumnSelector) -> IndexColumnDescriptor {
let timelines = self.timelines();
let timeline = timelines
.get(&selector.timeline)
.copied()
.unwrap_or_else(|| {
re_log::warn_once!("Unknown timeline {selector:?}; assuming sequence timeline.");
Timeline::new_sequence(selector.timeline)
});
IndexColumnDescriptor::from(timeline)
}
pub fn resolve_component_selector(
&self,
selector: &ComponentColumnSelector,
) -> ComponentColumnDescriptor {
let mut result = ComponentColumnDescriptor {
store_datatype: ArrowDatatype::Null,
component_type: None,
entity_path: selector.entity_path.clone(),
archetype: None,
component: selector.component.as_str().into(),
is_static: false,
is_tombstone: false,
is_semantically_empty: false,
};
let Some(per_identifier) = self.per_column_metadata.get(&selector.entity_path) else {
return result;
};
let Some((component_descr, _, datatype)) =
per_identifier.get(&selector.component.as_str().into())
else {
return result;
};
result.store_datatype = datatype.clone();
result.archetype = component_descr.archetype;
result.component_type = component_descr.component_type;
if let Some(ColumnMetadata {
is_static,
is_tombstone,
is_semantically_empty,
}) = self.lookup_column_metadata(&selector.entity_path, component_descr)
{
result.is_static = is_static;
result.is_tombstone = is_tombstone;
result.is_semantically_empty = is_semantically_empty;
};
result
}
pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
re_tracing::profile_function!();
let QueryExpression {
view_contents,
include_semantically_empty_columns,
include_tombstone_columns,
include_static_columns,
filtered_index: _,
filtered_index_range: _,
filtered_index_values: _,
using_index_values: _,
filtered_is_not_null: _,
sparse_fill_strategy: _,
selection: _,
} = query;
let filter = |column: &ComponentColumnDescriptor| {
let is_part_of_view_contents = || {
view_contents.as_ref().is_none_or(|view_contents| {
view_contents
.get(&column.entity_path)
.is_some_and(|components| {
components
.as_ref()
.is_none_or(|components| components.contains(&column.component))
})
})
};
let passes_semantically_empty_check =
|| *include_semantically_empty_columns || !column.is_semantically_empty;
let passes_tombstone_check = || *include_tombstone_columns || !column.is_tombstone;
let passes_static_check = || match include_static_columns {
StaticColumnSelection::Both => true,
StaticColumnSelection::StaticOnly => column.is_static,
StaticColumnSelection::NonStaticOnly => !column.is_static,
};
is_part_of_view_contents()
&& passes_semantically_empty_check()
&& passes_tombstone_check()
&& passes_static_check()
};
self.schema().filter_components(filter)
}
}