mod aosoa;
mod aosoa64simd;
mod projection_fast_paths;
mod routing;
mod soa;
mod soaos;
use crate::event::EventKind;
use crate::store::index::{ClockKey, DiskPos, IndexEntry, QueryHit, RoutingSummary};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use aosoa::AoSoAInner;
#[cfg(test)]
use aosoa::Tile;
use aosoa64simd::AoSoA64SimdInner;
#[cfg(test)]
use routing::{ProjectionSupport, ScanCapabilities, ScanRoute};
use soa::SoAInner;
pub(crate) use soaos::CachedProjectionSlot;
use soaos::SoAoSInner;
/// Tuple of two `u64` values followed by a list of `(u64, DiskPos)` pairs.
// NOTE(review): this alias is not used in this file; presumably the projection
// submodules consume it. The meaning of the two leading `u64`s and of the
// per-pair `u64` is not visible here — confirm against the alias's users.
type ProjectionCandidates = (u64, u64, Vec<(u64, DiskPos)>);
/// Returns `kind` as a raw `u16` by delegating to `EventKind::as_raw_u16`.
#[inline]
fn event_kind_raw(kind: EventKind) -> u16 {
kind.as_raw_u16()
}
/// Applies cursor and page-size bounds to `v` in place (test builds only).
///
/// * `after_seq` / `started` — when `started` is true, hits whose
///   `global_sequence` is not strictly greater than `after_seq` are dropped;
///   when it is false, `after_seq` is ignored and every hit is kept.
/// * `limit` — maximum number of hits left in `v`.
///
/// The surviving hits are ordered ascending by `global_sequence` (stable
/// sort) before the list is cut down to `limit`.
#[cfg(test)]
#[inline]
pub(super) fn apply_after_bounds(
    v: &mut Vec<QueryHit>,
    after_seq: u64,
    started: bool,
    limit: usize,
) {
    // With no started cursor the predicate is always true, so nothing is removed.
    v.retain(|hit| !started || hit.global_sequence > after_seq);
    v.sort_by(|lhs, rhs| lhs.global_sequence.cmp(&rhs.global_sequence));
    v.truncate(limit);
}
/// Selector describing which entries a columnar layout should return hits for.
///
/// Built by the `query_hits_by_*` methods on `ColumnarIndex` and handed by
/// reference to the backing layout's `hits_candidates` method.
#[derive(Clone, Copy, Debug)]
enum EntryQuery<'a> {
/// Select by event kind.
Kind(EventKind),
/// Select by a `u8` category value.
Category(u8),
/// Select by scope string, borrowed from the caller for the query's duration.
Scope(&'a str),
}
/// The backing layout of a `ColumnarIndex`; each variant owns its layout
/// behind its own `RwLock`.
///
/// `AoSoA8` and `AoSoA16` are compiled only in test builds.
enum ColumnarVariant {
/// Layout implemented by `soa::SoAInner`.
SoA(RwLock<SoAInner>),
/// `aosoa::AoSoAInner` instantiated with `8` (test builds only).
#[cfg(test)]
AoSoA8(RwLock<AoSoAInner<8>>),
/// `aosoa::AoSoAInner` instantiated with `16` (test builds only).
#[cfg(test)]
AoSoA16(RwLock<AoSoAInner<16>>),
/// `aosoa::AoSoAInner` instantiated with `64`.
AoSoA64(RwLock<AoSoAInner<64>>),
/// Layout implemented by `aosoa64simd::AoSoA64SimdInner`.
AoSoA64Simd(RwLock<AoSoA64SimdInner>),
/// Layout implemented by `soaos::SoAoSInner`.
SoAoS(RwLock<SoAoSInner>),
}
/// Columnar index wrapping exactly one backing layout, chosen by which
/// `new_*` constructor created it.
pub(crate) struct ColumnarIndex {
// The selected layout; every method dispatches on this variant.
inner: ColumnarVariant,
}
impl ColumnarIndex {
pub(crate) fn new_soa() -> Self {
Self {
inner: ColumnarVariant::SoA(RwLock::new(SoAInner::new())),
}
}
#[cfg(test)]
pub(crate) fn new_aosoa8() -> Self {
Self {
inner: ColumnarVariant::AoSoA8(RwLock::new(AoSoAInner::<8>::new())),
}
}
#[cfg(test)]
pub(crate) fn new_aosoa16() -> Self {
Self {
inner: ColumnarVariant::AoSoA16(RwLock::new(AoSoAInner::<16>::new())),
}
}
pub(crate) fn new_aosoa64() -> Self {
Self {
inner: ColumnarVariant::AoSoA64(RwLock::new(AoSoAInner::<64>::new())),
}
}
pub(crate) fn new_aosoa64_simd() -> Self {
Self {
inner: ColumnarVariant::AoSoA64Simd(RwLock::new(AoSoA64SimdInner::new())),
}
}
pub(crate) fn new_soaos() -> Self {
Self {
inner: ColumnarVariant::SoAoS(RwLock::new(SoAoSInner::new())),
}
}
pub(crate) fn insert(&self, entry: &Arc<IndexEntry>) {
match &self.inner {
ColumnarVariant::SoA(lock) => lock.write().push(entry),
#[cfg(test)]
ColumnarVariant::AoSoA8(lock) => lock.write().push(entry),
#[cfg(test)]
ColumnarVariant::AoSoA16(lock) => lock.write().push(entry),
ColumnarVariant::AoSoA64(lock) => lock.write().push(entry),
ColumnarVariant::AoSoA64Simd(lock) => lock.write().push(entry),
ColumnarVariant::SoAoS(lock) => lock.write().push(entry),
}
}
pub(crate) fn rebuild_from_restore_base(
&self,
entries_by_sequence: &[Arc<IndexEntry>],
entries_by_entity: &[Arc<IndexEntry>],
routing: &RoutingSummary,
) {
match &self.inner {
ColumnarVariant::SoA(lock) => {
*lock.write() = SoAInner::from_entries(entries_by_sequence)
}
#[cfg(test)]
ColumnarVariant::AoSoA8(lock) => {
*lock.write() = AoSoAInner::<8>::from_entries(entries_by_sequence)
}
#[cfg(test)]
ColumnarVariant::AoSoA16(lock) => {
*lock.write() = AoSoAInner::<16>::from_entries(entries_by_sequence)
}
ColumnarVariant::AoSoA64(lock) => {
*lock.write() = AoSoAInner::<64>::from_entries(entries_by_sequence)
}
ColumnarVariant::AoSoA64Simd(lock) => {
*lock.write() = AoSoA64SimdInner::from_entries(entries_by_sequence)
}
ColumnarVariant::SoAoS(lock) => {
*lock.write() = SoAoSInner::from_restore_base(entries_by_entity, routing)
}
}
}
fn query_hits_sorted(&self, query: EntryQuery<'_>) -> Vec<QueryHit> {
let mut results = match &self.inner {
ColumnarVariant::SoA(lock) => lock.read().hits_candidates(&query),
ColumnarVariant::AoSoA64(lock) => lock.read().hits_candidates(&query),
ColumnarVariant::AoSoA64Simd(lock) => lock.read().hits_candidates(&query),
ColumnarVariant::SoAoS(lock) => lock.read().hits_candidates(&query),
#[cfg(test)]
ColumnarVariant::AoSoA8(lock) => lock.read().hits_candidates(&query),
#[cfg(test)]
ColumnarVariant::AoSoA16(lock) => lock.read().hits_candidates(&query),
};
results.sort_by_key(|h| h.global_sequence);
results
}
#[cfg(test)]
fn query_hits_sorted_after(
&self,
query: EntryQuery<'_>,
after_seq: u64,
started: bool,
limit: usize,
) -> Vec<QueryHit> {
match &self.inner {
ColumnarVariant::SoA(lock) => {
lock.read()
.hits_candidates_after(&query, after_seq, started, limit)
}
ColumnarVariant::AoSoA64(lock) => {
let mut v = lock.read().hits_candidates(&query);
apply_after_bounds(&mut v, after_seq, started, limit);
v
}
ColumnarVariant::AoSoA64Simd(lock) => {
let mut v = lock.read().hits_candidates(&query);
apply_after_bounds(&mut v, after_seq, started, limit);
v
}
ColumnarVariant::SoAoS(lock) => {
let mut v = lock.read().hits_candidates(&query);
apply_after_bounds(&mut v, after_seq, started, limit);
v
}
#[cfg(test)]
ColumnarVariant::AoSoA8(lock) => {
let mut v = lock.read().hits_candidates(&query);
apply_after_bounds(&mut v, after_seq, started, limit);
v
}
#[cfg(test)]
ColumnarVariant::AoSoA16(lock) => {
let mut v = lock.read().hits_candidates(&query);
apply_after_bounds(&mut v, after_seq, started, limit);
v
}
}
}
pub(crate) fn query_hits_by_kind(&self, target: EventKind) -> Vec<QueryHit> {
self.query_hits_sorted(EntryQuery::Kind(target))
}
pub(crate) fn query_hits_by_category(&self, category: u8) -> Vec<QueryHit> {
self.query_hits_sorted(EntryQuery::Category(category))
}
pub(crate) fn query_hits_by_scope(&self, scope: &str) -> Vec<QueryHit> {
self.query_hits_sorted(EntryQuery::Scope(scope))
}
#[cfg(test)]
pub(crate) fn query_hits_by_kind_after(
&self,
target: EventKind,
after_seq: u64,
started: bool,
limit: usize,
) -> Vec<QueryHit> {
self.query_hits_sorted_after(EntryQuery::Kind(target), after_seq, started, limit)
}
#[cfg(test)]
pub(crate) fn query_hits_by_category_after(
&self,
category: u8,
after_seq: u64,
started: bool,
limit: usize,
) -> Vec<QueryHit> {
self.query_hits_sorted_after(EntryQuery::Category(category), after_seq, started, limit)
}
#[cfg(test)]
pub(crate) fn query_hits_by_scope_after(
&self,
scope: &str,
after_seq: u64,
started: bool,
limit: usize,
) -> Vec<QueryHit> {
self.query_hits_sorted_after(EntryQuery::Scope(scope), after_seq, started, limit)
}
#[cfg(test)]
fn with_tile8<R>(&self, idx: usize, f: impl FnOnce(&Tile<8>) -> R) -> Option<R> {
match &self.inner {
ColumnarVariant::AoSoA8(lock) => lock.read().with_tile(idx, f),
ColumnarVariant::SoA(_)
| ColumnarVariant::AoSoA64(_)
| ColumnarVariant::AoSoA64Simd(_)
| ColumnarVariant::SoAoS(_) => None,
#[cfg(test)]
ColumnarVariant::AoSoA16(_) => None,
}
}
#[cfg(test)]
fn with_tile16<R>(&self, idx: usize, f: impl FnOnce(&Tile<16>) -> R) -> Option<R> {
match &self.inner {
ColumnarVariant::AoSoA16(lock) => lock.read().with_tile(idx, f),
ColumnarVariant::SoA(_)
| ColumnarVariant::AoSoA64(_)
| ColumnarVariant::AoSoA64Simd(_)
| ColumnarVariant::SoAoS(_) => None,
#[cfg(test)]
ColumnarVariant::AoSoA8(_) => None,
}
}
#[cfg(test)]
fn with_tile64<R>(&self, idx: usize, f: impl FnOnce(&Tile<64>) -> R) -> Option<R> {
match &self.inner {
ColumnarVariant::AoSoA64(lock) => lock.read().with_tile(idx, f),
ColumnarVariant::SoA(_)
| ColumnarVariant::AoSoA64Simd(_)
| ColumnarVariant::SoAoS(_) => None,
#[cfg(test)]
ColumnarVariant::AoSoA8(_) | ColumnarVariant::AoSoA16(_) => None,
}
}
pub(crate) fn clear(&self) {
match &self.inner {
ColumnarVariant::SoA(lock) => lock.write().clear(),
#[cfg(test)]
ColumnarVariant::AoSoA8(lock) => lock.write().clear(),
#[cfg(test)]
ColumnarVariant::AoSoA16(lock) => lock.write().clear(),
ColumnarVariant::AoSoA64(lock) => lock.write().clear(),
ColumnarVariant::AoSoA64Simd(lock) => lock.write().clear(),
ColumnarVariant::SoAoS(lock) => lock.write().clear(),
}
}
pub(crate) fn tile_count(&self) -> usize {
match &self.inner {
ColumnarVariant::AoSoA64(lock) => lock.read().tiles.len(),
ColumnarVariant::AoSoA64Simd(lock) => lock.read().tiles.len(),
ColumnarVariant::SoA(_) | ColumnarVariant::SoAoS(_) => 0,
#[cfg(test)]
ColumnarVariant::AoSoA8(_) | ColumnarVariant::AoSoA16(_) => 0,
}
}
}
/// Scan-oriented index: two always-present concurrent maps plus up to four
/// optional columnar layouts that are kept in step with them.
pub(crate) struct ScanIndex {
// Per event kind, the entries ordered by their `ClockKey`.
by_fact: DashMap<EventKind, BTreeMap<ClockKey, Arc<IndexEntry>>>,
// Per scope, the set of entity names seen in that scope.
scope_entities: DashMap<Arc<str>, HashSet<Arc<str>>>,
// Optional `SoA` columnar index.
soa: Option<ColumnarIndex>,
// Optional `SoAoS` columnar index.
entity_groups: Option<ColumnarIndex>,
// Optional `AoSoA64` columnar index.
tiles64: Option<ColumnarIndex>,
// Optional `AoSoA64Simd` columnar index.
tiles64_simd: Option<ColumnarIndex>,
}
impl ScanIndex {
    /// Builds an empty scan index whose optional columnar layouts are created
    /// only when the matching `config.topology` switch reports them enabled.
    pub(crate) fn for_config(config: &crate::store::IndexConfig) -> Self {
        let topology = &config.topology;
        let want_soa = topology.soa_enabled();
        let want_entity_groups = topology.entity_groups_enabled();
        let want_tiles64 = topology.tiles64_enabled();
        let want_tiles64_simd = topology.tiles64_simd_enabled();

        Self {
            by_fact: DashMap::new(),
            scope_entities: DashMap::new(),
            soa: want_soa.then(ColumnarIndex::new_soa),
            entity_groups: want_entity_groups.then(ColumnarIndex::new_soaos),
            tiles64: want_tiles64.then(ColumnarIndex::new_aosoa64),
            tiles64_simd: want_tiles64_simd.then(ColumnarIndex::new_aosoa64_simd),
        }
    }

    /// Records `entry` in the two base maps: under its kind (keyed by wall
    /// time, clock and event id) and under its scope (as an entity name).
    fn insert_base(&self, entry: &Arc<IndexEntry>) {
        let clock_key = ClockKey {
            wall_ms: entry.wall_ms,
            clock: entry.clock,
            uuid: entry.event_id,
        };
        // Each map guard is a statement-scoped temporary, so the two maps are
        // never locked at the same time.
        self.by_fact
            .entry(entry.kind)
            .or_default()
            .insert(clock_key, Arc::clone(entry));
        self.scope_entities
            .entry(entry.coord.scope_arc())
            .or_default()
            .insert(entry.coord.entity_arc());
    }

    /// Adds `entry` to the base maps first and then to every enabled
    /// columnar layout, in field order.
    pub(crate) fn insert(&self, entry: &Arc<IndexEntry>) {
        self.insert_base(entry);

        let layouts = [
            &self.soa,
            &self.entity_groups,
            &self.tiles64,
            &self.tiles64_simd,
        ];
        for layout in layouts.iter().copied().flatten() {
            layout.insert(entry);
        }
    }

    /// Discards the current contents and repopulates everything from a
    /// restore snapshot.
    ///
    /// The base maps are rebuilt from `entries_by_sequence`. Each enabled
    /// columnar layout is then rebuilt with all three arguments and picks the
    /// inputs it needs.
    pub(crate) fn rebuild_from_restore_base(
        &self,
        entries_by_sequence: &[Arc<IndexEntry>],
        entries_by_entity: &[Arc<IndexEntry>],
        routing: &RoutingSummary,
    ) {
        self.by_fact.clear();
        self.scope_entities.clear();

        // Stage into plain maps so the shared maps are touched once per key
        // rather than once per entry.
        let mut staged_facts: std::collections::HashMap<
            EventKind,
            BTreeMap<ClockKey, Arc<IndexEntry>>,
        > = std::collections::HashMap::new();
        let mut staged_scopes: std::collections::HashMap<Arc<str>, HashSet<Arc<str>>> =
            std::collections::HashMap::new();

        for entry in entries_by_sequence.iter() {
            let clock_key = ClockKey {
                wall_ms: entry.wall_ms,
                clock: entry.clock,
                uuid: entry.event_id,
            };
            staged_facts
                .entry(entry.kind)
                .or_default()
                .insert(clock_key, Arc::clone(entry));
            staged_scopes
                .entry(entry.coord.scope_arc())
                .or_default()
                .insert(entry.coord.entity_arc());
        }

        staged_facts.into_iter().for_each(|(kind, timeline)| {
            self.by_fact.insert(kind, timeline);
        });
        staged_scopes.into_iter().for_each(|(scope, entities)| {
            self.scope_entities.insert(scope, entities);
        });

        let layouts = [
            &self.soa,
            &self.entity_groups,
            &self.tiles64,
            &self.tiles64_simd,
        ];
        for layout in layouts.iter().copied().flatten() {
            layout.rebuild_from_restore_base(entries_by_sequence, entries_by_entity, routing);
        }
    }

    /// Returns a cloned snapshot of the entity names recorded for `scope`,
    /// or `None` when the scope has no record.
    pub(crate) fn scope_entity_set(&self, scope: &str) -> Option<HashSet<Arc<str>>> {
        let recorded = self.scope_entities.get(scope)?;
        Some(recorded.value().clone())
    }

    /// Empties the base maps and then every enabled columnar layout, in
    /// field order.
    pub(crate) fn clear(&self) {
        self.by_fact.clear();
        self.scope_entities.clear();

        let layouts = [
            &self.soa,
            &self.entity_groups,
            &self.tiles64,
            &self.tiles64_simd,
        ];
        for layout in layouts.iter().copied().flatten() {
            layout.clear();
        }
    }
}
#[cfg(test)]
mod tests;