use anyhow::Context as _;
use arrow::array::{FixedSizeListBuilder, Float64Builder};
use re_chunk::{Chunk, ChunkId, ChunkResult, EntityPath, RowId, TimePoint};
use re_sdk_types::archetypes::{Scalars, SeriesLines};
use crate::parsers::cdr;
use crate::parsers::decode::{MessageParser, ParserContext};
use crate::parsers::ros2msg::Ros2MessageParser;
use crate::parsers::ros2msg::definitions::std_msgs::Header;
use crate::parsers::util::fixed_size_list_builder;
pub trait ScalarExtractor: serde::de::DeserializeOwned {
fn extract_scalars(&self) -> Vec<(&str, f64)>;
fn header(&self) -> &Header;
}
pub struct ScalarMessageParser<T: ScalarExtractor> {
scalars: FixedSizeListBuilder<Float64Builder>,
field_names: Vec<String>,
num_rows: usize,
_marker: std::marker::PhantomData<T>,
}
impl<T: ScalarExtractor> ScalarMessageParser<T> {
fn init_field_names(&mut self, scalar_values: &Vec<(&str, f64)>) {
self.field_names = scalar_values
.iter()
.map(|(name, _)| (*name).to_owned())
.collect();
if scalar_values.len() != 1 {
#[expect(clippy::cast_possible_wrap)]
let num_scalars = scalar_values.len() as i32;
self.scalars = fixed_size_list_builder(num_scalars, self.num_rows);
}
}
fn metadata_chunk(entity_path: EntityPath, field_names: &[String]) -> ChunkResult<Chunk> {
Chunk::builder(entity_path)
.with_archetype(
RowId::new(),
TimePoint::default(), &SeriesLines::new().with_names(field_names.to_vec()),
)
.build()
}
}
impl<T: ScalarExtractor> Ros2MessageParser for ScalarMessageParser<T> {
fn new(num_rows: usize) -> Self {
Self {
scalars: fixed_size_list_builder(1, num_rows), field_names: Vec::new(),
num_rows,
_marker: std::marker::PhantomData,
}
}
}
impl<T: ScalarExtractor> MessageParser for ScalarMessageParser<T> {
fn append(&mut self, ctx: &mut ParserContext, msg: &mcap::Message<'_>) -> anyhow::Result<()> {
re_tracing::profile_function!();
let message = cdr::try_decode_message::<T>(&msg.data).with_context(|| {
format!(
"Failed to decode {} message from CDR data",
std::any::type_name::<T>()
)
})?;
ctx.add_timestamp_cell(crate::util::TimestampCell::from_nanos_ros2(
message.header().stamp.as_nanos() as u64,
ctx.time_type(),
));
let scalar_values = message.extract_scalars();
if self.field_names.is_empty() {
self.init_field_names(&scalar_values);
}
for field_name in &self.field_names {
self.scalars.values().append_value(
scalar_values
.iter()
.find_map(|(name, value)| (name == field_name).then_some(*value))
.unwrap_or(f64::NAN), );
}
self.scalars.append(true);
Ok(())
}
fn finalize(self: Box<Self>, ctx: ParserContext) -> anyhow::Result<Vec<Chunk>> {
re_tracing::profile_function!();
let Self {
mut scalars,
field_names,
num_rows: _,
_marker: _,
} = *self;
let entity_path = ctx.entity_path().clone();
let timelines = ctx.build_timelines();
Ok(vec![
Chunk::from_auto_row_ids(
ChunkId::new(),
entity_path.clone(),
timelines,
std::iter::once((Scalars::descriptor_scalars(), scalars.finish().into())).collect(),
)?,
Self::metadata_chunk(entity_path, &field_names)?,
])
}
}