use std::path::Path;
use std::sync::Arc;
use arrow::array::RecordBatch;
use nohash_hasher::IntSet;
use re_chunk_store::{
ChunkStore, ChunkStoreHandle, ChunkStoreHandleWeak, ChunkTrackingMode, LazyStore, QueryResults,
StoreSchema,
};
use re_log_encoding::{RrdChunkProvider, RrdManifest};
use re_log_types::{EntityPath, StoreId, StoreKind};
#[derive(Clone)]
pub enum ResolvedStore {
Eager(ChunkStoreHandle),
Lazy(Arc<LazyStore>),
}
impl ResolvedStore {
pub fn store_id(&self) -> StoreId {
match self {
Self::Eager(h) => h.read().id().clone(),
Self::Lazy(l) => l.store_id().clone(),
}
}
pub fn schema(&self) -> StoreSchema {
match self {
Self::Eager(h) => h.read().schema().clone(),
Self::Lazy(l) => l.schema(),
}
}
pub fn all_entities(&self) -> IntSet<EntityPath> {
match self {
Self::Eager(h) => h.read().all_entities(),
Self::Lazy(l) => l.all_entities(),
}
}
pub fn latest_at_relevant_chunks_for_all_components(
&self,
report_mode: ChunkTrackingMode,
query: &re_chunk_store::LatestAtQuery,
entity_path: &EntityPath,
include_static: bool,
) -> QueryResults {
match self {
Self::Eager(h) => h.read().latest_at_relevant_chunks_for_all_components(
report_mode,
query,
entity_path,
include_static,
),
Self::Lazy(l) => l.latest_at_relevant_chunks_for_all_components(
report_mode,
query,
entity_path,
include_static,
),
}
}
pub fn range_relevant_chunks_for_all_components(
&self,
report_mode: ChunkTrackingMode,
query: &re_chunk_store::RangeQuery,
entity_path: &EntityPath,
include_static: bool,
) -> QueryResults {
match self {
Self::Eager(h) => h.read().range_relevant_chunks_for_all_components(
report_mode,
query,
entity_path,
include_static,
),
Self::Lazy(l) => l.range_relevant_chunks_for_all_components(
report_mode,
query,
entity_path,
include_static,
),
}
}
pub fn manifest(&self) -> Option<&Arc<RrdManifest>> {
match self {
Self::Eager(_) => None,
Self::Lazy(l) => Some(l.manifest()),
}
}
pub fn extract_properties(&self) -> Result<RecordBatch, super::Error> {
match self {
Self::Eager(h) => h.read().extract_properties(),
Self::Lazy(l) => l.extract_properties(),
}
.map_err(super::Error::failed_to_extract_properties)
}
pub(crate) fn downgrade(&self) -> ResolvedStoreWeak {
match self {
Self::Eager(h) => ResolvedStoreWeak::Eager(h.downgrade()),
Self::Lazy(l) => ResolvedStoreWeak::Lazy(Arc::downgrade(l)),
}
}
pub fn load_rrd_file(
path: &Path,
store_kind: StoreKind,
) -> Result<Vec<(StoreId, Self)>, super::Error> {
let mut file = std::fs::File::open(path)?;
if let Ok(Some(footer)) = re_log_encoding::read_rrd_footer(&mut file) {
drop(file);
let mut out = Vec::with_capacity(footer.manifests.len());
for (store_id, raw_manifest) in footer.manifests {
if store_id.kind() != store_kind {
continue;
}
let store_file = std::fs::File::open(path)?;
let provider = Arc::new(
RrdChunkProvider::try_from_file(store_file, path, Arc::new(raw_manifest))
.map_err(|err| super::Error::RrdLoadingError(err.into()))?,
);
let lazy = Arc::new(LazyStore::new(provider));
out.push((store_id, Self::Lazy(lazy)));
}
Ok(out)
} else {
let contents = ChunkStore::handle_from_rrd_filepath(
&super::InMemoryStore::default_eager_chunk_store_config(),
path,
)
.map_err(super::Error::RrdLoadingError)?;
Ok(contents
.into_iter()
.filter(|(store_id, _)| store_id.kind() == store_kind)
.map(|(store_id, handle)| (store_id, Self::Eager(handle)))
.collect())
}
}
}
pub(crate) enum ResolvedStoreWeak {
Eager(ChunkStoreHandleWeak),
Lazy(std::sync::Weak<LazyStore>),
}
impl ResolvedStoreWeak {
pub fn upgrade(&self) -> Option<ResolvedStore> {
match self {
Self::Eager(w) => w.upgrade().map(ResolvedStore::Eager),
Self::Lazy(w) => w.upgrade().map(ResolvedStore::Lazy),
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::sync::Arc;
use re_chunk::{Chunk, RowId, TimePoint, Timeline};
use re_log_types::example_components::{MyPoint, MyPoints};
use re_log_types::{
EntityPath, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource,
};
use super::ResolvedStore;
fn write_rrd(path: &std::path::Path, store_id: &StoreId, with_footer: bool) {
let entity_path = EntityPath::from("/test/entity");
let timeline = Timeline::new_sequence("frame");
let chunks: Vec<Arc<Chunk>> = (0..3)
.map(|i| {
let points = MyPoint::from_iter(i as u32..i as u32 + 1);
Arc::new(
Chunk::builder(entity_path.clone())
.with_sparse_component_batches(
RowId::new(),
TimePoint::default().with(timeline, i64::from(i)),
[(MyPoints::descriptor_points(), Some(&points as _))],
)
.build()
.unwrap(),
)
})
.collect();
let mut file = std::fs::File::create(path).unwrap();
let mut encoder = re_log_encoding::Encoder::new_eager(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::EncodingOptions::PROTOBUF_COMPRESSED,
&mut file,
)
.unwrap();
if !with_footer {
encoder.do_not_emit_footer();
}
encoder
.append(&LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *RowId::ZERO,
info: StoreInfo::new(store_id.clone(), StoreSource::Unknown),
}))
.unwrap();
for chunk in &chunks {
encoder
.append(&LogMsg::ArrowMsg(
store_id.clone(),
chunk.to_arrow_msg().unwrap(),
))
.unwrap();
}
encoder.finish().unwrap();
}
#[test]
fn enumerate_and_load_agree_on_store_ids() {
for with_footer in [true, false] {
let file = tempfile::NamedTempFile::new().unwrap();
let path = file.path();
let store_id = StoreId::random(StoreKind::Recording, "test");
write_rrd(path, &store_id, with_footer);
let validated: BTreeSet<StoreId> =
re_log_encoding::enumerate_rrd_stores(&mut std::fs::File::open(path).unwrap())
.unwrap()
.into_iter()
.filter(|id| id.kind() == StoreKind::Recording)
.collect();
let loaded: BTreeSet<StoreId> =
ResolvedStore::load_rrd_file(path, StoreKind::Recording)
.unwrap()
.into_iter()
.map(|(id, _)| id)
.collect();
assert_eq!(
validated, loaded,
"validate/load store-id sets must agree (with_footer={with_footer})"
);
assert_eq!(loaded, BTreeSet::from([store_id]));
}
}
}