pub(crate) mod columnar;
pub(crate) mod interner;
mod projection_bridge;
mod query;
mod restore;
mod visibility;
use self::columnar::ScanIndex;
use self::interner::StringInterner;
pub(crate) use self::projection_bridge::{projection_kind_matches, ProjectionReplayPlan};
use self::restore::RestoreBase;
pub(crate) use self::restore::{recommended_restore_chunk_count, RoutingSummary};
use self::visibility::SequenceGate;
use crate::coordinate::Coordinate;
use crate::event::{EventKind, HashChain};
use crate::store::config::IndexConfig;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// In-memory index over stored events.
///
/// Every indexed entry is held once behind an `Arc` and shared by several views:
/// per-entity streams, the columnar scan index, an id lookup, and a per-entity
/// "latest" pointer. Visibility of entries is governed by `sequence`.
pub(crate) struct StoreIndex {
/// Per-entity streams keyed by entity name, each ordered by `ClockKey`
/// (`wall_ms`, then `clock`, then event id).
streams: DashMap<Arc<str>, BTreeMap<ClockKey, Arc<IndexEntry>>>,
/// Columnar scan index; its layout is chosen from `IndexConfig` at construction.
pub(crate) scan: ScanIndex,
/// Lookup from `event_id` to entry.
by_id: DashMap<u128, Arc<IndexEntry>>,
/// Last entry written for each entity (inserts overwrite unconditionally).
latest: DashMap<Arc<str>, Arc<IndexEntry>>,
/// Sequence allocation / visibility gate (reserve, advance, publish, fences).
pub(crate) sequence: SequenceGate,
/// Count of indexed entries; updated with `Relaxed` ordering.
len: AtomicUsize,
/// Shared string interner; entries carry interned ids for entity and scope.
pub(crate) interner: Arc<StringInterner>,
/// Insert paths hold this for read; `replace_contents_from_fresh` holds it for
/// write so a wholesale content swap cannot interleave with inserts.
swap_gate: RwLock<()>,
}
/// Ordering key for entries inside an entity stream.
///
/// Keys compare lexicographically by `wall_ms`, then `clock`, then `uuid`
/// (see the `Ord` impl). The index fills `uuid` with the entry's `event_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClockKey {
/// Wall-clock component, compared first. NOTE(review): presumably milliseconds — confirm.
pub wall_ms: u64,
/// Logical clock component, compared after `wall_ms`.
pub clock: u32,
/// Final tiebreak; the index stores the entry's `event_id` here.
pub uuid: u128,
}
/// Metadata for one stored event, as held by every view of `StoreIndex`.
#[derive(Clone, Debug)]
pub struct IndexEntry {
/// Event id; keys the id lookup and is the `ClockKey::uuid` tiebreak.
pub event_id: u128,
/// Correlation id; equal to `event_id` when the event is not correlated to another.
pub correlation_id: u128,
/// Id of the causing event, or `None` for a root cause.
pub causation_id: Option<u128>,
/// Entity/scope coordinate; the entity name keys the stream and latest maps.
pub coord: Coordinate,
/// Interned id of `coord.entity()` (debug-checked on single inserts).
pub(crate) entity_id: self::interner::InternId,
/// Interned id of `coord.scope()` (debug-checked on single inserts).
pub(crate) scope_id: self::interner::InternId,
/// Event kind.
pub kind: EventKind,
/// Wall-clock component of the ordering key.
pub wall_ms: u64,
/// Logical clock component of the ordering key.
pub clock: u32,
/// NOTE(review): DAG lane; not used in this file — semantics to confirm.
pub dag_lane: u32,
/// NOTE(review): DAG depth; not used in this file — semantics to confirm.
pub dag_depth: u32,
/// Hash-chain data carried with the entry.
pub hash_chain: HashChain,
/// Location of the event's bytes on disk.
pub disk_pos: DiskPos,
/// Global sequence number; used for visibility and restore ordering.
pub global_sequence: u64,
}
/// Position of an event's encoded bytes within on-disk storage.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DiskPos {
/// Segment that holds the event.
pub segment_id: u64,
/// NOTE(review): presumably a byte offset within the segment — confirm.
pub offset: u64,
/// NOTE(review): presumably the encoded length in bytes — confirm.
pub length: u32,
}
impl DiskPos {
pub const fn new(segment_id: u64, offset: u64, length: u32) -> Self {
Self {
segment_id,
offset,
length,
}
}
}
/// Compact, `Copy` projection of the `IndexEntry` fields copied by `QueryHit::from_entry`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct QueryHit {
pub(crate) event_id: u128,
pub(crate) global_sequence: u64,
pub(crate) disk_pos: DiskPos,
pub(crate) kind: EventKind,
pub(crate) clock: u32,
}
impl QueryHit {
pub(crate) fn from_entry(entry: &IndexEntry) -> Self {
Self {
event_id: entry.event_id,
global_sequence: entry.global_sequence,
disk_pos: entry.disk_pos,
kind: entry.kind,
clock: entry.clock,
}
}
}
impl Ord for ClockKey {
    /// Lexicographic order over `(wall_ms, clock, uuid)`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = (self.wall_ms, self.clock, self.uuid);
        let rhs = (other.wall_ms, other.clock, other.uuid);
        lhs.cmp(&rhs)
    }
}
impl PartialOrd for ClockKey {
    /// Defers to the total order from `Ord`, so this never yields `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let total = Ord::cmp(self, other);
        Option::Some(total)
    }
}
impl IndexEntry {
    /// True when the correlation id differs from this event's own id.
    pub fn is_correlated(&self) -> bool {
        self.correlation_id != self.event_id
    }

    /// True when `event_id` is recorded as this event's direct cause.
    pub fn is_caused_by(&self, event_id: u128) -> bool {
        matches!(self.causation_id, Some(cause) if cause == event_id)
    }

    /// True when no causing event is recorded.
    pub fn is_root_cause(&self) -> bool {
        Option::is_none(&self.causation_id)
    }
}
impl StoreIndex {
    /// Creates an empty index with [`IndexConfig::default`] (test builds only).
    #[cfg(test)]
    pub(crate) fn new() -> Self {
        Self::with_config(&IndexConfig::default())
    }

    /// Creates an empty index; `config` selects the scan-index layout via
    /// `ScanIndex::for_config`.
    pub(crate) fn with_config(config: &IndexConfig) -> Self {
        Self {
            streams: DashMap::new(),
            scan: ScanIndex::for_config(config),
            by_id: DashMap::new(),
            latest: DashMap::new(),
            sequence: SequenceGate::new(),
            len: AtomicUsize::new(0),
            interner: Arc::new(StringInterner::new()),
            swap_gate: RwLock::new(()),
        }
    }

    /// Reserves `n` global sequence numbers by delegating to the sequence gate.
    pub(crate) fn reserve_sequences(&self, n: u64) -> u64 {
        self.sequence.reserve(n)
    }

    /// Indexes one entry, then advances the sequence gate.
    ///
    /// Holds the `swap_gate` read lock so the insert cannot interleave with
    /// [`Self::replace_contents_from_fresh`].
    pub(crate) fn insert(&self, entry: IndexEntry) {
        let _read = self.swap_gate.read();
        self.insert_inner(entry);
        self.sequence.advance();
    }

    /// Debug-checks the entry's intern ids against the interner, then links it.
    fn insert_inner(&self, entry: IndexEntry) {
        // The `intern` calls below are only evaluated in debug builds.
        debug_assert_eq!(entry.entity_id, self.interner.intern(entry.coord.entity()));
        debug_assert_eq!(entry.scope_id, self.interner.intern(entry.coord.scope()));
        self.link_entry(&Arc::new(entry));
    }

    /// Indexes a batch of entries under a single `swap_gate` read lock.
    ///
    /// Unlike [`Self::insert`], this does not debug-check intern ids and does not
    /// advance the sequence gate. NOTE(review): presumably the caller handles
    /// visibility for the batch itself — confirm.
    pub(crate) fn insert_batch(&self, entries: Vec<IndexEntry>) {
        let _read = self.swap_gate.read();
        for entry in entries {
            self.link_entry(&Arc::new(entry));
        }
    }

    /// Builds the stream ordering key for `entry`: `(wall_ms, clock, event_id)`.
    fn clock_key_of(entry: &IndexEntry) -> ClockKey {
        ClockKey {
            wall_ms: entry.wall_ms,
            clock: entry.clock,
            uuid: entry.event_id,
        }
    }

    /// Links a shared entry into every view (stream, scan, id lookup, latest) and
    /// bumps the entry count.
    ///
    /// Shared by the single and batch insert paths so the two cannot drift apart.
    fn link_entry(&self, entry: &Arc<IndexEntry>) {
        let entity = entry.coord.entity_arc();
        self.streams
            .entry(Arc::clone(&entity))
            .or_default()
            .insert(Self::clock_key_of(entry), Arc::clone(entry));
        self.scan.insert(entry);
        self.by_id.insert(entry.event_id, Arc::clone(entry));
        // Unconditional overwrite: "latest" is whichever entry was linked last.
        self.latest.insert(entity, Arc::clone(entry));
        self.len.fetch_add(1, Ordering::Relaxed);
    }

    /// Clears every view and rebuilds it from `entries` via [`RestoreBase`], then
    /// publishes the new visibility watermark.
    ///
    /// `before_publish` runs after all views are rebuilt but before publishing.
    /// The test helper uses it to observe the unpublished state. The allocator
    /// resumes at `max(last.global_sequence + 1, allocator_hint)`, or at
    /// `allocator_hint` when there are no entries.
    ///
    /// NOTE(review): unlike `replace_contents_from_fresh`, this does not take the
    /// `swap_gate` write lock, so it does not exclude concurrent `insert` calls.
    ///
    /// # Errors
    /// Returns the error from publishing the new watermark.
    ///
    /// # Panics
    /// If an entity run's bounds do not fit `usize` or fall outside
    /// `entries_by_entity`.
    #[allow(clippy::expect_used)]
    fn restore_sorted_entries_impl(
        &self,
        entries: Vec<IndexEntry>,
        allocator_hint: u64,
        chunk_count: usize,
        routing_hint: Option<&RoutingSummary>,
        before_publish: impl FnOnce(&Self),
    ) -> Result<(), crate::store::StoreError> {
        self.streams.clear();
        self.scan.clear();
        self.by_id.clear();
        self.latest.clear();
        self.sequence.clear();
        // Keep `len` consistent with the now-empty maps until the rebuild completes.
        self.len.store(0, Ordering::Relaxed);

        let restored = RestoreBase::from_sorted_entries(entries, chunk_count, routing_hint);

        // Staged locally and copied into `self.latest` after the scan rebuild.
        let mut latest =
            HashMap::<Arc<str>, Arc<IndexEntry>>::with_capacity(restored.routing.entity_runs.len());
        for run in &restored.routing.entity_runs {
            let start = usize::try_from(run.start)
                .expect("invariant: entity run index fits usize on any supported target");
            let len = usize::try_from(run.len)
                .expect("invariant: entity run length fits usize on any supported target");
            let end = start
                .checked_add(len)
                .expect("invariant: entity run start+len fits usize on supported targets");
            let slice = &restored.entries_by_entity[start..end];
            let (first, last) = match (slice.first(), slice.last()) {
                (Some(first), Some(last)) => (first, last),
                _ => continue,
            };
            let entity = first.coord.entity_arc();
            let stream: BTreeMap<ClockKey, Arc<IndexEntry>> = slice
                .iter()
                .map(|entry| (Self::clock_key_of(entry), Arc::clone(entry)))
                .collect();
            latest.insert(Arc::clone(&entity), Arc::clone(last));
            self.streams.insert(entity, stream);
        }

        self.scan.rebuild_from_restore_base(
            &restored.entries_by_sequence,
            &restored.entries_by_entity,
            &restored.routing,
        );
        // Insert in sequence order; a repeated event_id keeps the last occurrence.
        for entry in &restored.entries_by_sequence {
            self.by_id.insert(entry.event_id, Arc::clone(entry));
        }
        for (entity, entry) in latest {
            self.latest.insert(entity, entry);
        }
        self.len
            .store(restored.entries_by_sequence.len(), Ordering::Relaxed);

        before_publish(self);

        let next_sequence = restored
            .entries_by_sequence
            .last()
            .map_or(allocator_hint, |entry| entry.global_sequence.saturating_add(1))
            .max(allocator_hint);
        self.sequence.restore_allocator(next_sequence);
        self.publish(next_sequence, "restore_sorted_entries")
    }

    /// Restores from sorted entries with a single chunk and no routing hint.
    #[cfg(test)]
    pub(crate) fn restore_sorted_entries(
        &self,
        entries: Vec<IndexEntry>,
        allocator_hint: u64,
    ) -> Result<(), crate::store::StoreError> {
        self.restore_sorted_entries_impl(entries, allocator_hint, 1, None, |_| {})
    }

    /// Restores from sorted entries using a precomputed routing summary.
    ///
    /// The chunk count comes from `routing.chunk_count`, falling back to 1 if it
    /// does not fit `usize`, and is never less than 1.
    pub(crate) fn restore_sorted_entries_with_routing(
        &self,
        entries: Vec<IndexEntry>,
        allocator_hint: u64,
        routing: &RoutingSummary,
    ) -> Result<(), crate::store::StoreError> {
        let chunk_count = usize::try_from(routing.chunk_count).unwrap_or(1).max(1);
        self.restore_sorted_entries_impl(
            entries,
            allocator_hint,
            chunk_count,
            Some(routing),
            |_| {},
        )
    }

    /// Test hook: restore, invoking `before_publish` just before the watermark is published.
    #[cfg(test)]
    pub(crate) fn restore_sorted_entries_with_before_publish(
        &self,
        entries: Vec<IndexEntry>,
        allocator_hint: u64,
        before_publish: impl FnOnce(&Self),
    ) -> Result<(), crate::store::StoreError> {
        self.restore_sorted_entries_impl(entries, allocator_hint, 1, None, before_publish)
    }

    /// Clones every indexed entry. Order follows `by_id` iteration and is unspecified.
    pub(crate) fn all_entries(&self) -> Vec<IndexEntry> {
        self.by_id
            .iter()
            .map(|r| r.value().as_ref().clone())
            .collect()
    }

    /// Finds the entry carrying `global_sequence` and returns its wall clock with that sequence.
    ///
    /// This is a linear scan over every indexed entry (O(n)).
    pub(crate) fn hlc_for_global_sequence(
        &self,
        global_sequence: u64,
    ) -> Option<crate::store::stats::HlcPoint> {
        self.by_id
            .iter()
            .find(|entry| entry.value().global_sequence == global_sequence)
            .map(|entry| crate::store::stats::HlcPoint {
                wall_ms: entry.value().wall_ms,
                global_sequence,
            })
    }

    /// Highest allocation point reported by the sequence gate.
    pub(crate) fn global_sequence(&self) -> u64 {
        self.sequence.allocated()
    }

    /// Current visibility watermark reported by the sequence gate.
    pub(crate) fn visible_sequence(&self) -> u64 {
        self.sequence.visible()
    }

    /// Publishes visibility up to `up_to`.
    ///
    /// `operation` is passed through to the gate (presumably for diagnostics).
    pub(crate) fn publish(
        &self,
        up_to: u64,
        operation: &'static str,
    ) -> Result<(), crate::store::StoreError> {
        self.sequence.publish(up_to, operation)
    }

    /// Number of indexed entries (relaxed load).
    pub(crate) fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Replaces this index's contents with `fresh`, under the `swap_gate` write lock.
    ///
    /// The interner, streams, id lookup, latest pointers, and sequence state are
    /// taken from `fresh`. The scan index is rebuilt from the moved entries in
    /// `(global_sequence, event_id)` order.
    ///
    /// # Errors
    /// Propagates the error from publishing `fresh`'s visibility watermark.
    pub(crate) fn replace_contents_from_fresh(
        &self,
        fresh: StoreIndex,
    ) -> Result<(), crate::store::StoreError> {
        let _write = self.swap_gate.write();
        self.streams.clear();
        self.scan.clear();
        self.by_id.clear();
        self.latest.clear();
        self.sequence.clear();
        self.len.store(0, Ordering::Relaxed);

        // The full snapshot is `to_snapshot()` with an empty string prepended at slot 0.
        let mut interner_full = vec![String::new()];
        interner_full.extend(fresh.interner.to_snapshot());
        self.interner.replace_from_full_snapshot(&interner_full);

        for (entity, stream) in fresh.streams.into_iter() {
            self.streams.insert(entity, stream);
        }

        // Order entries deterministically by sequence. This mirrors the restore path,
        // which rebuilds the scan from `entries_by_sequence`. Otherwise the scan
        // would be fed in DashMap's unspecified hash-iteration order.
        let mut ordered: Vec<(u128, Arc<IndexEntry>)> = fresh.by_id.into_iter().collect();
        ordered.sort_unstable_by_key(|(id, entry)| (entry.global_sequence, *id));
        for (id, entry) in &ordered {
            self.by_id.insert(*id, Arc::clone(entry));
        }
        for (entity, latest) in fresh.latest.into_iter() {
            self.latest.insert(entity, latest);
        }
        for (_, entry) in &ordered {
            self.scan.insert(entry);
        }

        let fresh_allocated = fresh.sequence.allocated();
        let fresh_visible = fresh.sequence.visible();
        let fresh_cancelled = fresh.sequence.cancelled_ranges_snapshot();
        self.sequence.restore_allocator(fresh_allocated);
        self.sequence
            .publish(fresh_visible, "replace_contents_from_fresh")?;
        self.sequence.restore_cancelled_ranges(fresh_cancelled);
        self.len.store(self.by_id.len(), Ordering::Relaxed);
        Ok(())
    }

    /// Name of the scan index topology.
    pub(crate) fn topology_name(&self) -> &'static str {
        self.scan.topology_name()
    }

    /// Number of tiles in the scan index.
    pub(crate) fn tile_count(&self) -> usize {
        self.scan.tile_count()
    }

    /// Opens a visibility fence on the sequence gate and returns its token.
    pub(crate) fn begin_visibility_fence(&self) -> Result<u64, crate::store::StoreError> {
        self.sequence.begin_fence()
    }

    /// Token of the currently active visibility fence, if any.
    pub(crate) fn active_visibility_fence(&self) -> Option<u64> {
        self.sequence.active_fence_token()
    }

    /// Closes the fence identified by `token`, optionally publishing up to `publish_to`.
    pub(crate) fn finish_visibility_fence(
        &self,
        token: u64,
        publish_to: Option<u64>,
    ) -> Result<(), crate::store::StoreError> {
        self.sequence.finish_fence(token, publish_to)
    }

    /// Records progress `start..end` for the fence identified by `token`.
    pub(crate) fn note_visibility_fence_progress(
        &self,
        token: u64,
        start: u64,
        end: u64,
    ) -> Result<(), crate::store::StoreError> {
        self.sequence.note_fence_progress(token, start, end)
    }

    /// Cancels the fence identified by `token`.
    pub(crate) fn cancel_visibility_fence(
        &self,
        token: u64,
    ) -> Result<(), crate::store::StoreError> {
        self.sequence.cancel_fence(token)
    }

    /// Snapshot of the sequence ranges recorded as cancelled.
    pub(crate) fn cancelled_visibility_ranges(&self) -> Vec<(u64, u64)> {
        self.sequence.cancelled_ranges_snapshot()
    }

    /// Replaces the recorded cancelled ranges with `ranges`.
    pub(crate) fn restore_cancelled_visibility_ranges(&self, ranges: Vec<(u64, u64)>) {
        self.sequence.restore_cancelled_ranges(ranges);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coordinate::Region;
    use crate::event::EventKind;

    /// Builds a test entry whose ids, clock fields, disk offset and global sequence
    /// are all derived from `seq`. Intern ids are left as sentinels for the caller
    /// to overwrite.
    fn make_entry(seq: u64, entity: &str, scope: &str) -> IndexEntry {
        let coord = Coordinate::new(entity, scope).expect("coord");
        let id = u128::from(seq) + 1;
        IndexEntry {
            event_id: id,
            correlation_id: id,
            causation_id: None,
            entity_id: self::interner::InternId::sentinel(),
            scope_id: self::interner::InternId::sentinel(),
            coord,
            kind: EventKind::custom(0xF, 1),
            wall_ms: seq,
            clock: u32::try_from(seq).expect("small seq"),
            dag_lane: 0,
            dag_depth: 0,
            hash_chain: HashChain::default(),
            disk_pos: DiskPos {
                segment_id: 0,
                offset: seq * 16,
                length: 16,
            },
            global_sequence: seq,
        }
    }

    #[test]
    fn bulk_restore_keeps_entries_invisible_until_publish() {
        let index = StoreIndex::new();
        let entity_id = index.interner.intern("entity:bulk");
        let scope_id = index.interner.intern("scope:bulk");
        let entries: Vec<IndexEntry> = (0..3)
            .map(|seq| IndexEntry {
                entity_id,
                scope_id,
                ..make_entry(seq, "entity:bulk", "scope:bulk")
            })
            .collect();

        let outcome = index.restore_sorted_entries_with_before_publish(entries, 3, |staged| {
            assert_eq!(
                staged.visible_sequence(),
                0,
                "visibility watermark must not advance until every view is rebuilt"
            );
            assert!(
                staged.query(&Region::all()).is_empty(),
                "PROPERTY: reads must observe neither base maps nor overlays before publish"
            );
        });
        outcome.expect("bulk restore publish must succeed");

        assert_eq!(index.query(&Region::all()).len(), 3);
        assert_eq!(index.visible_sequence(), 3);
    }
}