use std::collections::BTreeMap;

use nohash_hasher::IntMap;

use re_arrow_store::{DataStoreConfig, GarbageCollectionOptions};
use re_log_types::{
    ApplicationId, ComponentPath, DataCell, DataRow, DataTable, EntityPath, EntityPathHash, LogMsg,
    PathOp, RowId, SetStoreInfo, StoreId, StoreInfo, StoreKind, TimePoint, Timeline,
};
use re_types_core::{components::InstanceKey, Loggable};

use crate::{Error, TimesPerTimeline};
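
/// All the entity data of a single store: the raw data store plus the
/// bookkeeping indices built on top of it (path lookup, per-timeline times,
/// entity tree).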
pub struct EntityDb {
    /// Maps an entity path hash back to the full [`EntityPath`].
    pub entity_path_from_hash: IntMap<EntityPathHash, EntityPath>,
    /// All the times at which something was logged, per timeline.
    pub times_per_timeline: TimesPerTimeline,
    /// A tree of all entity paths and the components logged at each of them.
    pub tree: crate::EntityTree,
    /// The actual store of all the logged data.
    pub data_store: re_arrow_store::DataStore,
}
impl Default for EntityDb {
fn default() -> Self {
Self {
entity_path_from_hash: Default::default(),
times_per_timeline: Default::default(),
tree: crate::EntityTree::root(),
data_store: re_arrow_store::DataStore::new(
InstanceKey::name(),
DataStoreConfig::default(),
),
}
}
}
impl EntityDb {
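    /// Returns all registered entity paths, sorted.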
pub fn entity_paths(&self) -> Vec<&EntityPath> {
use itertools::Itertools as _;
self.entity_path_from_hash.values().sorted().collect()
}
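    /// Looks up the full [`EntityPath`] behind a hash, if it was ever registered.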
#[inline]
pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
self.entity_path_from_hash.get(entity_path_hash)
}
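    /// Returns `true` if this path is known to the entity tree, which includes
    /// paths that were never logged directly (e.g. the parent of a logged entity).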
#[inline]
pub fn is_known_entity(&self, entity_path: &EntityPath) -> bool {
self.tree.subtree(entity_path).is_some()
}
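    /// Returns `true` if data was actually logged at this exact path.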
#[inline]
pub fn is_logged_entity(&self, entity_path: &EntityPath) -> bool {
self.entity_path_from_hash.contains_key(&entity_path.hash())
}
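    /// Records the path so it can later be looked up from its hash.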
fn register_entity_path(&mut self, entity_path: &EntityPath) {
self.entity_path_from_hash
.entry(entity_path.hash())
.or_insert_with(|| entity_path.clone());
}
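    /// Inserts a single row of data: registers its entity path and times,
    /// materializes any pending clears for its components, and translates a
    /// `ClearIsRecursive` component into the corresponding [`PathOp`].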
fn add_data_row(&mut self, row: &DataRow) -> Result<(), Error> {
self.register_entity_path(&row.entity_path);
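        // Remember which times we have data at, for each timeline.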
for (&timeline, &time_int) in row.timepoint().iter() {
self.times_per_timeline.insert(timeline, time_int);
}
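        // Register each component, and materialize any clears pending for it.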
for cell in row.cells().iter() {
let component_path =
ComponentPath::new(row.entity_path().clone(), cell.component_name());
let pending_clears = self.tree.add_data_msg(row.timepoint(), &component_path);
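            // A clear was previously logged for this path: insert an empty cell at
            // the clear's time so the clear applies to this component too.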
for (row_id, time_point) in pending_clears {
let cell =
DataCell::from_arrow_empty(cell.component_name(), cell.datatype().clone());
let row = DataRow::from_cells1(
row_id,
row.entity_path.clone(),
time_point.clone(),
cell.num_instances(),
cell,
)?;
self.data_store.insert_row(&row).ok();
self.tree.add_data_msg(&time_point, &component_path);
}
}
self.data_store.insert_row(row)?;
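        // If the row contains a `ClearIsRecursive` component, turn it into a
        // clear of the matching paths.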
use re_types_core::components::ClearIsRecursive;
if let Some(idx) = row.find_cell(&ClearIsRecursive::name()) {
let cell = &row.cells()[idx];
let settings = cell.try_to_native_mono::<ClearIsRecursive>().unwrap();
let path_op = if settings.map_or(false, |s| s.0) {
PathOp::ClearRecursive(row.entity_path.clone())
} else {
PathOp::ClearComponents(row.entity_path.clone())
};
self.add_path_op(row.row_id().next(), row.timepoint(), &path_op);
}
Ok(())
}
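    /// Applies a [`PathOp`]: for every component the op clears, inserts an empty
    /// cell so that subsequent latest-at queries see the component as cleared.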
fn add_path_op(&mut self, row_id: RowId, time_point: &TimePoint, path_op: &PathOp) {
let cleared_paths = self.tree.add_path_op(row_id, time_point, path_op);
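        // Collect one empty cell per cleared component, grouped by entity path.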
let mut cells = BTreeMap::<EntityPath, Vec<DataCell>>::default();
for component_path in cleared_paths {
if let Some(data_type) = self
.data_store
.lookup_datatype(&component_path.component_name)
{
                cells
                    .entry(component_path.entity_path.clone())
                    .or_default()
                    .push(DataCell::from_arrow_empty(
                        component_path.component_name,
                        data_type.clone(),
                    ));
self.tree.add_data_msg(time_point, &component_path);
}
}
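        // Insert one row per affected entity, bumping the row id for each so
        // they stay unique.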
let mut row_id = row_id;
for (ent_path, cells) in cells {
match DataRow::from_cells(row_id, time_point.clone(), ent_path, 0, cells) {
Ok(row) => {
self.data_store.insert_row(&row).ok();
}
Err(err) => {
re_log::error_once!("Failed to insert PathOp {path_op:?}: {err}");
}
}
row_id = row_id.next();
}
}
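    /// Removes all traces of the deleted rows from the secondary indices
    /// (the data store itself has already been purged by the garbage collector).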
pub fn purge(&mut self, deleted: &re_arrow_store::Deleted) {
re_tracing::profile_function!();
let Self {
entity_path_from_hash: _,
times_per_timeline,
tree,
            data_store: _,
        } = self;
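        // `tree.purge` reports what it actually removed, so we can prune the
        // per-timeline time index to match.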
let mut actually_deleted = Default::default();
{
re_tracing::profile_scope!("tree");
tree.purge(deleted, &mut actually_deleted);
}
{
re_tracing::profile_scope!("times_per_timeline");
for (timeline, times) in actually_deleted.timeful {
if let Some(time_set) = times_per_timeline.get_mut(&timeline) {
for time in times {
time_set.remove(&time);
}
}
}
}
}
}
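
/// An in-memory database built from a stream of [`LogMsg`]es.
///
/// A minimal usage sketch (assuming `store_id` and `log_msg` are obtained
/// elsewhere, e.g. from a loaded recording):
///
/// ```ignore
/// let mut db = StoreDb::new(store_id);
/// db.add(&log_msg)?;
/// for entity_path in db.entity_db().entity_paths() {
///     println!("{entity_path}");
/// }
/// ```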
pub struct StoreDb {
store_id: StoreId,
pub data_source: Option<re_smart_channel::SmartChannelSource>,
set_store_info: Option<SetStoreInfo>,
entity_db: EntityDb,
}
impl StoreDb {
pub fn new(store_id: StoreId) -> Self {
Self {
store_id,
data_source: None,
set_store_info: None,
entity_db: Default::default(),
}
}
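    /// Builds a [`StoreDb`] from a [`StoreInfo`] and a sequence of rows.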
pub fn from_info_and_rows(
store_info: StoreInfo,
rows: impl IntoIterator<Item = DataRow>,
) -> Result<Self, Error> {
let mut store_db = StoreDb::new(store_info.store_id.clone());
store_db.set_store_info(SetStoreInfo {
row_id: RowId::random(),
info: store_info,
});
for row in rows {
store_db.add_data_row(&row)?;
}
Ok(store_db)
}
#[inline]
pub fn entity_db(&self) -> &EntityDb {
&self.entity_db
}
pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
self.set_store_info.as_ref()
}
pub fn store_info(&self) -> Option<&StoreInfo> {
self.store_info_msg().map(|msg| &msg.info)
}
pub fn app_id(&self) -> Option<&ApplicationId> {
self.store_info().map(|ri| &ri.application_id)
}
#[inline]
pub fn store(&self) -> &re_arrow_store::DataStore {
&self.entity_db.data_store
}
pub fn store_kind(&self) -> StoreKind {
self.store_id.kind
}
pub fn store_id(&self) -> &StoreId {
&self.store_id
}
pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
self.times_per_timeline().timelines()
}
pub fn times_per_timeline(&self) -> &TimesPerTimeline {
&self.entity_db.times_per_timeline
}
pub fn num_timeless_messages(&self) -> usize {
self.entity_db.tree.num_timeless_messages()
}
pub fn num_rows(&self) -> usize {
self.entity_db.data_store.num_timeless_rows() as usize
+ self.entity_db.data_store.num_temporal_rows() as usize
}
pub fn generation(&self) -> re_arrow_store::StoreGeneration {
self.entity_db.data_store.generation()
}
pub fn is_empty(&self) -> bool {
self.set_store_info.is_none() && self.num_rows() == 0
}
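    /// Adds a [`LogMsg`] to the store, dispatching on whether it is a store
    /// info message or an Arrow payload.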
pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
re_tracing::profile_function!();
debug_assert_eq!(msg.store_id(), self.store_id());
match &msg {
LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()),
LogMsg::ArrowMsg(_, arrow_msg) => {
let table = DataTable::from_arrow_msg(arrow_msg)?;
self.add_data_table(table)?;
}
}
Ok(())
}
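    /// Inserts a [`DataTable`] row by row, first computing the byte size of each cell.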
pub fn add_data_table(&mut self, mut table: DataTable) -> Result<(), Error> {
table.compute_all_size_bytes();
for row in table.to_rows() {
let row = row?;
self.add_data_row(&row)?;
}
Ok(())
}
pub fn add_data_row(&mut self, row: &DataRow) -> Result<(), Error> {
self.entity_db.add_data_row(row)
}
pub fn set_store_info(&mut self, store_info: SetStoreInfo) {
self.set_store_info = Some(store_info);
}
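    /// Frees as much memory as possible while keeping the latest row of each
    /// component around, so latest-at state is preserved.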
pub fn gc_everything_but_the_latest_row(&mut self) {
re_tracing::profile_function!();
self.gc(GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::Everything,
gc_timeless: true,
            protect_latest: 1,
            purge_empty_tables: true,
});
}
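    /// Drops at least `fraction_to_purge` (in `0.0..=1.0`) of the stored data.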
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
re_tracing::profile_function!();
assert!((0.0..=1.0).contains(&fraction_to_purge));
self.gc(GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(
fraction_to_purge as _,
),
gc_timeless: true,
protect_latest: 1,
purge_empty_tables: false,
});
}
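    /// Runs the garbage collector on the data store, then purges the deleted
    /// rows from the secondary indices.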
pub fn gc(&mut self, gc_options: GarbageCollectionOptions) {
re_tracing::profile_function!();
let (deleted, stats_diff) = self.entity_db.data_store.gc(gc_options);
re_log::trace!(
num_row_ids_dropped = deleted.row_ids.len(),
size_bytes_dropped = re_format::format_bytes(stats_diff.total.num_bytes as _),
"purged datastore"
);
let Self {
store_id: _,
data_source: _,
set_store_info: _,
entity_db,
} = self;
entity_db.purge(&deleted);
}
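    /// Key for sorting stores: by application id, then by start time.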
pub fn sort_key(&self) -> impl Ord + '_ {
self.store_info()
.map(|info| (info.application_id.0.as_str(), info.started))
}
}