use std::sync::Arc;
use nohash_hasher::{IntMap, IntSet};
use re_chunk_store::{LatestAtQuery, RangeQuery, RowId};
use re_log_types::external::arrow;
use re_log_types::external::arrow::array::Array as _;
use re_log_types::hash::Hash64;
use re_log_types::{TimeInt, TimelineName};
use re_query::LatestAtResults;
use re_sdk_types::blueprint::datatypes::ComponentSourceKind;
use re_types_core::{Archetype, ComponentIdentifier};
use re_viewer_context::{
DataResult, QueryContext, QueryRange, ViewContext, ViewQuery, ViewerContext,
};
use crate::blueprint_resolved_results::{
BlueprintResolvedLatestAtResults, BlueprintResolvedRangeResults,
};
use crate::{BlueprintResolvedResults, ComponentMappingError};
fn cast_list_array(
source: &arrow::array::ListArray,
target_list_datatype: &arrow::datatypes::DataType,
) -> Result<arrow::array::ListArray, arrow::error::ArrowError> {
if source.data_type() == target_list_datatype {
return Ok(source.clone());
}
let casted = arrow::compute::cast(source, target_list_datatype)?;
casted
.as_any()
.downcast_ref::<arrow::array::ListArray>()
.cloned()
.ok_or_else(|| {
arrow::error::ArrowError::CastError(format!(
"Expected ListArray after cast, got {:?}",
casted.data_type()
))
})
}
/// Maps the `source` component of `chunk` onto `target` via
/// `Chunk::with_mapped_component`, transforming the component's list array on the way.
///
/// The list array goes through two optional steps:
/// 1. If `selector` is set, it is executed per row. When the selector yields no
///    array at all, an all-null list array with the same length and value type
///    as the input is used instead.
/// 2. If `target_datatype` is set, the array is cast to a list whose (nullable)
///    item field has that datatype.
///
/// # Errors
///
/// * `ComponentMappingError::SelectorExecutionFailed` if the selector fails.
/// * `ComponentMappingError::CastFailed` if the cast to the target list datatype fails.
fn transform_chunk(
    target: &ComponentIdentifier,
    source: &ComponentIdentifier,
    selector: &Option<re_lenses_core::Selector>,
    target_datatype: Option<&arrow::datatypes::DataType>,
    chunk: &re_chunk_store::Chunk,
) -> Result<re_chunk_store::Chunk, ComponentMappingError> {
    chunk.with_mapped_component(*source, *target, |list_array| {
        // Step 1: apply the selector, if there is one.
        let selected = match selector {
            Some(sel) => match sel
                .execute_per_row(&list_array)
                .map_err(ComponentMappingError::SelectorExecutionFailed)?
            {
                Some(output) => output,
                // The selector produced nothing: keep the row count, but make every row null.
                None => arrow::array::ListArray::new_null(
                    arrow::datatypes::Field::new_list_field(list_array.value_type(), true).into(),
                    list_array.len(),
                ),
            },
            None => list_array,
        };

        // Step 2: cast to the target datatype, if one is known.
        let Some(item_datatype) = target_datatype else {
            return Ok(selected);
        };
        let wanted_list_datatype = arrow::datatypes::DataType::List(Arc::new(
            arrow::datatypes::Field::new_list_field(item_datatype.clone(), true),
        ));

        match cast_list_array(&selected, &wanted_list_datatype) {
            Ok(casted) => Ok(casted),
            Err(err) => Err(ComponentMappingError::CastFailed {
                source_datatype: selected.data_type().clone(),
                target_datatype: wanted_list_datatype,
                err: Arc::new(err),
            }),
        }
    })
}
/// A component mapping that is in effect for a query.
///
/// The query functions in this file push one of these for every
/// `SourceComponent` mapping whose target was among the requested components
/// and whose selector string (if any) parsed successfully. The collected list
/// is also hashed into the returned results.
#[derive(Debug, PartialEq, Eq, Hash)]
struct ActiveRemapping {
/// Component identifier under which the transformed data is inserted into the results.
target: ComponentIdentifier,
/// Component identifier that is actually queried from the store.
source: ComponentIdentifier,
/// Parsed selector applied to the source data; `None` when the mapping's selector string was empty.
selector: Option<re_lenses_core::Selector>,
}
/// Picks the most specific error for a `component` that a query did not return
/// for `entity_path`.
///
/// * `NoComponentDataForQuery` if the entity has temporal data for the
///   component on `timeline_name`.
/// * `NoComponentDataForQueryButIsFetchable` if a root chunk of one of the
///   `missing_virtual_chunks` is listed for this entity and component in the
///   RRD manifest, either in its static map or in its temporal map for the timeline.
/// * `ComponentNotPresentOnEntity` otherwise.
fn component_not_found_error(
    component: ComponentIdentifier,
    entity_path: &re_log_types::EntityPath,
    missing_virtual_chunks: &[re_chunk_store::ChunkId],
    entity_db: &re_entity_db::EntityDb,
    store_engine: &re_query::StorageEngineReadGuard<'_>,
    timeline_name: re_log_types::TimelineName,
) -> ComponentMappingError {
    let has_temporal_data = entity_db.entity_has_temporal_data_on_timeline_for_component(
        store_engine,
        &timeline_name,
        entity_path,
        component,
    );
    if has_temporal_data {
        return ComponentMappingError::NoComponentDataForQuery(component);
    }

    // The manifest is only consulted when there are missing chunks to look up.
    if !missing_virtual_chunks.is_empty()
        && let Some(rrd_manifest) = entity_db.rrd_manifest_index().manifest()
    {
        let store = store_engine.store();
        let timeline = store.schema().timelines().get(&timeline_name).copied();

        let missing_root_chunk_ids = missing_virtual_chunks
            .iter()
            .flat_map(|chunk_id| store.find_root_chunks(chunk_id));

        for root_chunk_id in missing_root_chunk_ids {
            // Static entry: the manifest maps the component to exactly this chunk.
            let listed_as_static = rrd_manifest
                .static_map()
                .get(entity_path)
                .is_some_and(|per_component| {
                    per_component.get(&component) == Some(&root_chunk_id)
                });
            if listed_as_static {
                return ComponentMappingError::NoComponentDataForQueryButIsFetchable(component);
            }

            // Temporal entry: the chunk is among those listed for the component on this timeline.
            if let Some(timeline) = &timeline
                && let Some(per_timeline) = rrd_manifest.temporal_map().get(entity_path)
                && let Some(per_component) = per_timeline.get(timeline)
                && let Some(per_chunk) = per_component.get(&component)
                && per_chunk.contains_key(&root_chunk_id)
            {
                return ComponentMappingError::NoComponentDataForQueryButIsFetchable(component);
            }
        }
    }

    ComponentMappingError::ComponentNotPresentOnEntity(component)
}
/// Runs a range query for `components` on the entity of `data_result`, taking
/// the overrides and component mappings of `visualizer_instruction` into account.
///
/// The function:
/// 1. Queries the overrides for the requested components.
/// 2. Resolves the instruction's component mappings: for every `SourceComponent`
///    mapping whose target was requested, the source component is queried in
///    place of the target and the mapping is recorded as active. A selector
///    that fails to parse is recorded as an error for that target.
/// 3. Executes the range query against the recording and, for each active
///    mapping, inserts the transformed source chunks under the target component.
///    Transformation failures and missing source data are recorded as errors
///    for that target.
/// 4. Fills in a source kind (override, source component or default) for every
///    queried component that has no entry yet.
///
/// `_annotations` is unused.
pub fn range_with_blueprint_resolved_data<'a>(
    ctx: &'a ViewContext<'a>,
    _annotations: Option<&re_viewer_context::Annotations>,
    range_query: &RangeQuery,
    data_result: &'a re_viewer_context::DataResult,
    components: impl IntoIterator<Item = ComponentIdentifier>,
    visualizer_instruction: &re_viewer_context::VisualizerInstruction,
) -> BlueprintResolvedRangeResults<'a> {
    re_tracing::profile_function!(data_result.entity_path.to_string());

    let mut components = components.into_iter().collect::<IntSet<_>>();

    let overrides = query_overrides(
        ctx.viewer_ctx,
        visualizer_instruction,
        components.iter().copied(),
    );

    let mut active_remappings = Vec::new();
    let mut component_sources = IntMap::default();

    // Kept in its own scope so that the storage engine handle does not outlive the store query.
    let store_results = {
        for (target_component, mapping) in &visualizer_instruction.component_mappings {
            let resolved = match mapping {
                // Only remap targets that were actually requested; the guard also
                // takes the target out of the set of components to query.
                re_viewer_context::VisualizerComponentSource::SourceComponent {
                    source_component,
                    selector,
                } if components.remove(target_component) => {
                    components.insert(*source_component);

                    // An empty selector string means "no selector".
                    let parsed_selector = if selector.is_empty() {
                        Ok(None)
                    } else {
                        selector.parse::<re_lenses_core::Selector>().map(Some)
                    };

                    match parsed_selector {
                        Ok(selector) => {
                            active_remappings.push(ActiveRemapping {
                                target: *target_component,
                                source: *source_component,
                                selector,
                            });
                            Ok(mapping.source_kind())
                        }
                        Err(err) => Err(ComponentMappingError::SelectorParseFailed(err)),
                    }
                }
                _ => Ok(mapping.source_kind()),
            };
            component_sources.insert(*target_component, resolved);
        }

        let engine = ctx.recording_engine();
        let mut results = engine.cache().range(
            range_query,
            &data_result.entity_path,
            components.iter().copied(),
        );

        let reflection = ctx.viewer_ctx.reflection();
        for remapping in &active_remappings {
            let ActiveRemapping {
                target,
                source,
                selector,
            } = remapping;
            let target_datatype = reflection.lookup_datatype(*target);

            let Some(mut chunks) = results.components.get(source).cloned() else {
                component_sources.insert(
                    *target,
                    Err(component_not_found_error(
                        *source,
                        &data_result.entity_path,
                        &results.missing_virtual,
                        ctx.recording(),
                        &engine,
                        range_query.timeline,
                    )),
                );
                continue;
            };

            // Transform chunk by chunk, stopping at the first failure.
            let transformed = chunks.iter_mut().try_for_each(|chunk| {
                transform_chunk(target, source, selector, target_datatype, chunk)
                    .map(|mapped_chunk| *chunk = mapped_chunk)
            });

            match transformed {
                Ok(()) => {
                    results.components.insert(*target, chunks);
                }
                // On failure nothing is inserted for the target; only the error is recorded.
                Err(err) => {
                    component_sources.insert(*target, Err(err));
                }
            }
        }

        results
    };

    // Every queried component that has no source recorded yet gets one derived from the results.
    #[expect(clippy::iter_over_hash_type)]
    for component in &components {
        component_sources.entry(*component).or_insert_with(|| {
            let kind = if has_non_empty_override(&overrides, *component) {
                ComponentSourceKind::Override
            } else if store_results.components.contains_key(component) {
                ComponentSourceKind::SourceComponent
            } else {
                ComponentSourceKind::Default
            };
            Ok(kind)
        });
    }

    // The query context carries a latest-at query at `range_query.range.min`.
    let query_context = QueryContext {
        view_ctx: ctx,
        target_entity_path: &data_result.entity_path,
        instruction_id: Some(visualizer_instruction.id),
        archetype_name: None,
        query: LatestAtQuery::new(range_query.timeline, range_query.range.min),
    };

    BlueprintResolvedRangeResults {
        overrides,
        store_results,
        query_context,
        view_defaults: &ctx.query_result.view_defaults,
        component_sources,
        component_mappings_hash: Hash64::hash(&active_remappings),
    }
}
pub fn latest_at_with_blueprint_resolved_data<'a>(
ctx: &'a ViewContext<'a>,
_annotations: Option<&'a re_viewer_context::Annotations>,
latest_at_query: &LatestAtQuery,
data_result: &'a re_viewer_context::DataResult,
components: impl IntoIterator<Item = ComponentIdentifier>,
visualizer_instruction: Option<&re_viewer_context::VisualizerInstruction>,
) -> BlueprintResolvedLatestAtResults<'a> {
let mut components = components.into_iter().collect::<IntSet<_>>();
let overrides = if let Some(visualizer_instruction) = visualizer_instruction {
query_overrides(
ctx.viewer_ctx,
visualizer_instruction,
components.iter().copied(),
)
} else {
query_overrides_at_path(
ctx.viewer_ctx,
data_result.override_base_path(),
components.iter().copied(),
)
};
let mut active_remappings = Vec::new();
let mut component_sources = IntMap::default();
if let Some(visualizer_instruction) = visualizer_instruction {
for (target_component, source) in &visualizer_instruction.component_mappings {
let source_result =
if let re_viewer_context::VisualizerComponentSource::SourceComponent {
source_component,
selector,
} = source
&& components.remove(target_component)
{
components.insert(*source_component);
if selector.is_empty() {
active_remappings.push(ActiveRemapping {
target: *target_component,
source: *source_component,
selector: None,
});
Ok(source.source_kind())
} else {
match selector.parse::<re_lenses_core::Selector>() {
Ok(selector) => {
active_remappings.push(ActiveRemapping {
target: *target_component,
source: *source_component,
selector: Some(selector),
});
Ok(source.source_kind())
}
Err(err) => Err(ComponentMappingError::SelectorParseFailed(err)),
}
}
} else {
Ok(source.source_kind())
};
component_sources.insert(*target_component, source_result);
}
}
let engine = ctx.viewer_ctx.recording_engine();
let mut store_results = engine.cache().latest_at(
latest_at_query,
&data_result.entity_path,
components.iter().copied(),
);
let reflection = ctx.viewer_ctx.reflection();
for ActiveRemapping {
target,
source,
selector,
} in &active_remappings
{
let target_datatype = reflection.lookup_datatype(*target);
if let Some(chunk) = store_results.components.get(source) {
let result = transform_chunk(target, source, selector, target_datatype, chunk);
match result {
Ok(modified_chunk) => {
let chunk = std::sync::Arc::new(modified_chunk)
.to_unit()
.expect("The source chunk was a unit chunk.");
store_results.components.insert(*target, chunk);
}
Err(err) => {
component_sources.insert(*target, Err(err));
}
}
} else {
component_sources.insert(
*target,
Err(component_not_found_error(
*source,
&data_result.entity_path,
&store_results.missing_virtual,
ctx.viewer_ctx.recording(),
&engine,
latest_at_query.timeline(),
)),
);
}
}
#[expect(clippy::iter_over_hash_type)] for component in &components {
match component_sources.entry(*component) {
std::collections::hash_map::Entry::Occupied(_) => {}
std::collections::hash_map::Entry::Vacant(entry) => {
let source = if has_non_empty_override(&overrides, *component) {
ComponentSourceKind::Override
} else if store_results.components.contains_key(component) {
ComponentSourceKind::SourceComponent
} else {
ComponentSourceKind::Default
};
entry.insert(Ok(source));
}
}
}
let query_context = QueryContext {
view_ctx: ctx,
target_entity_path: &data_result.entity_path,
instruction_id: visualizer_instruction.map(|instruction| instruction.id),
archetype_name: None,
query: latest_at_query.clone(),
};
BlueprintResolvedLatestAtResults {
overrides,
store_results,
view_defaults: &ctx.query_result.view_defaults,
query_context,
component_sources,
component_indices_hash: Hash64::hash(&active_remappings),
}
}
/// Queries `components` for `data_result`, choosing the query type from `query_range`.
///
/// * `QueryRange::LatestAt` runs a latest-at query at `timeline_cursor`.
/// * `QueryRange::TimeRange` runs a range query over the relative time range
///   resolved against `timeline_cursor`.
///
/// In both cases the query itself is returned together with its results.
pub fn query_archetype_with_history<'a>(
    ctx: &'a ViewContext<'a>,
    timeline: &TimelineName,
    timeline_cursor: TimeInt,
    query_range: &QueryRange,
    components: impl IntoIterator<Item = ComponentIdentifier>,
    data_result: &'a re_viewer_context::DataResult,
    visualizer_instruction: &re_viewer_context::VisualizerInstruction,
) -> BlueprintResolvedResults<'a> {
    match query_range {
        QueryRange::LatestAt => {
            let query = LatestAtQuery::new(*timeline, timeline_cursor);
            let resolved = latest_at_with_blueprint_resolved_data(
                ctx,
                None,
                &query,
                data_result,
                components,
                Some(visualizer_instruction),
            );
            (query, resolved).into()
        }

        QueryRange::TimeRange(time_range) => {
            // Turn the cursor-relative range into an absolute one first.
            let absolute_range = re_log_types::AbsoluteTimeRange::from_relative_time_range(
                time_range,
                timeline_cursor,
            );
            let query = RangeQuery::new(*timeline, absolute_range);
            let resolved = range_with_blueprint_resolved_data(
                ctx,
                None,
                &query,
                data_result,
                components,
                visualizer_instruction,
            );
            (query, resolved).into()
        }
    }
}
/// Returns `true` if `overrides` holds an entry for `component` from which a
/// non-empty raw component batch can be obtained.
fn has_non_empty_override(overrides: &LatestAtResults, component: ComponentIdentifier) -> bool {
    overrides.get(component).is_some_and(|chunk| {
        chunk
            .non_empty_component_batch_raw(component)
            .is_some()
    })
}
/// Queries the overrides of `visualizer_instruction` for the given `components`.
///
/// Returns empty results without querying when the instruction declares no
/// component overrides. Otherwise only the components the instruction lists as
/// overridden are looked up, at the instruction's override path.
fn query_overrides(
    ctx: &ViewerContext<'_>,
    visualizer_instruction: &re_viewer_context::VisualizerInstruction,
    components: impl IntoIterator<Item = ComponentIdentifier>,
) -> LatestAtResults {
    let declared_overrides = &visualizer_instruction.component_overrides;

    if declared_overrides.is_empty() {
        return LatestAtResults::empty("<overrides>".into(), ctx.current_query());
    }

    let overridden_components = components
        .into_iter()
        .filter(|component| declared_overrides.contains(component));

    query_overrides_at_path(
        ctx,
        &visualizer_instruction.override_path,
        overridden_components,
    )
}
/// Collects the overrides stored at `blueprint_path` for each of `components`.
///
/// Each component is looked up with its own latest-at query against the
/// blueprint store, using the blueprint query of `ctx`. Components without a
/// result are skipped. The returned results are created with `ctx.current_query()`.
fn query_overrides_at_path(
    ctx: &ViewerContext<'_>,
    blueprint_path: &re_log_types::EntityPath,
    components: impl IntoIterator<Item = ComponentIdentifier>,
) -> LatestAtResults {
    let blueprint_engine = &ctx.store_context.blueprint.storage_engine();
    let mut collected = LatestAtResults::empty("<overrides>".into(), ctx.current_query());

    for component in components {
        let lookup = blueprint_engine.cache().latest_at(
            ctx.blueprint_query,
            blueprint_path,
            [component],
        );

        let Some(value) = lookup.get(component) else {
            continue;
        };

        let maybe_index = value.index(&ctx.blueprint_query.timeline());
        re_log::debug_assert!(maybe_index.is_some(), "{value:#?}");

        // Fall back to a static index if none is available on the blueprint timeline.
        let index = maybe_index.unwrap_or((TimeInt::STATIC, RowId::ZERO));
        collected.add(component, index, value.clone());
    }

    collected
}
/// Query helpers that resolve component data together with blueprint
/// overrides, view defaults and component mappings.
///
/// Implemented in this file for [`DataResult`].
pub trait DataResultQuery {
/// Runs a latest-at query for the components of archetype `A`.
///
/// `visualizer_instruction` is optional; the [`DataResult`] implementation
/// forwards it to the free function `latest_at_with_blueprint_resolved_data`.
fn latest_at_with_blueprint_resolved_data<'a, A: re_types_core::Archetype>(
&'a self,
ctx: &'a ViewContext<'a>,
latest_at_query: &'a LatestAtQuery,
visualizer_instruction: Option<&re_viewer_context::VisualizerInstruction>,
) -> BlueprintResolvedLatestAtResults<'a>;
/// Runs a latest-at query for a single `component`.
fn latest_at_with_blueprint_resolved_data_for_component<'a>(
&'a self,
ctx: &'a ViewContext<'a>,
latest_at_query: &'a LatestAtQuery,
component: ComponentIdentifier,
visualizer_instruction: Option<&re_viewer_context::VisualizerInstruction>,
) -> BlueprintResolvedLatestAtResults<'a>;
/// Queries the given components for `view_query`.
///
/// The [`DataResult`] implementation picks a latest-at or a range query
/// based on the data result's query range.
fn query_components_with_history<'a>(
&'a self,
ctx: &'a ViewContext<'a>,
view_query: &ViewQuery<'_>,
component_descriptors: impl IntoIterator<Item = ComponentIdentifier>,
visualizer_instruction: &re_viewer_context::VisualizerInstruction,
) -> BlueprintResolvedResults<'a>;
/// Queries all components of archetype `A` for `view_query`.
///
/// Default implementation: forwards to
/// [`Self::query_components_with_history`] with `A::all_component_identifiers()`.
fn query_archetype_with_history<'a, A: Archetype>(
&'a self,
ctx: &'a ViewContext<'a>,
view_query: &ViewQuery<'_>,
visualizer_instruction: &re_viewer_context::VisualizerInstruction,
) -> BlueprintResolvedResults<'a> {
self.query_components_with_history(
ctx,
view_query,
A::all_component_identifiers(),
visualizer_instruction,
)
}
}
/// Implements the query helpers by forwarding to the free functions of this
/// file, with `self` as the data result.
impl DataResultQuery for DataResult {
/// Latest-at query for every component identifier of archetype `A`.
fn latest_at_with_blueprint_resolved_data<'a, A: re_types_core::Archetype>(
&'a self,
ctx: &'a ViewContext<'a>,
latest_at_query: &'a LatestAtQuery,
visualizer_instruction: Option<&re_viewer_context::VisualizerInstruction>,
) -> BlueprintResolvedLatestAtResults<'a> {
latest_at_with_blueprint_resolved_data(
ctx,
// No annotations are passed.
None,
latest_at_query,
self,
A::all_component_identifiers(),
visualizer_instruction,
)
}
/// Latest-at query for exactly one `component`.
fn latest_at_with_blueprint_resolved_data_for_component<'a>(
&'a self,
ctx: &'a ViewContext<'a>,
latest_at_query: &'a LatestAtQuery,
component: ComponentIdentifier,
visualizer_instruction: Option<&re_viewer_context::VisualizerInstruction>,
) -> BlueprintResolvedLatestAtResults<'a> {
latest_at_with_blueprint_resolved_data(
ctx,
// No annotations are passed.
None,
latest_at_query,
self,
std::iter::once(component),
visualizer_instruction,
)
}
/// Queries `components` using the view query's timeline and `latest_at`
/// time together with this data result's query range.
fn query_components_with_history<'a>(
&'a self,
ctx: &'a ViewContext<'a>,
view_query: &ViewQuery<'_>,
components: impl IntoIterator<Item = ComponentIdentifier>,
visualizer_instruction: &re_viewer_context::VisualizerInstruction,
) -> BlueprintResolvedResults<'a> {
query_archetype_with_history(
ctx,
&view_query.timeline,
// The view query's `latest_at` time is passed as the timeline cursor.
view_query.latest_at,
self.query_range(),
components,
self,
visualizer_instruction,
)
}
}