use crate::coordinate::Coordinate;
use crate::event::{EventKind, HashChain};
use dashmap::DashMap;
use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
/// In-memory secondary indexes over stored events.
///
/// Every map is a concurrent `DashMap`; `insert` updates them one after
/// another, so the group of maps is not updated as a single atomic step.
pub(crate) struct StoreIndex {
/// Entity name -> that entity's entries, ordered by `ClockKey`.
streams: DashMap<Arc<str>, BTreeMap<ClockKey, IndexEntry>>,
/// Scope name -> set of entity names that had an entry inserted under that scope.
scope_entities: DashMap<Arc<str>, HashSet<Arc<str>>>,
/// Event kind -> entries of that kind, ordered by `ClockKey`.
by_fact: DashMap<EventKind, BTreeMap<ClockKey, IndexEntry>>,
/// Event id -> entry.
by_id: DashMap<u128, IndexEntry>,
/// Entity name -> the entry most recently passed to `insert` for that entity.
latest: DashMap<Arc<str>, IndexEntry>,
/// Counter advanced by one on every `insert` and reset by `clear`.
global_sequence: AtomicU64,
/// Number of `insert` calls since construction or the last `clear`.
len: AtomicUsize,
/// Per-entity mutex handles. This type only creates the map; it is not
/// emptied by `clear`.
/// NOTE(review): presumably used by writers to serialize appends per entity — confirm against callers.
pub(crate) entity_locks: DashMap<Arc<str>, Arc<parking_lot::Mutex<()>>>,
}
/// Sort key for index entries.
///
/// Keys order by `wall_ms`, then `clock`, then `uuid` (see the `Ord` impl).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClockKey {
/// Wall-clock component; most significant in the ordering.
/// NOTE(review): the name suggests milliseconds — confirm the epoch with the producer.
pub wall_ms: u64,
/// Counter component; breaks ties between equal `wall_ms` values.
pub clock: u32,
/// Final tie-breaker; `StoreIndex::insert` fills it with the entry's event id.
pub uuid: u128,
}
/// Index record describing one stored event.
#[derive(Clone, Debug)]
pub struct IndexEntry {
/// Identifier of the event; used as the `by_id` key and as `ClockKey::uuid`.
pub event_id: u128,
/// Correlation identifier; `is_correlated` reports whether it differs from `event_id`.
pub correlation_id: u128,
/// Identifier of the causing event, or `None` when the entry has no cause recorded.
pub causation_id: Option<u128>,
/// Coordinate of the event; supplies the entity and scope used for indexing.
pub coord: Coordinate,
/// Kind of the event; used as the `by_fact` key.
pub kind: EventKind,
/// Wall-clock component copied into the entry's `ClockKey`.
pub wall_ms: u64,
/// Counter component copied into the entry's `ClockKey`; also what
/// `Region::clock_range` is matched against in `StoreIndex::query`.
pub clock: u32,
/// Hash-chain data carried with the entry (not interpreted by the index).
pub hash_chain: HashChain,
/// Position of the event in storage (not interpreted by the index).
pub disk_pos: DiskPos,
/// Sequence number carried by the entry; `StoreIndex::query` sorts results by it.
pub global_sequence: u64,
}
/// Location of a stored record.
/// NOTE(review): field meanings below are inferred from names; the index
/// never reads them — confirm against the segment reader/writer.
#[derive(Clone, Debug)]
pub struct DiskPos {
/// Identifier of the segment holding the record.
pub segment_id: u64,
/// Offset of the record within the segment (presumably in bytes).
pub offset: u64,
/// Length of the record (presumably in bytes).
pub length: u32,
}
impl Ord for ClockKey {
    /// Total order over keys: `wall_ms` first, then `clock`, then `uuid`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Tuples compare lexicographically, which is exactly the
        // field-by-field precedence this key needs.
        let lhs = (self.wall_ms, self.clock, self.uuid);
        let rhs = (other.wall_ms, other.clock, other.uuid);
        lhs.cmp(&rhs)
    }
}
impl PartialOrd for ClockKey {
    /// Always `Some`: the partial order is the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl IndexEntry {
    /// Returns `true` when the correlation id is not the entry's own event id.
    pub fn is_correlated(&self) -> bool {
        self.correlation_id != self.event_id
    }

    /// Returns `true` when this entry records `event_id` as its cause.
    pub fn is_caused_by(&self, event_id: u128) -> bool {
        matches!(self.causation_id, Some(cause) if cause == event_id)
    }

    /// Returns `true` when no causing event is recorded for this entry.
    pub fn is_root_cause(&self) -> bool {
        let cause = &self.causation_id;
        cause.is_none()
    }
}
impl StoreIndex {
pub(crate) fn new() -> Self {
Self {
streams: DashMap::new(),
scope_entities: DashMap::new(),
by_fact: DashMap::new(),
by_id: DashMap::new(),
latest: DashMap::new(),
global_sequence: AtomicU64::new(0),
len: AtomicUsize::new(0),
entity_locks: DashMap::new(),
}
}
pub(crate) fn insert(&self, entry: IndexEntry) {
let entity = entry.coord.entity_arc();
let scope = entry.coord.scope_arc();
let key = ClockKey {
wall_ms: entry.wall_ms,
clock: entry.clock,
uuid: entry.event_id,
};
self.streams
.entry(Arc::clone(&entity))
.or_default()
.insert(key.clone(), entry.clone());
self.scope_entities
.entry(scope)
.or_default()
.insert(Arc::clone(&entity));
self.by_fact
.entry(entry.kind)
.or_default()
.insert(key, entry.clone());
self.by_id.insert(entry.event_id, entry.clone());
self.latest.insert(entity, entry);
self.global_sequence.fetch_add(1, Ordering::SeqCst);
self.len.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn get_by_id(&self, event_id: u128) -> Option<IndexEntry> {
self.by_id.get(&event_id).map(|r| r.value().clone())
}
pub(crate) fn get_latest(&self, entity: &str) -> Option<IndexEntry> {
self.latest.get(entity).map(|r| r.value().clone())
}
pub(crate) fn stream(&self, entity: &str) -> Vec<IndexEntry> {
self.streams
.get(entity)
.map(|r| r.value().values().cloned().collect())
.unwrap_or_default()
}
pub(crate) fn query(&self, region: &crate::coordinate::Region) -> Vec<IndexEntry> {
use crate::coordinate::KindFilter;
let mut candidates: Vec<IndexEntry> = if let Some(ref prefix) = region.entity_prefix {
self.streams
.iter()
.filter(|r| r.key().as_ref().starts_with(prefix.as_ref()))
.flat_map(|r| r.value().values().cloned().collect::<Vec<_>>())
.collect()
} else if let Some(ref scope) = region.scope {
if let Some(entities) = self.scope_entities.get(scope.as_ref()) {
entities
.value()
.iter()
.flat_map(|entity| {
self.streams
.get(entity.as_ref())
.map(|r| r.value().values().cloned().collect::<Vec<_>>())
.unwrap_or_default()
})
.collect()
} else {
Vec::new()
}
} else if let Some(ref fact) = region.fact {
match fact {
KindFilter::Exact(k) => self
.by_fact
.get(k)
.map(|r| r.value().values().cloned().collect())
.unwrap_or_default(),
KindFilter::Category(c) => {
let cat = *c;
self.by_fact
.iter()
.filter(|r| r.key().category() == cat)
.flat_map(|r| r.value().values().cloned().collect::<Vec<_>>())
.collect()
}
KindFilter::Any => {
self.streams
.iter()
.flat_map(|r| r.value().values().cloned().collect::<Vec<_>>())
.collect()
}
}
} else {
self.streams
.iter()
.flat_map(|r| r.value().values().cloned().collect::<Vec<_>>())
.collect()
};
if region.entity_prefix.is_some() {
if let Some(ref scope) = region.scope {
candidates.retain(|e| e.coord.scope() == scope.as_ref());
}
}
if region.entity_prefix.is_some() || region.scope.is_some() {
if let Some(ref fact) = region.fact {
candidates.retain(|e| match fact {
KindFilter::Exact(k) => e.kind == *k,
KindFilter::Category(c) => e.kind.category() == *c,
KindFilter::Any => true,
});
}
}
if let Some((min, max)) = region.clock_range {
candidates.retain(|e| e.clock >= min && e.clock <= max);
}
candidates.sort_by_key(|e| e.global_sequence);
candidates
}
pub(crate) fn global_sequence(&self) -> u64 {
self.global_sequence.load(Ordering::SeqCst)
}
pub(crate) fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
pub(crate) fn clear(&self) {
self.streams.clear();
self.scope_entities.clear();
self.by_fact.clear();
self.by_id.clear();
self.latest.clear();
self.global_sequence.store(0, Ordering::SeqCst);
self.len.store(0, Ordering::Relaxed);
}
}