use std::collections::BTreeMap;
use itertools::Itertools;
use nohash_hasher::IntMap;
use re_arrow_store::{DataStore, DataStoreConfig, GarbageCollectionOptions, StoreEvent};
use re_log_types::{
ApplicationId, DataCell, DataRow, DataTable, EntityPath, EntityPathHash, LogMsg, RowId,
SetStoreInfo, StoreId, StoreInfo, StoreKind, TimePoint, Timeline,
};
use re_types_core::{components::InstanceKey, Archetype, Loggable};
use crate::{ClearCascade, CompactedStoreEvents, Error, TimesPerTimeline};
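
/// All the entities of a single store: the raw component data plus the indexing
/// structures built on top of it.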
pub struct EntityDb {
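    /// In many places we only have the hash of an entity path; this map translates it
    /// back into the full [`EntityPath`].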
pub entity_path_from_hash: IntMap<EntityPathHash, EntityPath>,
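
    /// Every time point at which data was logged, per timeline.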
pub times_per_timeline: TimesPerTimeline,
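
    /// A hierarchical view of the entities, split on path components.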
pub tree: crate::EntityTree,
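
    /// The raw store of all components, for all entities, on all timelines.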
pub data_store: DataStore,
}
impl EntityDb {
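    /// Creates a new, empty database for the given [`StoreId`].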
pub fn new(store_id: StoreId) -> Self {
Self {
entity_path_from_hash: Default::default(),
times_per_timeline: Default::default(),
tree: crate::EntityTree::root(),
data_store: re_arrow_store::DataStore::new(
store_id,
InstanceKey::name(),
DataStoreConfig::default(),
),
}
}
}
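
/// Maximum number of insertion attempts made by [`insert_row_with_retries`] before
/// giving up on a row.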
const MAX_INSERT_ROW_ATTEMPTS: usize = 1_000;
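
/// Upper bound for the random `RowId` increment applied between retries in
/// [`insert_row_with_retries`].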
const DEFAULT_INSERT_ROW_STEP_SIZE: u64 = 100;
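
/// Inserts a [`DataRow`] into the [`DataStore`], retrying on `RowId` collisions.
///
/// If the store reports a reused `RowId`, the row's id is bumped by a random amount in
/// `1..=step_size` and the insertion is retried, up to `num_attempts` times. Any other
/// error is returned immediately.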
fn insert_row_with_retries(
store: &mut DataStore,
mut row: DataRow,
num_attempts: usize,
step_size: u64,
) -> re_arrow_store::WriteResult<StoreEvent> {
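    // Best-effort randomness for the retry step: falls back to 0 if the platform RNG is
    // unavailable.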
fn random_u64() -> u64 {
let mut bytes = [0_u8; 8];
getrandom::getrandom(&mut bytes).map_or(0, |_| u64::from_le_bytes(bytes))
}
for _ in 0..num_attempts {
match store.insert_row(&row) {
Ok(event) => return Ok(event),
Err(re_arrow_store::WriteError::ReusedRowId(_)) => {
re_log::debug!(row_id = %row.row_id(), "Found duplicated RowId, retrying…");
row.row_id = row.row_id.increment(random_u64() % step_size + 1);
}
Err(err) => return Err(err),
}
}
Err(re_arrow_store::WriteError::ReusedRowId(row.row_id()))
}
impl EntityDb {
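    /// All registered entity paths, in sorted order.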
pub fn entity_paths(&self) -> Vec<&EntityPath> {
self.entity_path_from_hash.values().sorted().collect()
}
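
    /// Looks up the full [`EntityPath`] behind a hash, if it has been registered.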
#[inline]
pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
self.entity_path_from_hash.get(entity_path_hash)
}
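
    /// Returns `true` if this entity path exists in the entity tree, even if it was only
    /// logged to implicitly (e.g. as the parent of another entity).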
#[inline]
pub fn is_known_entity(&self, entity_path: &EntityPath) -> bool {
self.tree.subtree(entity_path).is_some()
}
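
    /// Returns `true` if data has been logged at this exact entity path.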
#[inline]
pub fn is_logged_entity(&self, entity_path: &EntityPath) -> bool {
self.entity_path_from_hash.contains_key(&entity_path.hash())
}
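
    /// Registers the entity path so that it can later be resolved from its hash.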
fn register_entity_path(&mut self, entity_path: &EntityPath) {
self.entity_path_from_hash
.entry(entity_path.hash())
.or_insert_with(|| entity_path.clone());
}
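
    /// Inserts a row into the store and updates all derived structures (timelines and
    /// entity tree), propagating any clear cascade the insertion triggers.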
pub fn add_data_row(&mut self, row: DataRow) -> Result<(), Error> {
re_tracing::profile_function!(format!("num_cells={}", row.num_cells()));
self.register_entity_path(&row.entity_path);
let store_event = insert_row_with_retries(
&mut self.data_store,
row,
MAX_INSERT_ROW_ATTEMPTS,
DEFAULT_INSERT_ROW_STEP_SIZE,
)?;
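
        // Update the bookkeeping (timelines, entity tree) for the new row; this may
        // trigger a clear cascade.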
let store_events = &[store_event];
self.times_per_timeline.on_events(store_events);
let clear_cascade = self.tree.on_store_additions(store_events);
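
        // Resolve the cascade by inserting empty cells, then update the bookkeeping once
        // more for the rows the cascade generated. Since those cells are empty, they
        // cannot trigger yet another cascade, hence the assert below.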
let store_events = self.on_clear_cascade(clear_cascade);
self.times_per_timeline.on_events(&store_events);
let clear_cascade = self.tree.on_store_additions(&store_events);
debug_assert!(clear_cascade.is_empty());
Ok(())
}
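
    /// Resolves a [`ClearCascade`] by inserting an empty [`DataCell`] for every affected
    /// component of every affected entity, and returns the resulting [`StoreEvent`]s.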
fn on_clear_cascade(&mut self, clear_cascade: ClearCascade) -> Vec<StoreEvent> {
let mut store_events = Vec::new();
let mut to_be_inserted =
BTreeMap::<RowId, BTreeMap<EntityPath, (TimePoint, Vec<DataCell>)>>::default();
for (row_id, per_entity) in clear_cascade.to_be_cleared {
for (entity_path, (timepoint, component_paths)) in per_entity {
let per_entity = to_be_inserted.entry(row_id).or_default();
let (cur_timepoint, cells) = per_entity.entry(entity_path).or_default();
*cur_timepoint = timepoint.union_max(cur_timepoint);
for component_path in component_paths {
if let Some(data_type) = self
.data_store
.lookup_datatype(&component_path.component_name)
{
cells.push(DataCell::from_arrow_empty(
component_path.component_name,
data_type.clone(),
));
}
}
}
}
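
        // Insert the empty cells: one row per entity, with consecutive `RowId`s derived
        // from the id of the row that triggered the clear.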
for (row_id, per_entity) in to_be_inserted {
let mut row_id = row_id;
for (entity_path, (timepoint, cells)) in per_entity {
match DataRow::from_cells(row_id, timepoint.clone(), entity_path, 0, cells) {
Ok(row) => {
let res = insert_row_with_retries(
&mut self.data_store,
row,
MAX_INSERT_ROW_ATTEMPTS,
DEFAULT_INSERT_ROW_STEP_SIZE,
);
match res {
Ok(store_event) => {
row_id = store_event.row_id.next();
store_events.push(store_event);
}
Err(err) => {
re_log::error_once!(
"Failed to propagate EntityTree cascade: {err}"
);
}
}
}
Err(err) => {
re_log::error_once!("Failed to propagate EntityTree cascade: {err}");
}
}
}
}
store_events
}
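
    /// Updates the timeline bookkeeping and the entity tree after rows have been removed
    /// from the store, e.g. by a garbage collection.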
pub fn on_store_deletions(&mut self, store_events: &[StoreEvent]) {
re_tracing::profile_function!();
let Self {
entity_path_from_hash: _,
times_per_timeline,
tree,
data_store: _,
} = self;
times_per_timeline.on_events(store_events);
let store_events = store_events.iter().collect_vec();
let compacted = CompactedStoreEvents::new(&store_events);
tree.on_store_deletions(&store_events, &compacted);
}
}
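
/// An in-memory database built from a stream of [`LogMsg`]es.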
pub struct StoreDb {
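    /// The [`StoreId`] of this recording.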
store_id: StoreId,
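
    /// The source of the data, if known; set by whoever created this [`StoreDb`].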
pub data_source: Option<re_smart_channel::SmartChannelSource>,
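
    /// Comes in a special message, [`LogMsg::SetStoreInfo`].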
set_store_info: Option<SetStoreInfo>,
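
    /// Where the entities and their data are stored.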
entity_db: EntityDb,
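
    /// When this store was created or last written to.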
last_modified_at: web_time::Instant,
}
impl StoreDb {
pub fn new(store_id: StoreId) -> Self {
Self {
store_id: store_id.clone(),
data_source: None,
set_store_info: None,
entity_db: EntityDb::new(store_id),
last_modified_at: web_time::Instant::now(),
}
}
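
    /// Builds a new [`StoreDb`] from a [`StoreInfo`] and a set of [`DataRow`]s.
    ///
    /// A minimal usage sketch (assumes `store_info` and `rows` already exist):
    ///
    /// ```ignore
    /// let store_db = StoreDb::from_info_and_rows(store_info, rows)?;
    /// assert!(!store_db.is_empty());
    /// ```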
pub fn from_info_and_rows(
store_info: StoreInfo,
rows: impl IntoIterator<Item = DataRow>,
) -> Result<Self, Error> {
let mut store_db = StoreDb::new(store_info.store_id.clone());
store_db.set_store_info(SetStoreInfo {
row_id: RowId::random(),
info: store_info,
});
for row in rows {
store_db.add_data_row(row)?;
}
Ok(store_db)
}
#[inline]
pub fn entity_db(&self) -> &EntityDb {
&self.entity_db
}
pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
self.set_store_info.as_ref()
}
pub fn store_info(&self) -> Option<&StoreInfo> {
self.store_info_msg().map(|msg| &msg.info)
}
pub fn app_id(&self) -> Option<&ApplicationId> {
self.store_info().map(|ri| &ri.application_id)
}
#[inline]
pub fn store(&self) -> &DataStore {
&self.entity_db.data_store
}
pub fn store_kind(&self) -> StoreKind {
self.store_id.kind
}
pub fn store_id(&self) -> &StoreId {
&self.store_id
}
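
    /// All the timelines on which anything has been logged so far.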
pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
self.times_per_timeline().keys()
}
pub fn times_per_timeline(&self) -> &TimesPerTimeline {
&self.entity_db.times_per_timeline
}
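
    /// Histogram of all events on the given timeline, over all entities.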
pub fn time_histogram(&self, timeline: &Timeline) -> Option<&crate::TimeHistogram> {
self.entity_db().tree.recursive_time_histogram.get(timeline)
}
pub fn num_timeless_messages(&self) -> u64 {
self.entity_db.tree.num_timeless_messages()
}
pub fn num_rows(&self) -> usize {
self.entity_db.data_store.num_timeless_rows() as usize
+ self.entity_db.data_store.num_temporal_rows() as usize
}
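
    /// The current generation of the data store, which changes whenever the store is
    /// mutated; useful for detecting changes.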
pub fn generation(&self) -> re_arrow_store::StoreGeneration {
self.entity_db.data_store.generation()
}
pub fn last_modified_at(&self) -> web_time::Instant {
self.last_modified_at
}
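
    /// A store is considered empty if it has neither store info nor any rows.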
pub fn is_empty(&self) -> bool {
self.set_store_info.is_none() && self.num_rows() == 0
}
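
    /// Applies a single [`LogMsg`] to this store.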
pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
re_tracing::profile_function!();
debug_assert_eq!(msg.store_id(), self.store_id());
match &msg {
LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()),
LogMsg::ArrowMsg(_, arrow_msg) => {
let table = DataTable::from_arrow_msg(arrow_msg)?;
self.add_data_table(table)?;
}
}
Ok(())
}
pub fn add_data_table(&mut self, mut table: DataTable) -> Result<(), Error> {
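        // Compute the size of every cell in one batched pass, which is typically cheaper
        // than doing it row by row during insertion.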
table.compute_all_size_bytes();
for row in table.to_rows() {
self.add_data_row(row?)?;
}
self.last_modified_at = web_time::Instant::now();
Ok(())
}
pub fn add_data_row(&mut self, row: DataRow) -> Result<(), Error> {
self.entity_db.add_data_row(row).map(|_| ())
}
pub fn set_store_info(&mut self, store_info: SetStoreInfo) {
self.set_store_info = Some(store_info);
}
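
    /// Drops everything except the latest row of each component.
    ///
    /// `Clear`-related components are excluded from that protection, so that previously
    /// cleared data can actually be collected.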
pub fn gc_everything_but_the_latest_row(&mut self) {
re_tracing::profile_function!();
self.gc(&GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::Everything,
gc_timeless: true,
            protect_latest: 1,
            purge_empty_tables: true,
dont_protect: [
re_types_core::components::ClearIsRecursive::name(),
re_types_core::archetypes::Clear::indicator().name(),
]
.into_iter()
.collect(),
});
}
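
    /// Free up RAM by dropping at least the given fraction (`0.0..=1.0`) of the store,
    /// starting with the oldest data.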
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
re_tracing::profile_function!();
assert!((0.0..=1.0).contains(&fraction_to_purge));
self.gc(&GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(
fraction_to_purge as _,
),
gc_timeless: true,
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
});
}
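
    /// Runs a garbage collection pass and updates all derived structures with the
    /// resulting deletions.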
pub fn gc(&mut self, gc_options: &GarbageCollectionOptions) {
re_tracing::profile_function!();
let (store_events, stats_diff) = self.entity_db.data_store.gc(gc_options);
re_log::trace!(
num_row_ids_dropped = store_events.len(),
size_bytes_dropped = re_format::format_bytes(stats_diff.total.num_bytes as _),
"purged datastore"
);
self.entity_db.on_store_deletions(&store_events);
}
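
    /// Key used for sorting recordings in the UI: by application id, then by start time.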
pub fn sort_key(&self) -> impl Ord + '_ {
self.store_info()
.map(|info| (info.application_id.0.as_str(), info.started))
}
}