pub(crate) mod columnar;
mod entry;
pub(crate) mod idemp;
pub(crate) mod interner;
mod projection_bridge;
mod query;
mod restore;
mod visibility;
use self::columnar::ScanIndex;
pub(crate) use self::entry::{ClockKey, QueryHit};
pub use self::entry::{DiskPos, IndexEntry};
use self::interner::{InternId, StringInterner};
pub(crate) use self::projection_bridge::{
projection_kind_matches, ProjectionCacheStoreStatus, ProjectionReplayItem, ProjectionReplayPlan,
};
use self::restore::RestoreBase;
pub(crate) use self::restore::{
recommended_restore_chunk_count, restore_chunk_ranges, RoutingSummary,
};
use self::visibility::SequenceGate;
use crate::store::config::IndexConfig;
use crate::store::hidden_ranges::CancelledVisibilityRanges;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Key of the `latest` map in `StoreIndex`: an interned entity id paired
/// with a DAG lane number.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
struct LaneHeadKey {
/// Interned id of the entity (callers pass `IndexEntry::entity_id`).
entity_id: InternId,
/// Lane number (callers pass `IndexEntry::dag_lane`).
lane: u32,
}
impl LaneHeadKey {
fn new(entity_id: InternId, lane: u32) -> Self {
Self { entity_id, lane }
}
}
/// Computes, for every DAG lane that occurs in `entries`, the largest
/// `global_sequence + 1` (saturating) found on that lane.
///
/// Lanes with no entries are absent from the returned map.
fn lane_visible_from_entries(entries: &[Arc<IndexEntry>]) -> BTreeMap<u32, u64> {
    entries
        .iter()
        .fold(BTreeMap::<u32, u64>::new(), |mut lanes, entry| {
            let candidate = entry.global_sequence.saturating_add(1);
            // First sighting of a lane seeds the slot; later ones keep the maximum.
            let slot = lanes.entry(entry.dag_lane).or_insert(candidate);
            *slot = (*slot).max(candidate);
            lanes
        })
}
/// In-memory index over store entries.
///
/// The insert paths register the same `Arc<IndexEntry>` in `streams`, `scan`,
/// `by_id` and `latest`, and bump `len` once per inserted entry.
pub(crate) struct StoreIndex {
/// Per-entity streams: entity name -> entries ordered by `ClockKey`
/// (`wall_ms`, `clock`, `uuid`).
streams: DashMap<Arc<str>, BTreeMap<ClockKey, Arc<IndexEntry>>>,
/// Secondary scan index; receives every inserted entry and is rebuilt on restore.
pub(crate) scan: ScanIndex,
/// Lookup from event id to entry.
by_id: DashMap<u128, Arc<IndexEntry>>,
/// Last entry written for each (entity, lane) pair; later inserts overwrite earlier ones.
latest: DashMap<LaneHeadKey, Arc<IndexEntry>>,
/// Sequence allocation / visibility state (see the `visibility` module).
pub(crate) sequence: SequenceGate,
/// Entry counter: incremented per insert, overwritten by restore/replace.
len: AtomicUsize,
/// String interner; `insert_inner` debug-asserts that entity and scope ids match it.
pub(crate) interner: Arc<StringInterner>,
/// Read-held by the insert paths and some readers, write-held by
/// `replace_contents_from_fresh` while the contents are swapped.
swap_gate: RwLock<()>,
/// Idempotency store, configured from `IndexConfig::idempotency_retention`
/// and `IndexConfig::idempotency_overflow`.
pub(crate) idemp: idemp::IdempotencyStore,
}
impl StoreIndex {
/// Test-only constructor: an empty index built from `IndexConfig::default()`.
#[cfg(test)]
pub(crate) fn new() -> Self {
Self::with_config(&IndexConfig::default())
}
/// Creates an empty index.
///
/// `config` selects the scan index layout (via `ScanIndex::for_config`) and
/// supplies the retention / overflow settings for the idempotency store.
pub(crate) fn with_config(config: &IndexConfig) -> Self {
    let retention = config.idempotency_retention;
    let overflow = config.idempotency_overflow;
    let scan = ScanIndex::for_config(config);
    Self {
        streams: DashMap::new(),
        scan,
        by_id: DashMap::new(),
        latest: DashMap::new(),
        sequence: SequenceGate::new(),
        len: AtomicUsize::new(0),
        interner: Arc::new(StringInterner::new()),
        swap_gate: RwLock::new(()),
        idemp: idemp::IdempotencyStore::new(retention, overflow),
    }
}
/// Forwards to `SequenceGate::reserve(n)` and returns its result.
// NOTE(review): presumably the start of the reserved range — confirm against `SequenceGate::reserve`.
pub(crate) fn reserve_sequences(&self, n: u64) -> u64 {
self.sequence.reserve(n)
}
/// Inserts a single entry into every lookup structure and then calls
/// `SequenceGate::advance`.
///
/// The swap gate's read lock is held for the whole operation.
pub(crate) fn insert(&self, entry: IndexEntry) {
// Held until the end of the function, covering both the insert and the advance.
let _read = self.swap_gate.read();
self.insert_inner(entry);
self.sequence.advance();
}
/// Registers `entry` in `streams`, `scan`, `by_id` and `latest`, then bumps `len`.
///
/// Takes no lock itself and does not touch the sequence gate. In debug builds
/// it asserts that the entry's `entity_id` / `scope_id` match what the
/// interner returns for the entry's entity and scope strings.
fn insert_inner(&self, entry: IndexEntry) {
    debug_assert!(self
        .interner
        .intern(entry.coord.entity())
        .is_ok_and(|id| id == entry.entity_id));
    debug_assert!(self
        .interner
        .intern(entry.coord.scope())
        .is_ok_and(|id| id == entry.scope_id));

    let shared = Arc::new(entry);
    let stream_key = ClockKey {
        wall_ms: shared.wall_ms,
        clock: shared.clock,
        uuid: shared.event_id,
    };
    let head_key = LaneHeadKey::new(shared.entity_id, shared.dag_lane);

    self.streams
        .entry(shared.coord.entity_arc())
        .or_default()
        .insert(stream_key, Arc::clone(&shared));
    self.scan.insert(&shared);
    self.by_id.insert(shared.event_id, Arc::clone(&shared));
    // The last handle is moved into `latest`, replacing any previous head of the lane.
    self.latest.insert(head_key, shared);
    self.len.fetch_add(1, Ordering::Relaxed);
}
/// Registers every entry of `entries`, in order, in `streams`, `scan`,
/// `by_id` and `latest`, bumping `len` once per entry.
///
/// The swap gate's read lock is held for the whole batch. Unlike `insert`,
/// this method does not call into the sequence gate.
pub(crate) fn insert_batch(&self, entries: Vec<IndexEntry>) {
    let _swap_guard = self.swap_gate.read();
    if entries.is_empty() {
        return;
    }
    for shared in entries.into_iter().map(Arc::new) {
        let stream_key = ClockKey {
            wall_ms: shared.wall_ms,
            clock: shared.clock,
            uuid: shared.event_id,
        };
        let head_key = LaneHeadKey::new(shared.entity_id, shared.dag_lane);

        self.streams
            .entry(shared.coord.entity_arc())
            .or_default()
            .insert(stream_key, Arc::clone(&shared));
        self.scan.insert(&shared);
        self.by_id.insert(shared.event_id, Arc::clone(&shared));
        self.latest.insert(head_key, shared);
        self.len.fetch_add(1, Ordering::Relaxed);
    }
}
/// Discards the current contents and rebuilds the index from `entries`.
///
/// * `entries`, `chunk_count` and `routing_hint` are handed to
///   `RestoreBase::from_sorted_entries`, which produces the by-sequence and
///   by-entity views plus the routing summary used below.
/// * `allocator_hint` is a lower bound for the restored allocator position;
///   the position used is the maximum of the hint and
///   `last.global_sequence + 1` (saturating) of the by-sequence view.
/// * `before_publish` runs after all maps and `len` are populated but before
///   the sequence gate is restored and published.
///
/// # Errors
///
/// Returns an error when a routing entity-run cannot be converted to a
/// `usize` range or lies outside the restored entries, when the scan index
/// rebuild fails, or when publishing fails. The index has already been
/// cleared at that point; `len()` then reports `0` rather than the
/// pre-restore count.
fn restore_sorted_entries_impl(
    &self,
    entries: Vec<IndexEntry>,
    allocator_hint: u64,
    chunk_count: usize,
    routing_hint: Option<&RoutingSummary>,
    before_publish: impl FnOnce(&Self),
) -> Result<(), crate::store::StoreError> {
    self.streams.clear();
    self.scan.clear();
    self.by_id.clear();
    self.latest.clear();
    self.sequence.clear();
    // Reset together with the maps (as `replace_contents_from_fresh` does) so an
    // early error return below cannot leave `len()` at the stale pre-restore count.
    self.len.store(0, Ordering::Relaxed);

    let restored = RestoreBase::from_sorted_entries(entries, chunk_count, routing_hint);

    // Lane heads are staged locally and only written to `self.latest` once the
    // scan index rebuild has succeeded.
    let mut latest = HashMap::<LaneHeadKey, Arc<IndexEntry>>::new();
    for run in &restored.routing.entity_runs {
        let range = run.usize_range()?;
        let slice = restored.entries_by_entity.get(range).ok_or_else(|| {
            crate::store::StoreError::corrupt_segment_with_detail(
                0,
                "routing entity-run range out of bounds for restored entries",
            )
        })?;
        let Some(first) = slice.first() else {
            continue;
        };
        let entity = first.coord.entity_arc();
        let stream: BTreeMap<ClockKey, Arc<IndexEntry>> = slice
            .iter()
            .map(|entry| {
                (
                    ClockKey {
                        wall_ms: entry.wall_ms,
                        clock: entry.clock,
                        uuid: entry.event_id,
                    },
                    Arc::clone(entry),
                )
            })
            .collect();
        // Later entries of the run overwrite earlier ones, so the last entry
        // seen for each (entity, lane) becomes the lane head.
        for entry in slice {
            latest.insert(
                LaneHeadKey::new(entry.entity_id, entry.dag_lane),
                Arc::clone(entry),
            );
        }
        self.streams.insert(entity, stream);
    }

    self.scan.rebuild_from_restore_base(
        &restored.entries_by_sequence,
        &restored.entries_by_entity,
        &restored.routing,
    )?;

    for entry in &restored.entries_by_sequence {
        self.by_id.insert(entry.event_id, Arc::clone(entry));
    }
    for (key, entry) in latest {
        self.latest.insert(key, entry);
    }
    self.len
        .store(restored.entries_by_sequence.len(), Ordering::Relaxed);

    before_publish(self);

    let lane_visible = lane_visible_from_entries(&restored.entries_by_sequence);
    let next_sequence = restored
        .entries_by_sequence
        .last()
        .map_or(allocator_hint, |entry| {
            entry.global_sequence.saturating_add(1).max(allocator_hint)
        });
    // Order matters here: allocator first, then the global publish, then the
    // per-lane visibility derived from the restored entries.
    self.sequence.restore_allocator(next_sequence);
    self.publish(next_sequence, "restore_sorted_entries")?;
    self.sequence.restore_lane_visible(lane_visible);
    Ok(())
}
/// Test-only restore: calls `restore_sorted_entries_impl` with a chunk count
/// of 1, no routing hint and a no-op `before_publish` hook.
#[cfg(test)]
pub(crate) fn restore_sorted_entries(
&self,
entries: Vec<IndexEntry>,
allocator_hint: u64,
) -> Result<(), crate::store::StoreError> {
self.restore_sorted_entries_impl(entries, allocator_hint, 1, None, |_| {})
}
/// Restores the index from `entries`, passing `routing` as the routing hint.
///
/// The chunk count is taken from `routing.chunk_count`; a value that does not
/// fit in `usize`, or a value of zero, is replaced by 1.
pub(crate) fn restore_sorted_entries_with_routing(
    &self,
    entries: Vec<IndexEntry>,
    allocator_hint: u64,
    routing: &RoutingSummary,
) -> Result<(), crate::store::StoreError> {
    let chunk_count = match usize::try_from(routing.chunk_count) {
        Ok(count) => count.max(1),
        Err(_) => 1,
    };
    let routing_hint = Some(routing);
    self.restore_sorted_entries_impl(entries, allocator_hint, chunk_count, routing_hint, |_| {})
}
/// Test-only restore that forwards a caller-supplied `before_publish` hook to
/// `restore_sorted_entries_impl` (chunk count 1, no routing hint).
#[cfg(test)]
pub(crate) fn restore_sorted_entries_with_before_publish(
&self,
entries: Vec<IndexEntry>,
allocator_hint: u64,
before_publish: impl FnOnce(&Self),
) -> Result<(), crate::store::StoreError> {
self.restore_sorted_entries_impl(entries, allocator_hint, 1, None, before_publish)
}
/// Returns a clone of every entry in `by_id`, regardless of visibility.
///
/// The order follows the map's iteration order.
pub(crate) fn all_entries(&self) -> Vec<IndexEntry> {
    self.by_id
        .iter()
        .map(|slot| (**slot.value()).clone())
        .collect()
}
/// Returns a clone of every entry in `by_id` that the current visibility
/// snapshot reports as visible on its lane.
///
/// The swap gate's read lock is held while the snapshot is taken and the map
/// is walked.
pub(crate) fn visible_entries(&self) -> Vec<IndexEntry> {
    let _swap_guard = self.swap_gate.read();
    let snapshot = self.sequence.snapshot();
    let mut visible = Vec::new();
    for slot in self.by_id.iter() {
        let entry = slot.value();
        if snapshot.is_visible_on_lane(entry.global_sequence, entry.dag_lane) {
            visible.push((**entry).clone());
        }
    }
    visible
}
/// Looks up an entry whose `global_sequence` equals the argument and returns
/// its `wall_ms` paired with that sequence, or `None` when no entry matches.
///
/// This walks `by_id` linearly; the first match in iteration order wins.
pub(crate) fn hlc_for_global_sequence(
    &self,
    global_sequence: u64,
) -> Option<crate::store::stats::HlcPoint> {
    self.by_id.iter().find_map(|slot| {
        let entry = slot.value();
        (entry.global_sequence == global_sequence).then(|| crate::store::stats::HlcPoint {
            wall_ms: entry.wall_ms,
            global_sequence,
        })
    })
}
/// Returns `SequenceGate::allocated()` for this index.
pub(crate) fn global_sequence(&self) -> u64 {
self.sequence.allocated()
}
/// Returns `SequenceGate::visible()` for this index.
pub(crate) fn visible_sequence(&self) -> u64 {
self.sequence.visible()
}
/// Forwards to `SequenceGate::publish(up_to, operation)`.
///
/// `operation` is a static label passed through unchanged.
// NOTE(review): the label looks like it is used for error reporting — confirm in `SequenceGate::publish`.
pub(crate) fn publish(
&self,
up_to: u64,
operation: &'static str,
) -> Result<(), crate::store::StoreError> {
self.sequence.publish(up_to, operation)
}
/// Forwards to `SequenceGate::publish_on_lanes`.
///
/// `lanes` yields `(u32, u64)` pairs; presumably (lane, visible-up-to) —
/// verify against `SequenceGate::publish_on_lanes`.
pub(crate) fn publish_on_lanes(
&self,
global_up_to: u64,
lanes: impl IntoIterator<Item = (u32, u64)>,
operation: &'static str,
) -> Result<(), crate::store::StoreError> {
self.sequence
.publish_on_lanes(global_up_to, lanes, operation)
}
/// Returns the current value of the entry counter (relaxed atomic load).
pub(crate) fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
/// Calls `IdempotencyStore::mark_evicted` with a predicate that reports
/// whether an event id is currently present in `by_id`.
///
/// The swap gate's read lock is held for the duration of the call.
pub(crate) fn mark_idemp_evicted_against_live(&self) {
let _read = self.swap_gate.read();
self.idemp
.mark_evicted(|event_id| self.by_id.contains_key(&event_id));
}
/// Replaces the contents of this index with those of `fresh`.
///
/// Holds the swap gate's write lock throughout. The streams, id map and lane
/// heads are moved over from `fresh`; the scan index is refilled from the new
/// `by_id`; the interner is replaced from `fresh`'s snapshot; and the sequence
/// gate is restored from `fresh`'s allocator, visible position, lane
/// visibility and cancelled ranges. The idempotency store is not touched.
///
/// # Errors
///
/// Returns an error when the interner snapshot cannot be applied or when
/// publishing the visible position fails. The previous contents have already
/// been cleared at that point.
pub(crate) fn replace_contents_from_fresh(
    &self,
    fresh: StoreIndex,
) -> Result<(), crate::store::StoreError> {
    let _swap_guard = self.swap_gate.write();

    self.streams.clear();
    self.scan.clear();
    self.by_id.clear();
    self.latest.clear();
    self.sequence.clear();
    self.len.store(0, Ordering::Relaxed);

    // The full snapshot gets an empty string prepended to `to_snapshot()`.
    // NOTE(review): presumably slot 0 is reserved — confirm in `StringInterner`.
    let interner_full: Vec<String> = std::iter::once(String::new())
        .chain(fresh.interner.to_snapshot())
        .collect();
    self.interner.replace_from_full_snapshot(&interner_full)?;

    for (entity, stream) in fresh.streams {
        self.streams.insert(entity, stream);
    }
    for (event_id, entry) in fresh.by_id {
        self.by_id.insert(event_id, entry);
    }
    for (head_key, head_entry) in fresh.latest {
        self.latest.insert(head_key, head_entry);
    }
    for slot in self.by_id.iter() {
        self.scan.insert(slot.value());
    }

    let allocated = fresh.sequence.allocated();
    let visible = fresh.sequence.visible();
    let cancelled = fresh.sequence.cancelled_ranges_snapshot();
    let lane_visible = fresh.sequence.lane_visible_snapshot();
    self.sequence.restore_allocator(allocated);
    self.sequence
        .publish(visible, "replace_contents_from_fresh")?;
    self.sequence.restore_lane_visible(lane_visible);
    self.sequence.restore_cancelled_ranges(cancelled);

    self.len.store(self.by_id.len(), Ordering::Relaxed);
    Ok(())
}
/// Returns the scan index's topology name (`ScanIndex::topology_name`).
pub(crate) fn topology_name(&self) -> &'static str {
self.scan.topology_name()
}
/// Returns the scan index's tile count (`ScanIndex::tile_count`).
pub(crate) fn tile_count(&self) -> usize {
self.scan.tile_count()
}
/// Forwards to `SequenceGate::begin_fence` and returns its result.
// NOTE(review): the `u64` appears to be the fence token accepted by the
// other `*_visibility_fence*` methods — confirm in `SequenceGate`.
pub(crate) fn begin_visibility_fence(&self) -> Result<u64, crate::store::StoreError> {
self.sequence.begin_fence()
}
/// Returns `SequenceGate::active_fence_token()`: the active fence token, if any.
pub(crate) fn active_visibility_fence(&self) -> Option<u64> {
self.sequence.active_fence_token()
}
/// Forwards to `SequenceGate::finish_fence_on_lanes` with the given token,
/// optional publish position and per-lane `(u32, u64)` pairs.
pub(crate) fn finish_visibility_fence_on_lanes(
&self,
token: u64,
publish_to: Option<u64>,
lanes: impl IntoIterator<Item = (u32, u64)>,
) -> Result<(), crate::store::StoreError> {
self.sequence
.finish_fence_on_lanes(token, publish_to, lanes)
}
/// Forwards to `SequenceGate::note_fence_progress(token, start, end)`.
// NOTE(review): `start`/`end` presumably bound a sequence range written under
// the fence — confirm the inclusivity convention in `SequenceGate`.
pub(crate) fn note_visibility_fence_progress(
&self,
token: u64,
start: u64,
end: u64,
) -> Result<(), crate::store::StoreError> {
self.sequence.note_fence_progress(token, start, end)
}
/// Cancels the visibility fence identified by `token`.
///
/// When the sequence gate reports an active fence range `[start, end)`, every
/// entry in `by_id` whose `global_sequence` falls in that range contributes a
/// one-element range `(seq, seq + 1)` (saturating) to its lane's list. The
/// per-lane lists are then handed to `SequenceGate::cancel_fence`.
pub(crate) fn cancel_visibility_fence(
    &self,
    token: u64,
) -> Result<(), crate::store::StoreError> {
    let mut lane_ranges: BTreeMap<u32, Vec<(u64, u64)>> = BTreeMap::new();
    if let Some((start, end)) = self.sequence.active_fence_range() {
        let fenced = start..end;
        for slot in self.by_id.iter() {
            let entry = slot.value();
            let seq = entry.global_sequence;
            if !fenced.contains(&seq) {
                continue;
            }
            lane_ranges
                .entry(entry.dag_lane)
                .or_default()
                .push((seq, seq.saturating_add(1)));
        }
    }
    self.sequence.cancel_fence(token, lane_ranges)
}
/// Returns the sequence gate's snapshot of cancelled visibility ranges.
pub(crate) fn cancelled_visibility_ranges(&self) -> CancelledVisibilityRanges {
self.sequence.cancelled_ranges_snapshot()
}
/// Hands `ranges` to `SequenceGate::restore_cancelled_ranges`.
pub(crate) fn restore_cancelled_visibility_ranges(&self, ranges: CancelledVisibilityRanges) {
self.sequence.restore_cancelled_ranges(ranges);
}
}
#[cfg(test)]
mod tests;