use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields};
use re_log_types::EntityPath;
use crate::{
BatchType, ColumnDescriptor, ComponentColumnDescriptor, IndexColumnDescriptor,
RowIdColumnDescriptor, SorbetColumnDescriptors, SorbetError,
};
/// The column descriptors of a chunk, split by column kind.
///
/// When built via `TryFrom<SorbetColumnDescriptors>`, the source columns must be ordered as:
/// exactly one row id column first, then all index columns, then all component columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkColumnDescriptors {
/// The single row id column.
pub row_id: RowIdColumnDescriptor,
/// The index columns, in their original order.
pub indices: Vec<IndexColumnDescriptor>,
/// The component columns, in their original order.
pub components: Vec<ComponentColumnDescriptor>,
}
impl ChunkColumnDescriptors {
    /// Runs [`ComponentColumnDescriptor::sanity_check`] on every component column.
    #[inline]
    #[track_caller]
    pub fn sanity_check(&self) {
        // Kept as a plain loop: `#[track_caller]` does not propagate through closures.
        for component in &self.components {
            component.sanity_check();
        }
    }

    /// Returns all index columns followed by all component columns.
    ///
    /// The row id column is not included.
    pub fn indices_and_components(&self) -> Vec<ColumnDescriptor> {
        let indices = self.indices.iter().cloned().map(ColumnDescriptor::Time);
        let components = self
            .components
            .iter()
            .cloned()
            .map(ColumnDescriptor::Component);
        indices.chain(components).collect()
    }

    /// Looks up a column by position, counting index columns first and then
    /// component columns (the row id column is not counted).
    ///
    /// Returns `None` if `index_ignoring_row_id` is past the last component column.
    pub fn get_index_or_component(&self, index_ignoring_row_id: usize) -> Option<ColumnDescriptor> {
        if let Some(index) = self.indices.get(index_ignoring_row_id) {
            return Some(ColumnDescriptor::Time(index.clone()));
        }

        // `get` returned `None`, so `index_ignoring_row_id >= self.indices.len()`
        // and the subtraction cannot underflow.
        self.components
            .get(index_ignoring_row_id - self.indices.len())
            .cloned()
            .map(ColumnDescriptor::Component)
    }

    /// Consumes `self` and returns it with only the component columns for
    /// which `keep` returns `true`. Row id and index columns are untouched.
    #[must_use]
    #[inline]
    pub fn filter_components(mut self, keep: impl Fn(&ComponentColumnDescriptor) -> bool) -> Self {
        self.components.retain(keep);
        self
    }

    /// Returns the arrow fields of all columns: row id first, then the index
    /// columns, then the component columns.
    pub fn arrow_fields(&self) -> Vec<ArrowField> {
        std::iter::once(self.row_id.to_arrow_field())
            .chain(self.indices.iter().map(|index| index.to_arrow_field()))
            .chain(
                self.components
                    .iter()
                    // These descriptors describe a chunk, so the component fields
                    // must be produced for a chunk batch, not a dataframe batch.
                    .map(|component| component.to_arrow_field(BatchType::Chunk)),
            )
            .collect()
    }
}
impl ChunkColumnDescriptors {
    /// Parses `fields` into [`SorbetColumnDescriptors`] and then converts the
    /// result into chunk column descriptors.
    ///
    /// `chunk_entity_path` is forwarded unchanged to
    /// [`SorbetColumnDescriptors::try_from_arrow_fields`].
    ///
    /// # Errors
    ///
    /// Returns a [`SorbetError`] if the fields cannot be parsed, or if the
    /// parsed columns are rejected by the `TryFrom<SorbetColumnDescriptors>`
    /// conversion.
    pub fn try_from_arrow_fields(
        chunk_entity_path: Option<&EntityPath>,
        fields: &ArrowFields,
    ) -> Result<Self, SorbetError> {
        let sorbet_columns =
            SorbetColumnDescriptors::try_from_arrow_fields(chunk_entity_path, fields)?;
        Self::try_from(sorbet_columns)
    }
}
impl TryFrom<SorbetColumnDescriptors> for ChunkColumnDescriptors {
    type Error = SorbetError;

    /// Splits a flat column list into row id, index and component columns.
    ///
    /// # Errors
    ///
    /// * [`SorbetError::InvalidColumnOrder`] if a row id column appears after
    ///   any index or component column, or an index column appears after any
    ///   component column.
    /// * [`SorbetError::MultipleRowIdColumns`] if more than one row id column
    ///   is present.
    /// * [`SorbetError::MissingRowIdColumn`] if there is no row id column.
    fn try_from(columns: SorbetColumnDescriptors) -> Result<Self, Self::Error> {
        let SorbetColumnDescriptors { columns } = columns;

        let mut row_id_columns = Vec::new();
        let mut index_columns = Vec::new();
        let mut component_columns = Vec::new();

        for descriptor in &columns {
            match descriptor {
                ColumnDescriptor::RowId(row_id) => {
                    // A row id is only valid before any index or component column.
                    if !index_columns.is_empty() || !component_columns.is_empty() {
                        return Err(SorbetError::InvalidColumnOrder(format!(
                            "RowId column must be the first column; but the columns were: {columns:?}"
                        )));
                    }
                    row_id_columns.push(row_id.clone());
                }
                ColumnDescriptor::Time(index) => {
                    // An index is only valid before any component column.
                    if !component_columns.is_empty() {
                        return Err(SorbetError::InvalidColumnOrder(
                            "Index columns must come before any data columns".to_owned(),
                        ));
                    }
                    index_columns.push(index.clone());
                }
                ColumnDescriptor::Component(component) => {
                    component_columns.push(component.clone());
                }
            }
        }

        let num_row_ids = row_id_columns.len();
        if num_row_ids > 1 {
            return Err(SorbetError::MultipleRowIdColumns(num_row_ids));
        }

        match row_id_columns.pop() {
            Some(row_id) => Ok(Self {
                row_id,
                indices: index_columns,
                components: component_columns,
            }),
            None => {
                re_log::debug!("Missing RowId column, but had these columns: {columns:#?}");
                Err(SorbetError::MissingRowIdColumn)
            }
        }
    }
}
impl From<ChunkColumnDescriptors> for SorbetColumnDescriptors {
fn from(columns: ChunkColumnDescriptors) -> Self {
let ChunkColumnDescriptors {
row_id,
indices,
components,
} = columns;
let columns = itertools::chain!(
std::iter::once(ColumnDescriptor::RowId(row_id.clone())),
indices.iter().cloned().map(ColumnDescriptor::Time),
components.iter().cloned().map(ColumnDescriptor::Component),
)
.collect();
Self { columns }
}
}