use std::sync::Arc;
use arrow::array::{Array as _, BinaryArray, FixedSizeBinaryArray, StringArray};
use arrow::buffer::{BooleanBuffer, ScalarBuffer};
use re_chunk::external::re_byte_size;
use re_chunk::{ChunkId, EntityPath};
use re_log_types::StoreId;
use re_sorbet::SorbetSchema;
use super::{RawRrdManifest, RrdManifestSha256, RrdManifestStaticMap, RrdManifestTemporalMap};
use crate::{CodecError, CodecResult};
/// A validated RRD manifest, with cached typed views over its columns.
///
/// Built via [`RrdManifest::try_new`], which validates the wrapped [`RawRrdManifest`] and
/// extracts every other field from it.
#[derive(Clone)]
pub struct RrdManifest {
    // --- Source data ---
    /// The raw manifest everything else is derived from (also what equality compares).
    raw: RawRrdManifest,

    /// Parsed from `raw.sorbet_schema`.
    recording_schema: SorbetSchema,

    // --- Per-chunk columns (one entry per chunk, all checked to be null-free) ---
    /// Chunk ids; the datatype is checked to be readable as `ChunkId`s in `try_new`.
    chunk_ids: FixedSizeBinaryArray,

    /// Entity path of each chunk, stored as strings.
    chunk_entity_paths: StringArray,

    /// Values buffer of the is-static column (validity bitmap not kept).
    chunk_is_static: BooleanBuffer,

    /// Values buffer of the num-rows column.
    chunk_num_rows: ScalarBuffer<u64>,

    /// Values buffer of the byte-offset column.
    chunk_byte_offsets: ScalarBuffer<u64>,

    /// Values buffer of the byte-size column.
    chunk_byte_sizes: ScalarBuffer<u64>,

    /// Values buffer of the uncompressed byte-size column.
    chunk_byte_sizes_uncompressed: ScalarBuffer<u64>,

    /// Optional chunk-key column; `None` when the column is absent from the schema.
    chunk_keys: Option<BinaryArray>,

    // --- Precomputed indices ---
    /// Computed by `RawRrdManifest::calc_static_map` in `try_new`.
    static_data_map: RrdManifestStaticMap,

    /// Computed by `RawRrdManifest::calc_temporal_map` in `try_new`.
    temporal_data_map: RrdManifestTemporalMap,
}
impl std::fmt::Debug for RrdManifest {
    /// Prints only `RrdManifest { .. }`.
    ///
    /// Deliberately minimal: no field contents are printed (presumably to avoid dumping the
    /// underlying Arrow columns).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RrdManifest");
        builder.finish_non_exhaustive()
    }
}
impl PartialEq for RrdManifest {
    /// Two manifests are equal iff their underlying raw manifests are equal.
    ///
    /// Every other field is derived from `raw` in [`RrdManifest::try_new`], so comparing `raw`
    /// alone is sufficient.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.raw, &other.raw);
        lhs == rhs
    }
}
impl re_byte_size::SizeBytes for RrdManifest {
    /// Heap footprint: the raw manifest plus the two precomputed index maps.
    ///
    /// The cached column views and `recording_schema` are not added separately.
    /// NOTE(review): presumably because the column views share buffers with `raw.data` — confirm.
    fn heap_size_bytes(&self) -> u64 {
        re_tracing::profile_function!();

        [
            self.raw.heap_size_bytes(),
            self.static_data_map.heap_size_bytes(),
            self.temporal_data_map.heap_size_bytes(),
        ]
        .into_iter()
        .sum()
    }
}
impl RrdManifest {
pub fn try_new(manifest: RawRrdManifest) -> CodecResult<Self> {
re_tracing::profile_function!();
if cfg!(debug_assertions) {
manifest.sanity_check_heavy()?;
} else {
manifest.sanity_check_cheap()?;
}
let chunk_ids = manifest.col_chunk_id_raw()?.clone();
ChunkId::try_slice_from_arrow(&chunk_ids).map_err(|err| {
crate::CodecError::FrameDecoding(format!("chunk_id column has wrong datatype: {err}"))
})?;
let chunk_entity_paths = manifest.col_chunk_entity_path_raw()?.clone();
let chunk_is_static_array = manifest.col_chunk_is_static_raw()?;
let chunk_num_rows_array = manifest.col_chunk_num_rows_raw()?;
let chunk_byte_offsets_array = manifest.col_chunk_byte_offset_raw()?;
let chunk_byte_sizes_array = manifest.col_chunk_byte_size_raw()?;
let chunk_byte_sizes_uncompressed_array =
manifest.col_chunk_byte_size_uncompressed_raw()?;
if chunk_ids.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_id column has {} nulls",
chunk_ids.null_count()
)));
}
if chunk_entity_paths.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_entity_path column has {} nulls",
chunk_entity_paths.null_count()
)));
}
if chunk_is_static_array.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_is_static column has {} nulls",
chunk_is_static_array.null_count()
)));
}
if chunk_num_rows_array.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_num_rows column has {} nulls",
chunk_num_rows_array.null_count()
)));
}
if chunk_byte_offsets_array.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_byte_offset column has {} nulls",
chunk_byte_offsets_array.null_count()
)));
}
if chunk_byte_sizes_array.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_byte_size column has {} nulls",
chunk_byte_sizes_array.null_count()
)));
}
if chunk_byte_sizes_uncompressed_array.null_count() > 0 {
return Err(crate::CodecError::FrameDecoding(format!(
"chunk_byte_size_uncompressed column has {} nulls",
chunk_byte_sizes_uncompressed_array.null_count()
)));
}
let chunk_is_static = chunk_is_static_array.values().clone();
let chunk_num_rows = chunk_num_rows_array.values().clone();
let chunk_byte_offsets = chunk_byte_offsets_array.values().clone();
let chunk_byte_sizes = chunk_byte_sizes_array.values().clone();
let chunk_byte_sizes_uncompressed = chunk_byte_sizes_uncompressed_array.values().clone();
let chunk_keys = if manifest
.data
.schema_ref()
.column_with_name(RawRrdManifest::FIELD_CHUNK_KEY)
.is_some()
{
Some(manifest.col_chunk_key_raw()?.clone())
} else {
None
};
let static_data_map = manifest.calc_static_map()?;
let temporal_data_map = manifest.calc_temporal_map()?;
let recording_schema =
SorbetSchema::try_from_raw_arrow_schema(Arc::new(manifest.sorbet_schema.clone()))?;
Ok(Self {
raw: manifest,
recording_schema,
chunk_ids,
chunk_entity_paths,
chunk_is_static,
chunk_num_rows,
chunk_byte_offsets,
chunk_byte_sizes,
chunk_byte_sizes_uncompressed,
chunk_keys,
static_data_map,
temporal_data_map,
})
}
pub fn recording_schema(&self) -> &SorbetSchema {
&self.recording_schema
}
pub fn concat(manifests: &[&Self]) -> CodecResult<Self> {
re_tracing::profile_function!();
let raws: Vec<&RawRrdManifest> = manifests.iter().map(|m| &m.raw).collect();
let combined = RawRrdManifest::concat(&raws).map_err(|err| {
CodecError::FrameDecoding(format!("Failed to concatenate RRD manifests: {err}"))
})?;
Self::try_new(combined)
}
pub fn build_in_memory_from_chunks<'a>(
store_id: StoreId,
chunks: impl Iterator<Item = &'a re_chunk::Chunk>,
) -> CodecResult<Arc<Self>> {
let raw = RawRrdManifest::build_in_memory_from_chunks(store_id, chunks)?;
Ok(Arc::new(Self::try_new(raw)?))
}
#[inline]
pub fn raw(&self) -> &RawRrdManifest {
&self.raw
}
#[inline]
pub fn num_chunks(&self) -> usize {
self.chunk_ids.len()
}
#[inline]
pub fn store_id(&self) -> &StoreId {
&self.raw.store_id
}
#[inline]
pub fn sorbet_schema(&self) -> &arrow::datatypes::Schema {
&self.raw.sorbet_schema
}
#[inline]
pub fn sorbet_schema_sha256(&self) -> &[u8; 32] {
&self.raw.sorbet_schema_sha256
}
#[inline]
pub fn data(&self) -> &arrow::array::RecordBatch {
&self.raw.data
}
#[inline]
pub fn col_chunk_ids(&self) -> &[ChunkId] {
#[expect(clippy::unwrap_used)] ChunkId::try_slice_from_arrow(&self.chunk_ids).unwrap()
}
#[inline]
pub fn col_chunk_entity_path_raw(&self) -> &StringArray {
&self.chunk_entity_paths
}
pub fn col_chunk_entity_path(&self) -> impl Iterator<Item = EntityPath> {
self.chunk_entity_paths
.iter()
.flatten()
.map(EntityPath::parse_forgiving)
}
#[inline]
pub fn col_chunk_is_static_raw(&self) -> &BooleanBuffer {
&self.chunk_is_static
}
#[inline]
pub fn col_chunk_is_static(&self) -> impl Iterator<Item = bool> + '_ {
self.chunk_is_static.iter()
}
#[inline]
pub fn col_chunk_num_rows(&self) -> &[u64] {
&self.chunk_num_rows
}
#[inline]
pub fn col_chunk_byte_offset(&self) -> &[u64] {
&self.chunk_byte_offsets
}
#[inline]
pub fn col_chunk_byte_size(&self) -> &[u64] {
&self.chunk_byte_sizes
}
#[inline]
pub fn col_chunk_byte_size_uncompressed(&self) -> &[u64] {
&self.chunk_byte_sizes_uncompressed
}
#[inline]
pub fn col_chunk_key_raw(&self) -> Option<&BinaryArray> {
self.chunk_keys.as_ref()
}
pub fn compute_sha256(&self) -> Result<RrdManifestSha256, arrow::error::ArrowError> {
self.raw.compute_sha256()
}
#[inline]
pub fn static_map(&self) -> &RrdManifestStaticMap {
&self.static_data_map
}
#[inline]
pub fn temporal_map(&self) -> &RrdManifestTemporalMap {
&self.temporal_data_map
}
}