use std::collections::{BTreeMap, HashMap};
use arrow::array::RecordBatch;
use arrow::buffer::NullBuffer;
use arrow::datatypes::Field;
use arrow::{
array::{BinaryArray, BooleanArray, FixedSizeBinaryArray, StringArray, UInt64Array},
error::ArrowError,
};
use itertools::Itertools as _;
use re_chunk::external::nohash_hasher::IntMap;
use re_chunk::external::re_byte_size;
use re_chunk::{ArchetypeName, ChunkError, ChunkId, ComponentIdentifier, ComponentType, Timeline};
use re_log_types::external::re_tuid::Tuid;
use re_log_types::{AbsoluteTimeRange, EntityPath, StoreId, TimeType};
use re_types_core::ComponentDescriptor;
use crate::{CodecError, CodecResult, Decodable as _, StreamFooterEntry, ToApplication as _};
/// In-memory representation of a decoded RRD manifest.
///
/// The manifest is itself a record batch (`data`) with one row per chunk,
/// carrying bookkeeping columns (chunk ids, sizes, byte offsets, index ranges, …).
#[derive(Clone, Debug)]
pub struct RawRrdManifest {
/// The store this manifest belongs to.
pub store_id: StoreId,
/// The sorbet schema of the recording this manifest describes.
pub sorbet_schema: arrow::datatypes::Schema,
/// SHA-256 of `sorbet_schema` (fields sorted, schema metadata dropped) —
/// see [`Self::compute_sorbet_schema_sha256`].
pub sorbet_schema_sha256: [u8; 32],
/// The manifest contents: one row per chunk.
pub data: arrow::array::RecordBatch,
}
impl re_byte_size::SizeBytes for RawRrdManifest {
fn heap_size_bytes(&self) -> u64 {
re_tracing::profile_function!();
// Exhaustive destructure: adding a field to `RawRrdManifest` will fail to
// compile here, forcing this accounting to be kept up to date.
let Self {
store_id,
sorbet_schema,
sorbet_schema_sha256: _, // inline `[u8; 32]`, no heap allocation
data,
} = self;
store_id.heap_size_bytes() + sorbet_schema.heap_size_bytes() + data.heap_size_bytes()
}
}
/// Maps entity path → component → the chunk that holds that component's static data.
pub type RrdManifestStaticMap = IntMap<EntityPath, IntMap<ComponentIdentifier, ChunkId>>;
/// Per-chunk summary stored in a [`RrdManifestTemporalMap`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RrdManifestTemporalMapEntry {
/// Index range covered by the chunk on this timeline (from the `:start`/`:end` columns).
pub time_range: AbsoluteTimeRange,
/// Row count for this component on this timeline (from the `:num_rows` column).
pub num_rows: u64,
}
impl re_byte_size::SizeBytes for RrdManifestTemporalMapEntry {
    /// Plain-old-data: all fields are inline `Copy` values.
    #[inline]
    fn is_pod() -> bool {
        true
    }

    /// Nothing lives on the heap behind this type.
    fn heap_size_bytes(&self) -> u64 {
        0
    }
}
/// Maps entity path → timeline → component → chunk → time-range/row-count summary.
pub type RrdManifestTemporalMap = IntMap<
EntityPath,
IntMap<Timeline, IntMap<ComponentIdentifier, BTreeMap<ChunkId, RrdManifestTemporalMapEntry>>>,
>;
/// SHA-256 digest of a [`RawRrdManifest`]'s serialized data batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RrdManifestSha256(pub [u8; 32]);

impl std::fmt::Display for RrdManifestSha256 {
    /// Formats as `RrdManifest#<64 lowercase hex digits>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "RrdManifest#")?;
        for byte in &self.0 {
            write!(f, "{byte:02x}")?;
        }
        Ok(())
    }
}
impl RawRrdManifest {
/// Concatenates several manifests row-wise into a single one.
///
/// All inputs must share the same `store_id` and sorbet schema hash; an empty
/// input slice is an error. The result takes the first manifest's schema.
pub fn concat(manifests: &[&Self]) -> Result<Self, ArrowError> {
re_tracing::profile_function!();
let first = manifests.first().ok_or_else(|| {
ArrowError::InvalidArgumentError("No manifests to concatenate".to_owned())
})?;
// Validate every other manifest against the first.
for other in &manifests[1..] {
if first.store_id != other.store_id {
return Err(ArrowError::SchemaError(
"Mismatching store_id in RawRrdManifest::concat".to_owned(),
));
}
if first.sorbet_schema_sha256 != other.sorbet_schema_sha256 {
return Err(ArrowError::SchemaError(
"Mismatching sorbet recording schemas in RawRrdManifest::concat".to_owned(),
));
}
// Differing batch schemas are only logged here; `concat_batches` below
// will surface an error if they are actually incompatible.
if first.data.schema() != other.data.schema() {
re_log::debug!(
"Different schemas in the RrdManifest ({} columns in existing, {} in the new part)",
first.data.num_columns(),
other.data.num_columns(),
);
}
}
let batches: Vec<&RecordBatch> = manifests.iter().map(|m| &m.data).collect();
let data = arrow::compute::concat_batches(&first.data.schema(), batches)?;
Ok(Self {
store_id: first.store_id.clone(),
sorbet_schema: first.sorbet_schema.clone(),
sorbet_schema_sha256: first.sorbet_schema_sha256,
data,
})
}
/// Extracts all manifests from a complete RRD byte blob.
///
/// Reads the stream footer, then for each footer entry validates the CRC and
/// decodes the contained `RrdFooter` manifests. A blob without a decodable
/// stream footer yields an empty vec (not an error).
pub fn from_rrd_bytes(rrd_bytes: &[u8]) -> CodecResult<Vec<Self>> {
    let stream_footer = match crate::StreamFooter::from_rrd_bytes(rrd_bytes) {
        Ok(footer) => footer,
        // No decodable footer ⇒ no manifests, which is legal.
        Err(CodecError::FrameDecoding(_)) => return Ok(vec![]),
        err @ Err(_) => err?,
    };

    let mut manifests = Vec::new();
    for entry in stream_footer.entries {
        let StreamFooterEntry {
            rrd_footer_byte_span_from_start_excluding_header,
            crc_excluding_header,
        } = entry;

        let rrd_footer_byte_span = rrd_footer_byte_span_from_start_excluding_header;
        let rrd_footer_byte_span = rrd_footer_byte_span
            .try_cast::<usize>()
            .ok_or_else(|| {
                CodecError::FrameDecoding(
                    "RRD footer too large for native bit width".to_owned(),
                )
            })?
            .range();

        // The span comes from untrusted file contents: use checked slicing so a
        // truncated/corrupt file yields an error instead of a panic.
        let rrd_footer_bytes = rrd_bytes.get(rrd_footer_byte_span).ok_or_else(|| {
            CodecError::FrameDecoding(
                "RRD footer byte span is out of bounds".to_owned(),
            )
        })?;

        // Integrity check before attempting to decode the footer payload.
        let crc = crate::StreamFooter::compute_crc(rrd_footer_bytes);
        if crc != crc_excluding_header {
            return Err(CodecError::CrcMismatch {
                expected: crc_excluding_header,
                got: crc,
            });
        }

        let rrd_footer =
            re_protos::log_msg::v1alpha1::RrdFooter::from_rrd_bytes(rrd_footer_bytes)?;
        manifests.extend(
            rrd_footer
                .manifests
                .iter()
                .map(|manifest| manifest.to_application(()))
                .collect::<Result<Vec<_>, _>>()?,
        );
    }

    Ok(manifests)
}
/// Builds a manifest for a set of in-memory chunks, then sanity-checks it.
///
/// Since the chunks were never serialized, the byte spans recorded here are
/// synthetic: each chunk's uncompressed heap size is used as both its length
/// and its contribution to the running offset.
pub fn build_in_memory_from_chunks<'a>(
store_id: StoreId,
chunks: impl Iterator<Item = &'a re_chunk::Chunk>,
) -> CodecResult<Self> {
let mut rrd_manifest_builder = crate::RrdManifestBuilder::default();
let mut offset = 0;
for chunk in chunks {
let chunk_batch = chunk.to_chunk_batch()?;
use re_byte_size::SizeBytes as _;
let byte_size_uncompressed = chunk.heap_size_bytes();
let uncompressed_byte_span = re_span::Span {
start: offset,
len: byte_size_uncompressed,
};
offset += byte_size_uncompressed;
rrd_manifest_builder.append(
&chunk_batch,
uncompressed_byte_span,
byte_size_uncompressed,
)?;
}
let rrd_manifest = rrd_manifest_builder.build(store_id)?;
// Both checks are run here since this path is not performance-critical.
rrd_manifest.sanity_check_cheap()?;
rrd_manifest.sanity_check_heavy()?;
Ok(rrd_manifest)
}
/// Computes the SHA-256 of this manifest's `data` batch, serialized as an
/// Arrow IPC stream.
pub fn compute_sha256(&self) -> Result<RrdManifestSha256, ArrowError> {
    re_tracing::profile_function!();

    // Serialize the batch to IPC stream bytes.
    // NOTE: `finish()` is intentionally not called, matching how the hash has
    // always been computed — changing that would change existing digests.
    let mut data_ipc = Vec::new();
    {
        let mut writer =
            arrow::ipc::writer::StreamWriter::try_new(&mut data_ipc, self.data.schema_ref())?;
        writer.write(&self.data)?;
    }

    // Hash the serialized bytes.
    use sha2::Digest as _;
    let mut hasher = sha2::Sha256::new();
    hasher.update(&data_ipc);
    let mut hash = [0u8; 32];
    hasher.finalize_into(sha2::digest::generic_array::GenericArray::from_mut_slice(
        &mut hash,
    ));

    Ok(RrdManifestSha256(hash))
}
/// Builds the static lookup map: entity → component → chunk holding its static data.
///
/// Scans every `…:has_static_data` boolean column; for each static chunk row,
/// records the chunk id for every component whose flag is set on that row.
pub fn calc_static_map(&self) -> CodecResult<RrdManifestStaticMap> {
re_tracing::profile_function!();
use re_arrow_util::ArrowArrayDowncastRef as _;
let mut per_entity: RrdManifestStaticMap = IntMap::default();
let chunk_ids = self.col_chunk_id()?;
let chunk_entity_paths = self.col_chunk_entity_path()?;
let chunk_is_static = self.col_chunk_is_static()?;
// Collect (field, BooleanArray) for every `…:has_static_data` column up front,
// failing fast on any column with the wrong type.
let has_static_component_data =
itertools::izip!(self.data.schema_ref().fields().iter(), self.data.columns(),)
.filter(|(f, _c)| f.name().ends_with(":has_static_data"))
.map(|(f, c)| {
c.downcast_array_ref::<arrow::array::BooleanArray>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"'{}' should be a BooleanArray, but it's a {} instead",
f.name(),
c.data_type(),
)))
})
.map(|c| (f, c))
})
.collect::<Result<Vec<_>, _>>()?;
// `i` is the manifest row index, shared by all columns accessed below.
for (i, (chunk_id, is_static, entity_path)) in
itertools::izip!(chunk_ids, chunk_is_static, chunk_entity_paths).enumerate()
{
if !is_static {
continue;
}
for (f, has_static_component_data) in &has_static_component_data {
let has_static_component_data = has_static_component_data.value(i);
if !has_static_component_data {
continue;
}
let Some(component) = f.metadata().get("rerun:component") else {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"column '{}' is missing rerun:component metadata",
f.name()
),
}));
};
let component = ComponentIdentifier::new(component);
let per_component = per_entity.entry(entity_path.clone()).or_default();
// Unconditional overwrite: if several static chunks carry the same
// component, the last row scanned wins.
per_component
.entry(component)
.and_modify(|id| *id = chunk_id)
.or_insert(chunk_id);
}
}
Ok(per_entity)
}
pub fn calc_temporal_map(&self) -> CodecResult<RrdManifestTemporalMap> {
re_tracing::profile_function!();
use re_arrow_util::ArrowArrayDowncastRef as _;
let fields = self.data.schema_ref().fields();
let columns = self.data.columns();
let indexes = fields
.iter()
.filter_map(|f| {
f.metadata()
.get("rerun:index")
.and_then(|index| f.metadata().get("rerun:component").map(|c| (index, c, f)))
})
.filter(|(_index, _component, field)| field.name().ends_with(":start"))
.collect_vec();
let mut per_entity: RrdManifestTemporalMap = Default::default();
let chunk_ids = self.col_chunk_id()?;
let chunk_entity_paths = self.col_chunk_entity_path()?;
let chunk_is_static = self.col_chunk_is_static()?;
struct IndexColumns<'a> {
index: &'a str,
component: &'a String,
time_type: TimeType,
col_start_nulls: NullBuffer,
col_start_raw: &'a [i64],
col_end_nulls: NullBuffer,
col_end_raw: &'a [i64],
col_num_rows_raw: &'a [u64],
}
let mut columns_per_index = HashMap::<String, IndexColumns<'_>>::new();
for (index, component, field) in indexes {
let index = index.as_str();
if index == "rerun:static" {
continue;
}
pub fn get_index_name(field: &arrow::datatypes::Field) -> Option<&str> {
field.metadata().get("rerun:index").map(|s| s.as_str())
}
pub fn is_specific_index(field: &arrow::datatypes::Field, index_name: &str) -> bool {
get_index_name(field) == Some(index_name)
}
let Some((_, col_start)) = itertools::izip!(fields, columns).find(|(f, _col)| {
is_specific_index(f, index)
&& f.name().ends_with(":start")
&& f.metadata().get("rerun:component") == Some(component)
}) else {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!("start index is missing for {component}"),
}));
};
let Some((_, col_end)) = itertools::izip!(fields, columns).find(|(f, _col)| {
is_specific_index(f, index)
&& f.name().ends_with(":end")
&& f.metadata().get("rerun:component") == Some(component)
}) else {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!("end index is missing for {component}"),
}));
};
let Some((field_num_rows, col_num_rows)) =
itertools::izip!(fields, columns).find(|(f, _col)| {
is_specific_index(f, index)
&& f.name().ends_with(":num_rows")
&& f.metadata().get("rerun:component") == Some(component)
})
else {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!("num_rows index is missing for {component}"),
}));
};
let (time_type, col_start_raw) =
TimeType::from_arrow_array(col_start).map_err(CodecError::ArrowDeserialization)?;
let (_, col_end_raw) =
TimeType::from_arrow_array(col_end).map_err(CodecError::ArrowDeserialization)?;
let col_num_rows_raw: &[u64] = col_num_rows
.downcast_array_ref::<UInt64Array>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"'{}' should be a BooleanArray, but it's a {} instead",
field_num_rows.name(),
col_num_rows.data_type(),
)))
})?
.values();
let col_start_nulls = col_start
.nulls()
.cloned()
.unwrap_or_else(|| NullBuffer::new_valid(col_start.len()));
let col_end_nulls = col_end
.nulls()
.cloned()
.unwrap_or_else(|| NullBuffer::new_valid(col_end.len()));
columns_per_index.insert(
field.name().to_owned(),
IndexColumns {
index,
component,
time_type,
col_start_nulls,
col_start_raw,
col_end_nulls,
col_end_raw,
col_num_rows_raw,
},
);
}
for (i, (chunk_id, is_static, entity_path)) in
itertools::izip!(chunk_ids, chunk_is_static, chunk_entity_paths).enumerate()
{
if is_static {
continue;
}
for columns in columns_per_index.values() {
let IndexColumns {
index,
component,
time_type,
col_start_nulls,
col_start_raw,
col_end_nulls,
col_end_raw,
col_num_rows_raw,
} = columns;
if !col_start_nulls.is_valid(i) || !col_end_nulls.is_valid(i) {
continue;
}
let component = ComponentIdentifier::new(component);
let timeline = Timeline::new(*index, *time_type);
let per_timeline = per_entity.entry(entity_path.clone()).or_default();
let per_component = per_timeline.entry(timeline).or_default();
let per_chunk = per_component.entry(component).or_default();
let start = col_start_raw[i];
let end = col_end_raw[i];
let num_rows = col_num_rows_raw[i];
let entry = RrdManifestTemporalMapEntry {
time_range: AbsoluteTimeRange::new(start, end),
num_rows,
};
per_chunk
.entry(chunk_id)
.and_modify(|tr| *tr = entry)
.or_insert(entry);
}
}
Ok(per_entity)
}
}
impl PartialEq for RawRrdManifest {
// Equality compares the sorbet schemas *order-insensitively*: metadata and
// field count must match, and the field lists must match after sorting by name.
// Everything else is compared directly.
fn eq(&self, other: &Self) -> bool {
// Exhaustive destructure so new fields can't be silently left out of `eq`.
let Self {
store_id,
sorbet_schema,
sorbet_schema_sha256,
data,
} = self;
*store_id == other.store_id
&& *data == other.data
&& *sorbet_schema_sha256 == other.sorbet_schema_sha256
&& sorbet_schema.metadata() == other.sorbet_schema.metadata()
&& sorbet_schema.fields.len() == other.sorbet_schema.fields.len()
&& {
// Compare fields pairwise after sorting both sides by name.
let sorted_fields = sorbet_schema.fields.iter().sorted_by_key(|f| f.name());
let other_sorted_fields = other
.sorbet_schema
.fields
.iter()
.sorted_by_key(|f| f.name());
itertools::izip!(sorted_fields, other_sorted_fields).all(|(f1, f2)| f1 == f2)
}
}
}
impl RawRrdManifest {
/// Computes a stable SHA-256 for a sorbet schema.
///
/// The fields are sorted and the schema-level metadata is dropped before
/// hashing, so field order and top-level metadata do not affect the digest.
pub fn compute_sorbet_schema_sha256(
schema: &arrow::datatypes::Schema,
) -> Result<[u8; 32], ArrowError> {
// Normalize: sorted fields, no schema-level metadata.
let schema = {
let mut fields = schema.fields().to_vec();
fields.sort();
arrow::datatypes::Schema::new_with_metadata(fields, Default::default()) };
// NOTE(review): only `StreamWriter::try_new` is called (no writes, no
// `finish()`), so the hashed bytes are whatever the writer emits on
// construction — presumably the IPC schema message; confirm against
// arrow-ipc's stream format.
let partition_schema_ipc = {
let mut schema_ipc = Vec::new();
arrow::ipc::writer::StreamWriter::try_new(&mut schema_ipc, &schema)?;
schema_ipc
};
use sha2::Digest as _;
let mut hash = [0u8; 32];
let mut hasher = sha2::Sha256::new();
hasher.update(&partition_schema_ipc);
hasher.finalize_into(sha2::digest::generic_array::GenericArray::from_mut_slice(
&mut hash,
));
Ok(hash)
}
/// Assembles a manifest column name from its optional parts.
///
/// Non-empty parts are joined with `:` in the order
/// prefix → entity path → archetype short name → archetype field → suffix,
/// then `,`/space/`-`/`.`/`\` are replaced with `_` and leading `_` stripped.
pub fn compute_column_name(
    entity_path: Option<&EntityPath>,
    strip_entity_prefix: Option<&str>,
    component_desc: Option<&ComponentDescriptor>,
    prefix: Option<&str>,
    suffix: Option<&str>,
) -> String {
    use re_types_core::reflection::ComponentDescriptorExt as _;

    let mut parts: Vec<String> = Vec::new();
    if let Some(prefix) = prefix {
        parts.push(prefix.to_owned());
    }
    if let Some(p) = entity_path {
        // Optionally strip a leading prefix, and always a trailing slash.
        let path = p.to_string();
        let path = strip_entity_prefix
            .and_then(|prefix| path.strip_prefix(prefix))
            .unwrap_or(&path);
        parts.push(path.strip_suffix("/").unwrap_or(path).to_owned());
    }
    if let Some(descr) = component_desc {
        if let Some(archetype) = descr.archetype {
            parts.push(archetype.short_name().to_owned());
        }
        parts.push(descr.archetype_field_name().to_owned());
    }
    if let Some(suffix) = suffix {
        parts.push(suffix.to_owned());
    }

    let full_name = parts
        .into_iter()
        .filter(|s| !s.is_empty())
        .collect::<Vec<_>>()
        .join(":");

    // Sanitize characters that are awkward in column names.
    full_name
        .replace([',', ' ', '-', '.', '\\'], "_")
        .trim_start_matches('_')
        .to_owned()
}
}
impl RawRrdManifest {
/// Runs the inexpensive structural checks, in order; returns the first failure.
pub fn sanity_check_cheap(&self) -> CodecResult<()> {
    re_tracing::profile_function!();
    self.check_global_columns_are_correct()
        .and_then(|()| self.check_index_columns_are_correct())
        .and_then(|()| self.check_manifest_schema_matches_sorbet_schema())
}
/// Runs the expensive checks (currently: re-hashing the sorbet schema).
pub fn sanity_check_heavy(&self) -> CodecResult<()> {
    re_tracing::profile_function!();
    self.check_sorbet_schema_sha256_is_correct()
}
/// Verifies that all mandatory per-chunk columns exist and have the expected
/// types, by exercising each typed accessor and discarding the result.
fn check_global_columns_are_correct(&self) -> CodecResult<()> {
_ = self.col_chunk_id()?;
_ = self.col_chunk_is_static()?;
_ = self.col_chunk_num_rows()?;
_ = self.col_chunk_entity_path()?;
_ = self.col_chunk_byte_size_uncompressed()?;
_ = self.col_chunk_byte_offset()?;
_ = self.col_chunk_byte_size()?;
// `chunk_key` is optional: only type-check it when the column is present.
if self
.data
.schema_ref()
.column_with_name(Self::FIELD_CHUNK_KEY)
.is_some()
{
_ = self.col_chunk_key_raw()?;
}
Ok(())
}
/// Validates the per-index columns of the manifest batch in three passes:
/// 1. every field name is either a known global column or has a known suffix,
///    with the right datatype for `has_static_data`/`num_rows`;
/// 2. every `:start` has a `:end` (and vice versa), with a supported index
///    datatype, and the pair's datatypes match;
/// 3. every `:num_rows` column is UInt64.
fn check_index_columns_are_correct(&self) -> CodecResult<()> {
// Pass 1: field names and basic datatypes.
{
for field in self.data.schema().fields() {
if let Some((_, suffix)) = field.name().rsplit_once(':') {
match suffix {
// Datatypes of start/end columns are validated in pass 2 below.
"start" | "end" => {
}
"has_static_data" => {
if field.data_type() != Self::field_chunk_is_static().data_type() {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' should be {} but is actually {}",
field.name(),
Self::field_chunk_is_static().data_type(),
field.data_type(),
),
}));
}
}
"num_rows" => {
if field.data_type() != Self::field_chunk_num_rows().data_type() {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' should be {} but is actually {}",
field.name(),
Self::field_chunk_num_rows().data_type(),
field.data_type(),
),
}));
}
}
// Any other colon-suffixed field is rejected outright.
suffix => {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' has invalid suffix '{suffix}'",
field.name(),
),
}));
}
}
} else {
// No colon: must be one of the known global columns, or an
// implementation-specific field we tolerate.
match field.name().as_str() {
Self::FIELD_CHUNK_ID
| Self::FIELD_CHUNK_IS_STATIC
| Self::FIELD_CHUNK_NUM_ROWS
| Self::FIELD_CHUNK_BYTE_SIZE
| Self::FIELD_CHUNK_BYTE_SIZE_UNCOMPRESSED
| Self::FIELD_CHUNK_BYTE_OFFSET
| Self::FIELD_CHUNK_KEY
| Self::FIELD_CHUNK_ENTITY_PATH => {}
name if Self::COMMON_IMPL_SPECIFIC_FIELDS.contains(&name) => {}
name => {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"unexpected field '{name}' should not be present in an RRD manifest",
),
}));
}
}
}
}
}
// Pass 2: every `:start` has a matching `:end` and vice versa, both with a
// supported index datatype, and their datatypes agree.
{
for field in self.data.schema().fields() {
if let Some((prefix, suffix)) = field.name().rsplit_once(':') {
let counterpart = match suffix {
"start" => "end",
"end" => "start",
_ => continue,
};
let field_counterpart = self
.data
.schema_ref()
.field_with_name(&format!("{prefix}:{counterpart}"))
.map_err(|_err| {
CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' does not have matching `:{counterpart}` field",
field.name()
),
})
})?;
match field.data_type() {
arrow::datatypes::DataType::Int64
| arrow::datatypes::DataType::Timestamp(_, _)
| arrow::datatypes::DataType::Duration(_) => {}
datatype => {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' is {datatype} which is not a supported index datatype",
field.name(),
),
}));
}
}
if field.data_type() != field_counterpart.data_type() {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' is {} but field '{}' is {}",
field.name(),
field.data_type(),
field_counterpart.name(),
field_counterpart.data_type()
),
}));
}
}
}
}
// Pass 3: every `:num_rows` column must be UInt64.
// NOTE(review): the `field_with_name` lookup below reconstructs the name of
// the very field being iterated, so it always succeeds and the "does not
// have matching `:num_rows` field" error branch looks unreachable — confirm
// whether a different counterpart (e.g. `:start`) was intended.
{
for field in self.data.schema().fields() {
if let Some((prefix, "num_rows")) = field.name().rsplit_once(':') {
let field_num_rows = self
.data
.schema_ref()
.field_with_name(&format!("{prefix}:num_rows"))
.map_err(|_err| {
CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' does not have matching `:num_rows` field",
field.name()
),
})
})?;
match field_num_rows.data_type() {
arrow::datatypes::DataType::UInt64 => {}
datatype => {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"field '{}' is {datatype} while it should be UInt64Array",
field_num_rows.name(),
),
}));
}
}
}
}
}
Ok(())
}
/// Cross-checks the manifest batch against the recording's sorbet schema:
/// * if any chunk is static, every sorbet data column must have a
///   `…:has_static_data` column in the manifest;
/// * every sorbet index must have `:start`/`:end` columns of matching datatype,
///   globally and (where present) per component;
/// * no `:start`/`:end` column may be left over that the sorbet schema doesn't
///   account for ("dangling indexes").
fn check_manifest_schema_matches_sorbet_schema(&self) -> CodecResult<()> {
let any_static_chunks = self.col_chunk_is_static()?.any(|b| b);
// Sorbet index fields: kind == "index" AND an index name is present.
let sorbet_indexes = self
.sorbet_schema
.fields()
.iter()
.filter_map(|f| {
let md = f.metadata();
(md.get("rerun:kind").map(|s| s.as_str()) == Some("index"))
.then(|| md.contains_key("rerun:index_name").then_some(f))
.flatten()
})
.unique()
.collect_vec();
// Sorbet data fields.
let sorbet_columns = self
.sorbet_schema
.fields()
.iter()
.filter(|f| f.metadata().get("rerun:kind").map(|s| s.as_str()) == Some("data"))
.unique()
.collect_vec();
// Static coverage: every data column needs a `has_static_data` flag column.
if any_static_chunks {
for column in &sorbet_columns {
let md = column.metadata();
let Some(component) = md.get("rerun:component") else {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"column '{}' is missing rerun:component metadata",
column.name()
),
}));
};
let descr = ComponentDescriptor {
archetype: md.get("rerun:archetype").map(|s| ArchetypeName::new(s)),
component: ComponentIdentifier::new(component),
component_type: md
.get("rerun:component_type")
.map(|s| ComponentType::new(s)),
};
let column_name = Self::compute_column_name(
None,
None,
Some(&descr),
None,
Some("has_static_data"),
);
self.data
.schema_ref()
.field_with_name(&column_name)
.map_err(|_err| {
CodecError::from(ChunkError::Malformed {
reason: format!("static index '{column_name}' is missing"),
})
})?;
}
}
// Working set of all `:start`/`:end` columns; entries are removed as they are
// accounted for, so whatever remains at the end is dangling.
let mut rrd_manifest_fields: HashMap<_, _> = self
.data
.schema_ref()
.fields()
.iter()
.filter(|f| f.name().ends_with(":start") || f.name().ends_with(":end"))
.map(|f| (f.name(), f))
.collect();
for sorbet_index in &sorbet_indexes {
let sorbet_index_name_normalized =
Self::compute_column_name(None, None, None, Some(sorbet_index.name()), None);
// Global `:start`/`:end` columns for this index are mandatory.
for suffix in ["start", "end"] {
let field = rrd_manifest_fields.remove(&format!("{sorbet_index_name_normalized}:{suffix}"))
.ok_or_else(|| {
CodecError::from(ChunkError::Malformed {
reason: format!(
"global index '{sorbet_index}' does not have matching `:{suffix}` field"
),
})
})?;
if sorbet_index.data_type() != field.data_type() {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"global index '{}' is {} but '{}' is {}",
sorbet_index.name(),
sorbet_index.data_type(),
field.name(),
field.data_type()
),
}));
}
}
// Per-(index, component) `:start`/`:end` columns are optional, but when
// present their datatype must match the index; static columns are just
// drained from the working set.
for sorbet_column in &sorbet_columns {
let md = sorbet_column.metadata();
let Some(component) = md.get("rerun:component") else {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"column '{}' is missing rerun:component metadata",
sorbet_column.name()
),
}));
};
let descr = ComponentDescriptor {
archetype: md.get("rerun:archetype").map(|s| ArchetypeName::new(s)),
component: ComponentIdentifier::new(component),
component_type: md
.get("rerun:component_type")
.map(|s| ComponentType::new(s)),
};
for suffix in ["start", "end"] {
let column_name = Self::compute_column_name(
None,
None,
Some(&descr),
Some(sorbet_index.name()),
Some(suffix),
);
if md.get("rerun:is_static").map(|s| s.as_str()) == Some("true") {
_ = rrd_manifest_fields.remove(&column_name);
continue;
}
let Some(field) = rrd_manifest_fields.remove(&column_name) else {
continue;
};
if sorbet_index.data_type() != field.data_type() {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"local index '{}' is {} but '{}' is {}",
sorbet_index.name(),
sorbet_index.data_type(),
field.name(),
field.data_type()
),
}));
}
}
}
}
// Anything still in the working set is a `:start`/`:end` column without a
// corresponding sorbet index.
if !rrd_manifest_fields.is_empty() {
return Err(CodecError::from(ChunkError::Malformed {
reason: format!(
"detected dangling indexes (present in manifest but not in Sorbet schema): {:?}",
rrd_manifest_fields.keys()
),
}));
}
Ok(())
}
/// Recomputes the sorbet schema hash and compares it to the stored one.
fn check_sorbet_schema_sha256_is_correct(&self) -> CodecResult<()> {
    let expected_sorbet_schema_sha256 =
        Self::compute_sorbet_schema_sha256(&self.sorbet_schema)
            .map_err(CodecError::ArrowDeserialization)?;

    if self.sorbet_schema_sha256 == expected_sorbet_schema_sha256 {
        return Ok(());
    }

    let hex = |bytes: &[u8]| bytes.iter().map(|b| format!("{b:02x}")).collect::<String>();
    Err(CodecError::ArrowDeserialization(ArrowError::SchemaError(
        format!(
            "invalid schema hash: expected {} but got {}",
            hex(&expected_sorbet_schema_sha256),
            hex(&self.sorbet_schema_sha256),
        ),
    )))
}
}
impl RawRrdManifest {
/// Column name: chunk id (fixed-size binary).
pub const FIELD_CHUNK_ID: &str = "chunk_id";
/// Column name: per-chunk static flag.
pub const FIELD_CHUNK_IS_STATIC: &str = "chunk_is_static";
/// Column name: per-chunk row count.
pub const FIELD_CHUNK_NUM_ROWS: &str = "chunk_num_rows";
/// Column name: per-chunk entity path.
pub const FIELD_CHUNK_ENTITY_PATH: &str = "chunk_entity_path";
/// Column name: chunk byte offset within the serialized stream.
pub const FIELD_CHUNK_BYTE_OFFSET: &str = "chunk_byte_offset";
/// Column name: chunk byte size (as stored).
pub const FIELD_CHUNK_BYTE_SIZE: &str = "chunk_byte_size";
/// Column name: chunk byte size before compression.
pub const FIELD_CHUNK_BYTE_SIZE_UNCOMPRESSED: &str = "chunk_byte_size_uncompressed";
/// Column name: optional opaque chunk key.
pub const FIELD_CHUNK_KEY: &str = "chunk_key";
/// Extra implementation-specific column names that are tolerated (but not
/// interpreted) by the sanity checks.
pub const COMMON_IMPL_SPECIFIC_FIELDS: &[&str] = &[
"chunk_partition_id",
"chunk_partition_layer",
"rerun_partition_id",
"rerun_partition_layer",
"chunk_segment_id",
"chunk_segment_layer",
"rerun_segment_id",
"rerun_segment_layer",
];
/// Schema field for the (non-nullable) `chunk_id` column.
pub fn field_chunk_id() -> Field {
    use re_log_types::external::re_types_core::Loggable as _;
    Field::new(Self::FIELD_CHUNK_ID, ChunkId::arrow_datatype(), false)
}

/// Schema field for the (non-nullable) `chunk_is_static` column.
pub fn field_chunk_is_static() -> Field {
    Field::new(
        Self::FIELD_CHUNK_IS_STATIC,
        arrow::datatypes::DataType::Boolean,
        false,
    )
}

/// Schema field for the (non-nullable) `chunk_num_rows` column.
pub fn field_chunk_num_rows() -> Field {
    Field::new(
        Self::FIELD_CHUNK_NUM_ROWS,
        arrow::datatypes::DataType::UInt64,
        false,
    )
}

/// Schema field for the (non-nullable) `chunk_entity_path` column.
pub fn field_chunk_entity_path() -> Field {
    Field::new(
        Self::FIELD_CHUNK_ENTITY_PATH,
        arrow::datatypes::DataType::Utf8,
        false,
    )
}

/// Schema field for the `chunk_byte_offset` column.
pub fn field_chunk_byte_offset() -> Field {
    Self::any_byte_field(Self::FIELD_CHUNK_BYTE_OFFSET)
}

/// Schema field for the `chunk_byte_size` column.
pub fn field_chunk_byte_size() -> Field {
    Self::any_byte_field(Self::FIELD_CHUNK_BYTE_SIZE)
}

/// Schema field for the `chunk_byte_size_uncompressed` column.
pub fn field_chunk_byte_size_uncompressed() -> Field {
    Self::any_byte_field(Self::FIELD_CHUNK_BYTE_SIZE_UNCOMPRESSED)
}

/// Schema field for the (non-nullable) `chunk_key` column.
pub fn field_chunk_key() -> Field {
    Field::new(
        Self::FIELD_CHUNK_KEY,
        arrow::datatypes::DataType::Binary,
        false,
    )
}
/// Schema field for an index's `:start` column (datatype follows the timeline).
pub fn field_index_start(timeline: &Timeline, desc: Option<&ComponentDescriptor>) -> Field {
    Self::any_index_field(timeline, timeline.datatype(), desc, "start")
}

/// Schema field for an index's `:end` column (datatype follows the timeline).
pub fn field_index_end(timeline: &Timeline, desc: Option<&ComponentDescriptor>) -> Field {
    Self::any_index_field(timeline, timeline.datatype(), desc, "end")
}

/// Schema field for an index's UInt64 `:num_rows` column.
pub fn field_index_num_rows(timeline: &Timeline, desc: Option<&ComponentDescriptor>) -> Field {
    Self::any_index_field(
        timeline,
        arrow::datatypes::DataType::UInt64,
        desc,
        "num_rows",
    )
}

/// Schema field for a component's boolean `:has_data` column on an index.
pub fn field_index_has_data(timeline: &Timeline, desc: &ComponentDescriptor) -> Field {
    Self::any_index_field(
        timeline,
        arrow::datatypes::DataType::Boolean,
        Some(desc),
        "has_data",
    )
}
/// Schema field for a component's boolean `:has_static_data` column.
///
/// Uses the reserved index name `rerun:static` and carries the component's
/// descriptor (component, archetype, component type) as field metadata.
pub fn field_has_static_data(desc: &ComponentDescriptor) -> Field {
    let field_name =
        Self::compute_column_name(None, None, Some(desc), None, Some("has_static_data"));

    // Collect the present metadata entries; optional descriptor parts are
    // simply skipped.
    let metadata: std::collections::HashMap<String, String> = [
        Some(("rerun:index".to_owned(), "rerun:static".to_owned())),
        desc.component_type.map(|component_type| {
            (
                "rerun:component_type".to_owned(),
                component_type.full_name().to_owned(),
            )
        }),
        desc.archetype
            .as_ref()
            .map(|name| ("rerun:archetype".to_owned(), name.full_name().to_owned())),
        Some(("rerun:component".to_owned(), desc.component.to_string())),
    ]
    .into_iter()
    .flatten()
    .collect();

    Field::new(field_name, arrow::datatypes::DataType::Boolean, false)
        .with_metadata(metadata)
}
/// Builds a nullable index column field named after the timeline (and the
/// component, if any), tagging it with `rerun:index` plus the descriptor's
/// metadata when a descriptor is given.
fn any_index_field(
    timeline: &Timeline,
    datatype: arrow::datatypes::DataType,
    desc: Option<&ComponentDescriptor>,
    marker: &str,
) -> Field {
    let index_name = timeline.name();
    let field_name =
        Self::compute_column_name(None, None, desc, Some(index_name), Some(marker));

    let mut metadata = std::collections::HashMap::default();
    metadata.insert("rerun:index".to_owned(), timeline.name().to_string());
    if let Some(desc) = desc {
        if let Some(component_type) = desc.component_type {
            metadata.insert(
                "rerun:component_type".to_owned(),
                component_type.full_name().to_owned(),
            );
        }
        if let Some(name) = desc.archetype.as_ref() {
            metadata.insert("rerun:archetype".to_owned(), name.full_name().to_owned());
        }
        metadata.insert("rerun:component".to_owned(), desc.component.to_string());
    }

    // Nullable: a chunk may have no data for a given index.
    Field::new(field_name, datatype, true).with_metadata(metadata)
}
/// Builds a non-nullable UInt64 field for byte offset/size columns.
fn any_byte_field(name: &str) -> Field {
    Field::new(name, arrow::datatypes::DataType::UInt64, false)
}
}
impl RawRrdManifest {
pub fn col_chunk_entity_path_raw(&self) -> CodecResult<&StringArray> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_ENTITY_PATH;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<StringArray>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a StringArray",
)))
})
}
pub fn col_chunk_entity_path(&self) -> CodecResult<impl Iterator<Item = EntityPath>> {
let col_raw = self.col_chunk_entity_path_raw()?;
Ok(col_raw.iter().flatten().map(EntityPath::parse_forgiving))
}
pub fn col_chunk_id_raw(&self) -> CodecResult<&FixedSizeBinaryArray> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_ID;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<FixedSizeBinaryArray>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a FixedSizeBinaryArray",
)))
})
}
pub fn col_chunk_id(&self) -> CodecResult<impl Iterator<Item = ChunkId>> {
Ok(self
.col_chunk_id_raw()?
.iter()
.flatten()
.filter_map(|bytes| {
let bytes: [u8; 16] = bytes
.try_into()
.inspect_err(|err| {
tracing::error!(
%err,
?bytes,
"failed to parse chunk ID from fixed-size binary array"
);
})
.ok()?;
Some(ChunkId::from_tuid(Tuid::from_bytes(bytes)))
}))
}
pub fn col_chunk_is_static_raw(&self) -> CodecResult<&BooleanArray> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_IS_STATIC;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<BooleanArray>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a BooleanArray",
)))
})
}
pub fn col_chunk_is_static(&self) -> CodecResult<impl Iterator<Item = bool>> {
Ok(self.col_chunk_is_static_raw()?.iter().flatten())
}
pub fn col_chunk_num_rows_raw(&self) -> CodecResult<&UInt64Array> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_NUM_ROWS;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<UInt64Array>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a UInt64Array",
)))
})
}
pub fn col_chunk_num_rows(&self) -> CodecResult<impl Iterator<Item = u64>> {
Ok(self.col_chunk_num_rows_raw()?.iter().flatten())
}
pub fn col_chunk_byte_offset_raw(&self) -> CodecResult<&UInt64Array> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_BYTE_OFFSET;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<UInt64Array>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a UInt64Array",
)))
})
}
pub fn col_chunk_byte_offset(&self) -> CodecResult<impl Iterator<Item = u64>> {
Ok(self.col_chunk_byte_offset_raw()?.iter().flatten())
}
pub fn col_chunk_byte_size_raw(&self) -> CodecResult<&UInt64Array> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_BYTE_SIZE;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<UInt64Array>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a UInt64Array",
)))
})
}
pub fn col_chunk_byte_size(&self) -> CodecResult<impl Iterator<Item = u64>> {
Ok(self.col_chunk_byte_size_raw()?.iter().flatten())
}
pub fn col_chunk_byte_size_uncompressed_raw(&self) -> CodecResult<&UInt64Array> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_BYTE_SIZE_UNCOMPRESSED;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<UInt64Array>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a UInt64Array",
)))
})
}
pub fn col_chunk_byte_size_uncompressed(&self) -> CodecResult<impl Iterator<Item = u64>> {
Ok(self
.col_chunk_byte_size_uncompressed_raw()?
.iter()
.flatten())
}
pub fn col_chunk_key_raw(&self) -> CodecResult<&BinaryArray> {
use re_arrow_util::ArrowArrayDowncastRef as _;
let name = Self::FIELD_CHUNK_KEY;
self.data
.column_by_name(name)
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot read column: '{name}' is missing from batch",
)))
})?
.downcast_array_ref::<BinaryArray>()
.ok_or_else(|| {
CodecError::ArrowDeserialization(ArrowError::SchemaError(format!(
"cannot downcast column: '{name}' is not a BinaryArray"
)))
})
}
}