use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions, create_array};
use arrow::datatypes::{Field, Fields, Schema};
use itertools::{Either, Itertools as _};
use parking_lot::Mutex;
use re_arrow_util::RecordBatchExt as _;
use re_log_encoding::RawRrdManifest;
use re_log_types::{EntryId, StoreId, StoreKind, TimeType};
use re_protos::EntryName;
use re_protos::cloud::v1alpha1::ext as cloud_ext;
use re_protos::cloud::v1alpha1::ext::ScanDatasetManifestDataframe;
use re_protos::cloud::v1alpha1::ext::{DataSourceKind, DatasetDetails, DatasetEntry, EntryDetails};
use re_protos::cloud::v1alpha1::{EntryKind, ScanSegmentTableResponse};
use re_protos::common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, SegmentId};
use re_types_core::LayerName;
use crate::store::{
Error, LayerInfo, ResolvedStore, Segment, Source, SourceInsertOutcome, StoreSlotId, Tracked,
store_pool::StorePool,
};
/// Mutable state of a [`Dataset`].
///
/// It is stored inside a [`Tracked`] wrapper: reads go through `Deref`, writes go through
/// `Tracked::modify`, which presumably refreshes the wrapper's `updated_at` timestamp
/// (the dataset's schema cache is keyed on that timestamp).
pub struct DatasetInner {
// Human-readable dataset name.
name: EntryName,
// Dataset-level details, exposed through `DatasetEntry::dataset_details`.
details: DatasetDetails,
// Asset-class layers shared by the whole dataset. The same `Arc<Source>` is also inserted
// into every segment, so these must not be counted once per segment.
asset_layers: Vec<Arc<Source>>,
// All segments of the dataset, keyed by segment id.
segments: HashMap<SegmentId, Segment>,
}
/// An in-memory dataset entry: a set of segments, each made of one or more layers (sources),
/// plus asset layers shared by every segment.
pub struct Dataset {
// Entry id; immutable for the lifetime of the dataset.
id: EntryId,
// Recording or blueprint; determines the reported `EntryKind`.
store_kind: StoreKind,
// Set once, at construction time.
created_at: jiff::Timestamp,
// All mutable state; modifications are tracked so `updated_at` can be reported.
inner: Tracked<DatasetInner>,
// Cached merged schema of all sources, tagged with the `updated_at` value it was computed at.
// `None` means "not computed / invalidated".
cached_schema: Mutex<Option<(jiff::Timestamp, Arc<Schema>)>>,
}
impl Dataset {
    /// Creates an empty dataset (no asset layers, no segments).
    ///
    /// `created_at` is set to the current time.
    pub fn new(
        id: EntryId,
        name: EntryName,
        store_kind: StoreKind,
        details: DatasetDetails,
    ) -> Self {
        Self {
            id,
            store_kind,
            created_at: jiff::Timestamp::now(),
            inner: Tracked::new(DatasetInner {
                name,
                details,
                asset_layers: Default::default(),
                segments: Default::default(),
            }),
            cached_schema: Mutex::new(None),
        }
    }

    /// The entry id of this dataset.
    #[inline]
    pub fn id(&self) -> EntryId {
        self.id
    }

    /// The current name of this dataset.
    #[inline]
    pub fn name(&self) -> &EntryName {
        &self.inner.name
    }

    /// Renames the dataset.
    ///
    /// The dataset is only marked as modified when the name actually changes.
    pub fn set_name(&mut self, name: EntryName) {
        if name != self.inner.name {
            self.inner.modify().name = name;
        }
    }

    /// Whether this dataset holds recordings or blueprints.
    #[inline]
    pub fn store_kind(&self) -> StoreKind {
        self.store_kind
    }

    /// The catalog entry kind matching [`Self::store_kind`].
    #[inline]
    pub fn entry_kind(&self) -> EntryKind {
        match self.store_kind() {
            StoreKind::Recording => EntryKind::Dataset,
            StoreKind::Blueprint => EntryKind::BlueprintDataset,
        }
    }

    /// Time of the last modification, as reported by the [`Tracked`] wrapper.
    #[inline]
    pub fn updated_at(&self) -> jiff::Timestamp {
        self.inner.updated_at()
    }

    /// All segments of the dataset, keyed by segment id.
    pub fn segments(&self) -> &HashMap<SegmentId, Segment> {
        &self.inner.segments
    }

    /// Looks up a single segment.
    ///
    /// # Errors
    /// [`Error::SegmentIdNotFound`] if the dataset has no segment with that id.
    pub fn segment(&self, segment_id: &SegmentId) -> Result<&Segment, Error> {
        self.inner
            .segments
            .get(segment_id)
            .ok_or_else(|| Error::SegmentIdNotFound {
                segment_id: segment_id.clone(),
                entry_id: self.id,
            })
    }

    /// Iterates over the requested segments.
    ///
    /// With `Some(ids)`, segments are yielded in the order of `ids` and unknown ids are
    /// silently skipped. With `None`, every segment is yielded in (unspecified) map order.
    pub fn segments_from_ids<'a>(
        &'a self,
        segment_ids: Option<&'a [SegmentId]>,
    ) -> impl Iterator<Item = (&'a SegmentId, &'a Segment)> {
        if let Some(segment_ids) = segment_ids {
            Either::Left(
                segment_ids
                    .iter()
                    .filter_map(|id| self.inner.segments.get(id).map(|segment| (id, segment))),
            )
        } else {
            Either::Right(self.inner.segments.iter())
        }
    }

    /// Dataset-level details.
    pub fn dataset_details(&self) -> &DatasetDetails {
        &self.inner.details
    }

    /// Replaces the dataset-level details.
    ///
    /// The dataset is only marked as modified when the details actually change.
    pub fn set_dataset_details(&mut self, details: DatasetDetails) {
        if details != self.inner.details {
            self.inner.modify().details = details;
        }
    }

    /// Generic catalog entry details for this dataset.
    pub fn as_entry_details(&self) -> EntryDetails {
        EntryDetails {
            id: self.id,
            name: self.inner.name.clone(),
            kind: self.entry_kind(),
            created_at: self.created_at,
            updated_at: self.inner.updated_at(),
        }
    }

    /// Full dataset entry, including an in-memory handle URL of the form `memory:///<id>`.
    pub fn as_dataset_entry(&self) -> DatasetEntry {
        DatasetEntry {
            details: self.as_entry_details(),
            dataset_details: self.inner.details.clone(),
            handle: DatasetHandle {
                id: Some(self.id),
                store_kind: self.store_kind,
                url: url::Url::parse(&format!("memory:///{}", self.id)).expect("valid url"),
            },
        }
    }

    /// Iterates over every distinct source of the dataset.
    ///
    /// Asset layers are yielded once each, from the dataset-level list. Every segment also holds
    /// a copy of each asset source, so asset-class sources are filtered out of the per-segment
    /// iteration to avoid yielding them once per segment.
    pub fn iter_sources(&self) -> impl Iterator<Item = &Source> {
        let asset_sources = self.inner.asset_layers.iter().map(|source| source.as_ref());
        let segment_sources = self
            .inner
            .segments
            .values()
            .flat_map(|segment| segment.iter_sources().map(|(_, source)| source))
            .filter(|source| source.layer_info().layer_class != re_types_core::LayerClass::Asset);
        std::iter::chain(asset_sources, segment_sources)
    }

    /// The merged Arrow schema of all sources in the dataset.
    ///
    /// The result is cached and reused as long as [`Self::updated_at`] has not changed.
    ///
    /// # Errors
    /// Returns an Arrow error if the source schemas cannot be merged.
    pub fn schema(&self) -> arrow::error::Result<Schema> {
        let mut cache = self.cached_schema.lock();
        let updated_at = self.updated_at();
        if let Some((cached_at, schema)) = cache.as_ref()
            && *cached_at == updated_at
        {
            return Ok(Schema::clone(schema));
        }
        let schema = Schema::try_merge(self.iter_sources().map(|source| source.schema()))?;
        *cache = Some((updated_at, Arc::new(schema.clone())));
        Ok(schema)
    }

    /// Iterates over the ids of all segments (unspecified order).
    pub fn segment_ids(&self) -> impl Iterator<Item = SegmentId> {
        self.inner.segments.keys().cloned()
    }

    /// Builds the segment table: one row per segment.
    ///
    /// Columns are the base segment columns (ids, layer names, storage URLs, last update,
    /// chunk count, size), followed by the union of all layer property columns, followed by
    /// a `<timeline>:end` / `<timeline>:start` column pair per timeline.
    ///
    /// # Errors
    /// Fails if a layer's properties cannot be computed, or if any intermediate batch cannot be
    /// built or concatenated.
    pub fn segment_table(&self) -> Result<RecordBatch, Error> {
        let row_count = self.inner.segments.len();
        let mut all_segment_properties = Vec::with_capacity(row_count);
        let mut segment_ids = Vec::with_capacity(row_count);
        let mut layer_names = Vec::with_capacity(row_count);
        let mut storage_urls = Vec::with_capacity(row_count);
        let mut last_updated_at = Vec::with_capacity(row_count);
        let mut num_chunks = Vec::with_capacity(row_count);
        let mut size_bytes = Vec::with_capacity(row_count);
        let mut all_index_ranges = Vec::with_capacity(row_count);

        for (segment_id, segment) in &self.inner.segments {
            let layer_count = segment.source_count();
            let mut layer_names_row = Vec::with_capacity(layer_count);
            let mut storage_urls_row = Vec::with_capacity(layer_count);
            // BTreeMaps give the per-segment columns a deterministic (sorted) order.
            let mut current_segment_properties = BTreeMap::default();
            let mut current_segment_indexes = BTreeMap::default();

            for (layer_name, layer) in segment.iter_sources() {
                layer_names_row.push(layer_name.clone());
                storage_urls_row.push(format!("memory:///store/{}", layer.store_slot_id()));

                // If two layers expose the same property field, the later layer's column wins.
                let layer_properties = layer.compute_properties()?;
                for (col_idx, field) in layer_properties.schema().fields().iter().enumerate() {
                    current_segment_properties.insert(
                        Arc::clone(field),
                        Arc::clone(layer_properties.column(col_idx)),
                    );
                }

                // Per timeline, keep the union of the ranges of all layers.
                for (time_name, range) in layer.index_ranges() {
                    let entry = current_segment_indexes.entry(time_name).or_insert(range);
                    *entry = entry.union(range);
                }
            }

            let properties_batch = RecordBatch::try_new_with_options(
                Arc::new(Schema::new_with_metadata(
                    current_segment_properties
                        .keys()
                        .map(Arc::clone)
                        .collect::<Fields>(),
                    Default::default(),
                )),
                current_segment_properties.into_values().collect(),
                &RecordBatchOptions::default().with_row_count(Some(1)),
            )
            .map_err(Error::failed_to_extract_properties)?;

            // Field order here ("end" then "start") must match the array order below
            // (`max` then `min`).
            let indexes_batch = RecordBatch::try_new_with_options(
                Arc::new(Schema::new_with_metadata(
                    current_segment_indexes
                        .keys()
                        .flat_map(|timeline| {
                            ["end", "start"].into_iter().map(|index_marker| {
                                let metadata: HashMap<_, _> = [
                                    ("rerun:index".to_owned(), timeline.name().to_string()),
                                    ("rerun:index_kind".to_owned(), timeline.typ().to_string()),
                                    ("rerun:index_marker".to_owned(), index_marker.to_owned()),
                                    ("rerun:kind".to_owned(), "index".to_owned()),
                                ]
                                .into_iter()
                                .collect();
                                let field_name = format!("{}:{index_marker}", timeline.name());
                                let data_type = timeline.datatype();
                                Arc::new(
                                    Field::new(field_name, data_type, true)
                                        .with_metadata(metadata),
                                )
                            })
                        })
                        .collect_vec(),
                    HashMap::default(),
                )),
                current_segment_indexes
                    .into_iter()
                    .flat_map(|(timeline, range)| match timeline.typ() {
                        TimeType::Sequence => [
                            create_array!(Int64, [range.max().as_i64()]) as ArrayRef,
                            create_array!(Int64, [range.min().as_i64()]) as ArrayRef,
                        ],
                        TimeType::DurationNs => [
                            create_array!(DurationNanosecond, [range.max().as_i64()]) as ArrayRef,
                            create_array!(DurationNanosecond, [range.min().as_i64()]) as ArrayRef,
                        ],
                        TimeType::TimestampNs => [
                            create_array!(TimestampNanosecond, [range.max().as_i64()])
                                as ArrayRef,
                            create_array!(TimestampNanosecond, [range.min().as_i64()])
                                as ArrayRef,
                        ],
                    })
                    .collect(),
                &RecordBatchOptions::default().with_row_count(Some(1)),
            )?;

            all_segment_properties.push(properties_batch);
            all_index_ranges.push(indexes_batch);
            segment_ids.push(segment_id.clone());
            layer_names.push(layer_names_row);
            storage_urls.push(storage_urls_row);
            // jiff returns i128 nanoseconds; the `as` cast is only lossless within the i64
            // nanosecond range (roughly years 1677..2262).
            last_updated_at.push(segment.last_updated_at().as_nanosecond() as i64);
            num_chunks.push(segment.num_chunks());
            size_bytes.push(segment.size_bytes());
        }

        // Segments can have different property / timeline sets, hence the polymorphic concat.
        let properties_record_batch =
            re_arrow_util::concat_polymorphic_batches(all_segment_properties.as_slice())
                .map_err(Error::failed_to_extract_properties)?;
        let indexes_record_batch =
            re_arrow_util::concat_polymorphic_batches(all_index_ranges.as_slice())?;

        let base_record_batch = ScanSegmentTableResponse::create_dataframe(
            segment_ids,
            layer_names,
            storage_urls,
            last_updated_at,
            num_chunks,
            size_bytes,
        )
        .map_err(Error::failed_to_extract_properties)?;

        base_record_batch
            .concat_horizontally_with(&properties_record_batch)
            .map_err(Error::failed_to_extract_properties)?
            .concat_horizontally_with(&indexes_record_batch)
            .map_err(Into::into)
    }

    /// The dataset manifest for all segments and layers.
    ///
    /// Equivalent to `dataset_manifest_filtered(None, None)`.
    pub fn dataset_manifest(&self) -> Result<RecordBatch, Error> {
        self.dataset_manifest_filtered(None, None)
    }

    /// Builds the dataset manifest: one row per (segment, layer) pair.
    ///
    /// `segments_of_interest` / `layers_of_interest` restrict the rows; `None` means "all".
    /// The base manifest columns are followed by the union of all layer property columns.
    ///
    /// # Errors
    /// Fails if a layer's schema hash or properties cannot be computed, or if the resulting
    /// batches cannot be built or concatenated.
    pub fn dataset_manifest_filtered(
        &self,
        segments_of_interest: Option<&HashSet<&SegmentId>>,
        layers_of_interest: Option<&HashSet<&LayerName>>,
    ) -> Result<RecordBatch, Error> {
        let segment_rows = self
            .inner
            .segments
            .iter()
            .filter(|(segment_id, _)| {
                segments_of_interest.is_none_or(|segments| segments.contains(segment_id))
            })
            .flat_map(|(segment_id, segment)| {
                itertools::izip!(
                    std::iter::repeat(segment_id),
                    segment.iter_sources().filter(|(name, _layer)| {
                        layers_of_interest.is_none_or(|layers| layers.contains(name))
                    })
                )
            })
            .map(|(segment_id, (layer_name, source))| {
                let segment_id = segment_id.to_string();
                (layer_name, segment_id, source)
            });
        let layers: Vec<(&LayerName, String, &Source)> = segment_rows.collect();

        let row_count = layers.len();
        let mut layer_names = Vec::with_capacity(row_count);
        let mut segment_ids = Vec::with_capacity(row_count);
        let mut storage_urls = Vec::with_capacity(row_count);
        let mut layer_types = Vec::with_capacity(row_count);
        let mut registration_times = Vec::with_capacity(row_count);
        let mut last_updated_at = Vec::with_capacity(row_count);
        let mut num_chunks = Vec::with_capacity(row_count);
        let mut size_bytes = Vec::with_capacity(row_count);
        let mut schema_sha256s = Vec::with_capacity(row_count);
        let mut registration_statuses = Vec::with_capacity(row_count);
        let mut properties = Vec::with_capacity(row_count);

        for (layer_name, segment_id, source) in layers {
            layer_names.push(layer_name.clone());
            storage_urls.push(format!("memory:///store/{}", source.store_slot_id()));
            segment_ids.push(segment_id.into());
            layer_types.push(source.data_source_kind().to_string());
            // jiff returns i128 nanoseconds; the `as` casts are only lossless within the i64
            // nanosecond range (roughly years 1677..2262).
            registration_times.push(source.registration_time().as_nanosecond() as i64);
            last_updated_at.push(source.last_updated_at().as_nanosecond() as i64);
            num_chunks.push(source.num_chunks());
            size_bytes.push(source.size_bytes());
            schema_sha256s.push(
                source
                    .schema_sha256()
                    .map_err(Error::failed_to_extract_properties)?,
            );
            // In-memory registration is synchronous, so every listed layer is done.
            registration_statuses.push(cloud_ext::LayerRegistrationStatus::Done.to_string());
            properties.push(source.compute_properties()?);
        }

        let base_record_batch = ScanDatasetManifestDataframe::new(
            layer_names,
            segment_ids,
            storage_urls,
            layer_types,
            registration_times,
            last_updated_at,
            num_chunks,
            size_bytes,
            schema_sha256s,
            registration_statuses,
        )
        .into_record_batch()
        .map_err(Error::failed_to_extract_properties)?;

        let properties_record_batch =
            re_arrow_util::concat_polymorphic_batches(properties.as_slice())
                .map_err(Error::failed_to_extract_properties)?;

        base_record_batch
            .concat_horizontally_with(&properties_record_batch)
            .map_err(Error::failed_to_extract_properties)
    }

    /// Builds the RRD manifest of a segment by merging the manifests of all its layers.
    ///
    /// # Errors
    /// [`Error::SegmentIdNotFound`] for an unknown segment; otherwise any error from computing
    /// or merging the per-layer manifests.
    pub fn rrd_manifest(&self, segment_id: &SegmentId) -> Result<RawRrdManifest, Error> {
        let segment = self.segment(segment_id)?;

        // The dataset does not track an application id, so a placeholder is used.
        let application_id = "n/a";
        let segment_store_id =
            StoreId::new(self.store_kind(), application_id, segment_id.to_string());

        let per_layer: Vec<RawRrdManifest> = segment
            .iter_sources()
            .map(|(_, source)| source.rrd_manifest())
            .try_collect()?;

        RawRrdManifest::merge(segment_store_id, per_layer)
            .map_err(|err| Error::RrdLoadingError(err.into()))
    }

    /// Registers `resolved` as layer `layer_info.name` of segment `segment_id`.
    ///
    /// The segment is created if needed. A newly created segment also receives every asset
    /// layer of the dataset.
    ///
    /// # Errors
    /// - [`Error::LayerClassConflict`] if an asset layer already uses this layer name.
    /// - [`Error::SchemaConflict`] if the layer's schema is incompatible with the dataset's.
    /// - Any error from `Segment::insert_source` for the given `on_duplicate` behavior.
    // NOTE(review): `async` is kept although nothing is awaited — presumably to keep the
    // signature stable for callers; confirm before removing.
    #[allow(clippy::allow_attributes)]
    #[allow(clippy::unused_async)]
    pub async fn add_source(
        &mut self,
        segment_id: SegmentId,
        layer_info: Arc<LayerInfo>,
        store_slot_id: StoreSlotId,
        resolved: ResolvedStore,
        on_duplicate: IfDuplicateBehavior,
    ) -> Result<(), Error> {
        let layer_name = &layer_info.name;
        re_log::debug!(?segment_id, ?layer_name, "add_layer");

        if self
            .inner
            .asset_layers
            .iter()
            .any(|asset| asset.layer_info().name == *layer_name)
        {
            return Err(Error::LayerClassConflict(layer_name.clone()));
        }

        // NOTE: `current_schema` still includes the layer being overwritten (if any), so an
        // overwrite that changes one of its column types can be rejected here.
        let current_schema = self.schema()?;
        let new_layer_schema = {
            let fields = resolved.schema().chunk_column_descriptors().arrow_fields();
            Schema::new_with_metadata(fields, HashMap::default())
        };

        for new_field in new_layer_schema.fields() {
            if let Ok(current_field) = current_schema.field_with_name(new_field.name())
                && current_field != new_field.as_ref()
            {
                re_arrow_util::reject_unsupported_widenings(new_field.data_type()).map_err(
                    |err| {
                        Error::SchemaConflict(format!(
                            "schema incompatibility on segment '{segment_id}', \
                             layer '{layer_name}': {err}"
                        ))
                    },
                )?;
            }
        }

        let merged_schema =
            Schema::try_merge([current_schema.clone(), new_layer_schema]).map_err(|err| {
                Error::SchemaConflict(format!(
                    "schema incompatibility on segment '{segment_id}', layer '{layer_name}': {err}"
                ))
            })?;

        let source = Arc::new(Source::new(
            store_slot_id,
            resolved,
            DataSourceKind::Rrd,
            layer_info,
        ));

        let outcome = self
            .inner
            .modify()
            .segments
            .entry(segment_id.clone())
            .or_default()
            .insert_source(source.clone(), on_duplicate)?;

        if outcome == SourceInsertOutcome::Inserted {
            // A segment holding exactly one source right after an insert was just created.
            let is_new_segment = self
                .inner
                .segments
                .get(&segment_id)
                .map(|s| s.source_count() == 1)
                .unwrap_or(false);

            if is_new_segment {
                let asset_sources: Vec<Arc<Source>> = self.inner.asset_layers.clone();
                for asset_source in asset_sources {
                    let outcome = self
                        .inner
                        .modify()
                        .segments
                        .get_mut(&segment_id)
                        .expect("segment exists")
                        .insert_source(asset_source, IfDuplicateBehavior::Error)?;
                    assert_eq!(
                        outcome,
                        SourceInsertOutcome::Inserted,
                        "insert_source with IfDuplicateBehavior::Error can only insert"
                    );
                }
            }
        }

        // Refresh the schema cache for the new `updated_at`. Asset sources copied into a new
        // segment do not change the merged schema (they are already part of it). After an
        // overwrite the old layer's contribution is gone, so the schema must be recomputed.
        {
            let mut cache = self.cached_schema.lock();
            let updated_at = self.updated_at();
            *cache = match outcome {
                SourceInsertOutcome::Inserted => Some((updated_at, Arc::new(merged_schema))),
                SourceInsertOutcome::Skipped => Some((updated_at, Arc::new(current_schema))),
                SourceInsertOutcome::Overwritten => None,
            };
        }

        Ok(())
    }

    /// Registers `resolved` as a dataset-wide asset layer and inserts it into every segment.
    ///
    /// # Errors
    /// - [`Error::SchemaConflict`] if the layer's schema is incompatible with the dataset's.
    /// - [`Error::LayerAlreadyExists`] if the asset layer exists and `on_duplicate` is `Error`.
    /// - [`Error::LayerClassConflict`] if a segment already has a (non-asset) layer of that name.
    // NOTE(review): `async` is kept although nothing is awaited — presumably to keep the
    // signature stable for callers; confirm before removing.
    #[allow(clippy::allow_attributes)]
    #[allow(clippy::unused_async)]
    pub async fn add_asset_source(
        &mut self,
        store_slot_id: StoreSlotId,
        resolved: ResolvedStore,
        layer_info: Arc<LayerInfo>,
        on_duplicate: IfDuplicateBehavior,
    ) -> Result<(), Error> {
        let layer_name = &layer_info.name;
        re_log::debug!(?layer_name, "add_asset_source");

        let current_schema = self.schema()?;
        let new_layer_schema = {
            let fields = resolved.schema().chunk_column_descriptors().arrow_fields();
            Schema::new_with_metadata(fields, HashMap::default())
        };

        for new_field in new_layer_schema.fields() {
            if let Ok(current_field) = current_schema.field_with_name(new_field.name())
                && current_field != new_field.as_ref()
            {
                re_arrow_util::reject_unsupported_widenings(new_field.data_type()).map_err(
                    |err| {
                        Error::SchemaConflict(format!(
                            "schema incompatibility on asset layer '{layer_name}': {err}"
                        ))
                    },
                )?;
            }
        }

        let existing_pos = self
            .inner
            .asset_layers
            .iter()
            .position(|s| &s.layer_info().name == layer_name);

        match (existing_pos, on_duplicate) {
            (Some(_), IfDuplicateBehavior::Error) => {
                return Err(Error::LayerAlreadyExists(layer_name.clone()));
            }
            (Some(_), IfDuplicateBehavior::Overwrite) => {
                // The old asset layer is removed below, after the schema check, so that a
                // rejected overwrite leaves the dataset untouched.
            }
            (Some(_), IfDuplicateBehavior::Skip) => {
                re_log::info!("Ignoring asset layer '{layer_name}': already exists in dataset");
                return Ok(());
            }
            (None, _) => {
                if self
                    .inner
                    .segments
                    .values()
                    .any(|segment| segment.source(layer_name).is_some())
                {
                    return Err(Error::LayerClassConflict(layer_name.clone()));
                }
            }
        }

        // Make sure the dataset schema still merges once this layer is in place (mirrors the
        // `try_merge` check in `add_source`). Otherwise every later `schema()` call would fail.
        // The layer being overwritten (if any) is excluded, since it is about to be replaced.
        Schema::try_merge(
            self.iter_sources()
                .filter(|source| source.layer_info().name != *layer_name)
                .map(|source| source.schema())
                .chain(std::iter::once(new_layer_schema)),
        )
        .map_err(|err| {
            Error::SchemaConflict(format!(
                "schema incompatibility on asset layer '{layer_name}': {err}"
            ))
        })?;

        if let Some(pos) = existing_pos {
            self.inner.modify().asset_layers.remove(pos);
        }

        let source = Arc::new(Source::new(
            store_slot_id,
            resolved,
            DataSourceKind::Rrd,
            layer_info,
        ));
        self.inner.modify().asset_layers.push(source.clone());

        // Share the same source with every existing segment, replacing any previous version.
        let segment_ids: Vec<SegmentId> = self.inner.segments.keys().cloned().collect();
        for segment_id in segment_ids {
            let outcome = self
                .inner
                .modify()
                .segments
                .get_mut(&segment_id)
                .expect("segment exists")
                .insert_source(source.clone(), IfDuplicateBehavior::Overwrite)?;
            debug_assert_ne!(outcome, SourceInsertOutcome::Skipped);
        }

        *self.cached_schema.lock() = None;
        Ok(())
    }

    /// Removes layers from the dataset and returns the `(segment, layer)` pairs removed.
    ///
    /// `None` for either filter means "all". When `segments_to_drop` is `None`, matching asset
    /// layers are also removed from the dataset-level asset list. Otherwise they are only
    /// removed from the selected segments. Segments left without any source are dropped.
    // NOTE(review): `async` is kept although nothing is awaited — presumably to keep the
    // signature stable for callers; confirm before removing.
    #[allow(clippy::allow_attributes)]
    #[allow(clippy::unused_async)]
    pub async fn remove_layers(
        &mut self,
        segments_to_drop: Option<&HashSet<&SegmentId>>,
        layers_to_drop: Option<&HashSet<&LayerName>>,
    ) -> Result<Vec<(SegmentId, LayerName)>, Error> {
        re_log::debug!(?segments_to_drop, ?layers_to_drop, "remove_layers");

        if segments_to_drop.is_none() {
            self.inner.modify().asset_layers.retain(|source| {
                let layer_name = &source.layer_info().name;
                !layers_to_drop.is_none_or(|layers| layers.contains(layer_name))
            });
        }

        let mut removed_layers = Vec::new();
        {
            let segments = &mut self.inner.modify().segments;
            segments.retain(|segment_id, segment| {
                if segments_to_drop.is_none_or(|segments| segments.contains(segment_id)) {
                    segment.retain_sources(|layer_name, _source| {
                        if layers_to_drop.is_none_or(|layers| layers.contains(layer_name)) {
                            removed_layers.push((segment_id.clone(), layer_name.clone()));
                            false
                        } else {
                            true
                        }
                    });
                    segment.source_count() > 0
                } else {
                    true
                }
            });
        }

        // Sources changed: invalidate the schema cache explicitly instead of relying on
        // `updated_at` differing from the cached timestamp (it may not, within one clock tick).
        *self.cached_schema.lock() = None;

        Ok(removed_layers)
    }

    /// Loads an RRD file and registers each store it contains as a segment layer.
    ///
    /// Each store becomes segment `<recording id>`, with layer `layer_name` (defaulting to
    /// `LayerName::base()`). The returned set contains every segment id found in the file
    /// whose registration did not fail, including layers skipped because of `on_duplicate`.
    ///
    /// # Errors
    /// Fails if the file cannot be loaded or any `add_source` call fails. Stores registered
    /// before the failure stay registered.
    pub async fn register_rrd(
        &mut self,
        pool: &mut StorePool,
        path: &Path,
        layer_name: Option<LayerName>,
        on_duplicate: IfDuplicateBehavior,
        store_kind: StoreKind,
    ) -> Result<BTreeSet<SegmentId>, Error> {
        re_log::info!("Loading RRD: {}", path.display());
        let layer_name = layer_name.unwrap_or_else(LayerName::base);
        let layer_info = Arc::new(LayerInfo {
            name: layer_name.clone(),
            layer_class: re_types_core::LayerClass::Segment,
        });

        let mut new_segment_ids = BTreeSet::default();
        for (store_id, resolved) in ResolvedStore::load_rrd_file(path, store_kind)? {
            let segment_id = SegmentId::new(store_id.recording_id().to_string());
            let slot_id = pool.register(&resolved);
            self.add_source(
                segment_id.clone(),
                layer_info.clone(),
                slot_id,
                resolved,
                on_duplicate,
            )
            .await?;
            new_segment_ids.insert(segment_id);
        }

        Ok(new_segment_ids)
    }
}