pub(crate) mod columnar;
mod entry;
pub(crate) mod interner;
mod projection_bridge;
mod query;
mod restore;
mod visibility;
use self::columnar::ScanIndex;
pub(crate) use self::entry::{ClockKey, QueryHit};
pub use self::entry::{DiskPos, IndexEntry};
use self::interner::StringInterner;
pub(crate) use self::projection_bridge::{ProjectionCacheStoreStatus, ProjectionReplayPlan};
use self::restore::RestoreBase;
pub(crate) use self::restore::{
recommended_restore_chunk_count, restore_chunk_ranges, RoutingSummary,
};
use self::visibility::SequenceGate;
use crate::store::config::IndexConfig;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// In-memory index over stored events.
///
/// The same `Arc<IndexEntry>` values are shared between several views
/// (`streams`, `scan`, `by_id`, `latest`); every insert path in this file
/// updates all of them together.
pub(crate) struct StoreIndex {
/// Per-entity streams: entity name -> entries keyed (and therefore ordered) by `ClockKey`.
streams: DashMap<Arc<str>, BTreeMap<ClockKey, Arc<IndexEntry>>>,
/// Scan-oriented view, constructed from the `IndexConfig` (see the `columnar` module).
pub(crate) scan: ScanIndex,
/// Lookup from `event_id` to its entry.
by_id: DashMap<u128, Arc<IndexEntry>>,
/// Entry most recently written for each entity (the last insert for an entity wins).
latest: DashMap<Arc<str>, Arc<IndexEntry>>,
/// Sequence allocation / visibility state (see the `visibility` module).
pub(crate) sequence: SequenceGate,
/// Entry counter maintained alongside the maps; read and written with `Ordering::Relaxed`.
len: AtomicUsize,
/// String interner; `insert_inner` debug-asserts entry ids against it.
pub(crate) interner: Arc<StringInterner>,
/// Taken for reading by `insert` / `insert_batch` and for writing by
/// `replace_contents_from_fresh`, so a wholesale swap excludes those inserts.
swap_gate: RwLock<()>,
}
impl StoreIndex {
/// Test-only constructor: an empty index built from `IndexConfig::default()`.
#[cfg(test)]
pub(crate) fn new() -> Self {
    let defaults = IndexConfig::default();
    Self::with_config(&defaults)
}
/// Builds an empty index.
///
/// Only the scan view is derived from `config`; every other field starts
/// out empty (maps), zeroed (`len`) or freshly constructed (gate, interner).
pub(crate) fn with_config(config: &IndexConfig) -> Self {
    let scan = ScanIndex::for_config(config);
    let interner = Arc::new(StringInterner::new());
    Self {
        streams: DashMap::new(),
        scan,
        by_id: DashMap::new(),
        latest: DashMap::new(),
        sequence: SequenceGate::new(),
        len: AtomicUsize::new(0),
        interner,
        swap_gate: RwLock::new(()),
    }
}
/// Reserves `n` sequence numbers by delegating to `SequenceGate::reserve`
/// and returns that call's result unchanged.
///
/// NOTE(review): presumably the returned value is the first reserved
/// sequence — confirm against `SequenceGate::reserve`.
pub(crate) fn reserve_sequences(&self, n: u64) -> u64 {
self.sequence.reserve(n)
}
/// Indexes a single entry and then advances the sequence gate.
///
/// The `swap_gate` read guard is held for the whole call, so the entry is
/// never written while `replace_contents_from_fresh` holds the write side.
pub(crate) fn insert(&self, entry: IndexEntry) {
    let _swap_guard = self.swap_gate.read();
    self.insert_inner(entry);
    self.sequence.advance();
}
/// Writes `entry` into every view (`streams`, `scan`, `by_id`, `latest`)
/// and bumps `len`. Does not take `swap_gate` and does not touch the
/// sequence gate; the caller is responsible for both.
///
/// In debug builds the entry's `entity_id` / `scope_id` are checked against
/// what the interner yields for the coordinate's entity and scope.
fn insert_inner(&self, entry: IndexEntry) {
    let entity = entry.coord.entity_arc();
    debug_assert_eq!(entry.entity_id, self.interner.intern(entry.coord.entity()));
    debug_assert_eq!(entry.scope_id, self.interner.intern(entry.coord.scope()));

    // One shared allocation; every view below holds a handle to it.
    let shared = Arc::new(entry);
    let key = ClockKey {
        wall_ms: shared.wall_ms,
        clock: shared.clock,
        uuid: shared.event_id,
    };

    {
        // Scoped so the map guard is released before the other views are touched.
        let mut stream = self.streams.entry(Arc::clone(&entity)).or_default();
        stream.insert(key, Arc::clone(&shared));
    }
    self.scan.insert(&shared);
    self.by_id.insert(shared.event_id, Arc::clone(&shared));
    // Unconditional overwrite: the most recently inserted entry becomes `latest`.
    self.latest.insert(entity, shared);
    self.len.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn insert_batch(&self, entries: Vec<IndexEntry>) {
let _read = self.swap_gate.read();
if entries.is_empty() {
return;
}
let arc_entries: Vec<Arc<IndexEntry>> = entries.into_iter().map(Arc::new).collect();
for arc_entry in &arc_entries {
let entity = arc_entry.coord.entity_arc();
let key = ClockKey {
wall_ms: arc_entry.wall_ms,
clock: arc_entry.clock,
uuid: arc_entry.event_id,
};
self.streams
.entry(Arc::clone(&entity))
.or_default()
.insert(key, Arc::clone(arc_entry));
self.scan.insert(arc_entry);
self.by_id.insert(arc_entry.event_id, Arc::clone(arc_entry));
self.latest.insert(entity, Arc::clone(arc_entry));
self.len.fetch_add(1, Ordering::Relaxed);
}
}
/// Rebuilds every index view from `entries` and then publishes the result.
///
/// Order of operations:
/// 1. clear `streams`, `scan`, `by_id`, `latest` and the sequence gate;
/// 2. build a `RestoreBase` from the entries (`chunk_count` / `routing_hint`
///    are forwarded to it unchanged);
/// 3. create one `streams` map per non-empty entity run and remember the
///    run's last entry as that entity's `latest`;
/// 4. rebuild `scan`, then fill `by_id` and `latest`, then set `len`;
/// 5. call `before_publish(self)`;
/// 6. restore the allocator to `max(last global_sequence + 1, allocator_hint)`
///    (just `allocator_hint` when there are no entries) and publish up to it.
///
/// # Errors
/// Returns whatever `publish` returns for the `"restore_sorted_entries"` operation.
///
/// # Panics
/// Panics if an entity run's `start`, `len` or `start + len` does not fit in
/// `usize`, or if a run points outside `entries_by_entity`.
// The `expect`s below only guard integer conversions / index arithmetic that
// their messages document as invariants.
#[allow(clippy::expect_used)]
fn restore_sorted_entries_impl(
    &self,
    entries: Vec<IndexEntry>,
    allocator_hint: u64,
    chunk_count: usize,
    routing_hint: Option<&RoutingSummary>,
    before_publish: impl FnOnce(&Self),
) -> Result<(), crate::store::StoreError> {
    // Start from a blank slate.
    self.streams.clear();
    self.scan.clear();
    self.by_id.clear();
    self.latest.clear();
    self.sequence.clear();

    let restored = RestoreBase::from_sorted_entries(entries, chunk_count, routing_hint);
    let total = restored.entries_by_sequence.len();

    let key_of = |entry: &Arc<IndexEntry>| ClockKey {
        wall_ms: entry.wall_ms,
        clock: entry.clock,
        uuid: entry.event_id,
    };

    // Per-entity streams, plus a staged "latest" entry for each entity.
    let mut staged_latest =
        HashMap::<Arc<str>, Arc<IndexEntry>>::with_capacity(restored.routing.entity_runs.len());
    for run in &restored.routing.entity_runs {
        let start = usize::try_from(run.start)
            .expect("invariant: entity run index fits usize on any supported target");
        let len = usize::try_from(run.len)
            .expect("invariant: entity run length fits usize on any supported target");
        let end = start
            .checked_add(len)
            .expect("invariant: entity run start+len fits usize on supported targets");
        let slice = &restored.entries_by_entity[start..end];
        // Empty runs contribute nothing.
        let (first, last) = match (slice.first(), slice.last()) {
            (Some(first), Some(last)) => (first, last),
            _ => continue,
        };
        let entity = first.coord.entity_arc();
        let stream: BTreeMap<ClockKey, Arc<IndexEntry>> = slice
            .iter()
            .map(|entry| (key_of(entry), Arc::clone(entry)))
            .collect();
        staged_latest.insert(Arc::clone(&entity), Arc::clone(last));
        self.streams.insert(entity, stream);
    }

    self.scan.rebuild_from_restore_base(
        &restored.entries_by_sequence,
        &restored.entries_by_entity,
        &restored.routing,
    );

    // Stage the id lookup first, then move it into the shared map.
    let mut staged_by_id = HashMap::<u128, Arc<IndexEntry>>::with_capacity(total);
    for entry in &restored.entries_by_sequence {
        staged_by_id.insert(entry.event_id, Arc::clone(entry));
    }
    for (event_id, entry) in staged_by_id {
        self.by_id.insert(event_id, entry);
    }
    for (entity, entry) in staged_latest {
        self.latest.insert(entity, entry);
    }
    self.len.store(total, Ordering::Relaxed);

    // Every view is rebuilt at this point, but nothing has been published yet.
    before_publish(self);

    let next_sequence = restored
        .entries_by_sequence
        .last()
        .map_or(allocator_hint, |entry| {
            entry.global_sequence.saturating_add(1).max(allocator_hint)
        });
    self.sequence.restore_allocator(next_sequence);
    self.publish(next_sequence, "restore_sorted_entries")
}
/// Test-only restore entry point: forwards to `restore_sorted_entries_impl`
/// with a chunk count of 1, no routing hint and a no-op pre-publish hook.
#[cfg(test)]
pub(crate) fn restore_sorted_entries(
&self,
entries: Vec<IndexEntry>,
allocator_hint: u64,
) -> Result<(), crate::store::StoreError> {
self.restore_sorted_entries_impl(entries, allocator_hint, 1, None, |_| {})
}
/// Restores the index from `entries`, using `routing` as the routing hint.
///
/// The chunk count is taken from `routing.chunk_count`; it falls back to 1
/// when the value does not convert to `usize` and is clamped to at least 1.
/// No pre-publish hook is run.
///
/// # Errors
/// Propagates the result of `restore_sorted_entries_impl`.
pub(crate) fn restore_sorted_entries_with_routing(
    &self,
    entries: Vec<IndexEntry>,
    allocator_hint: u64,
    routing: &RoutingSummary,
) -> Result<(), crate::store::StoreError> {
    let chunk_count = usize::try_from(routing.chunk_count).map_or(1, |count| count.max(1));
    let routing_hint = Some(routing);
    self.restore_sorted_entries_impl(entries, allocator_hint, chunk_count, routing_hint, |_| {})
}
/// Test-only restore entry point that lets the caller observe the index
/// after all views are rebuilt but before the restore is published.
///
/// Forwards to `restore_sorted_entries_impl` with a chunk count of 1 and no
/// routing hint; `before_publish` is passed through unchanged.
#[cfg(test)]
pub(crate) fn restore_sorted_entries_with_before_publish(
&self,
entries: Vec<IndexEntry>,
allocator_hint: u64,
before_publish: impl FnOnce(&Self),
) -> Result<(), crate::store::StoreError> {
self.restore_sorted_entries_impl(entries, allocator_hint, 1, None, before_publish)
}
pub(crate) fn all_entries(&self) -> Vec<IndexEntry> {
self.by_id
.iter()
.map(|r| r.value().as_ref().clone())
.collect()
}
/// Looks up the entry with the given `global_sequence` and returns its
/// wall-clock time paired with that sequence.
///
/// This is a linear scan over `by_id` (the map is keyed by event id, not by
/// sequence). Returns `None` when no entry carries `global_sequence`.
pub(crate) fn hlc_for_global_sequence(
    &self,
    global_sequence: u64,
) -> Option<crate::store::stats::HlcPoint> {
    self.by_id.iter().find_map(|slot| {
        let entry = slot.value();
        if entry.global_sequence == global_sequence {
            Some(crate::store::stats::HlcPoint {
                wall_ms: entry.wall_ms,
                global_sequence,
            })
        } else {
            None
        }
    })
}
/// Returns the sequence gate's `allocated()` value.
pub(crate) fn global_sequence(&self) -> u64 {
self.sequence.allocated()
}
/// Returns the sequence gate's `visible()` value (the published watermark
/// that the tests in this file compare against after `publish`).
pub(crate) fn visible_sequence(&self) -> u64 {
self.sequence.visible()
}
/// Publishes visibility up to `up_to` by delegating to `SequenceGate::publish`.
///
/// `operation` is a static label forwarded to the gate.
/// NOTE(review): presumably used for diagnostics/error reporting — confirm.
///
/// # Errors
/// Returns whatever error `SequenceGate::publish` reports.
pub(crate) fn publish(
&self,
up_to: u64,
operation: &'static str,
) -> Result<(), crate::store::StoreError> {
self.sequence.publish(up_to, operation)
}
/// Returns the current entry counter (a `Relaxed` load of `len`).
pub(crate) fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
/// Replaces the whole content of this index with the content of `fresh`.
///
/// Holds the `swap_gate` write guard for the entire call, which excludes
/// `insert` / `insert_batch`. The existing views and sequence state are
/// cleared, the interner is replaced from `fresh`'s snapshot, the maps are
/// moved over from `fresh`, `scan` is refilled from `by_id`, and finally the
/// allocator, visible watermark and cancelled ranges are copied from `fresh`.
///
/// # Errors
/// Returns the error from `SequenceGate::publish`. In that case the maps
/// have already been replaced, but cancelled ranges and `len` have not been
/// restored.
// NOTE(review): no `expect` call appears in this body, so this allow looks
// stale — confirm before removing.
#[allow(clippy::expect_used)]
pub(crate) fn replace_contents_from_fresh(
    &self,
    fresh: StoreIndex,
) -> Result<(), crate::store::StoreError> {
    let _exclusive = self.swap_gate.write();

    self.streams.clear();
    self.scan.clear();
    self.by_id.clear();
    self.latest.clear();
    self.sequence.clear();
    self.len.store(0, Ordering::Relaxed);

    // The "full" snapshot is `fresh`'s snapshot with one empty string in front.
    // NOTE(review): presumably slot 0 stands for the sentinel id — confirm
    // against `StringInterner::replace_from_full_snapshot`.
    let interner_full: Vec<String> = std::iter::once(String::new())
        .chain(fresh.interner.to_snapshot())
        .collect();
    self.interner.replace_from_full_snapshot(&interner_full);

    for (entity, stream) in fresh.streams {
        self.streams.insert(entity, stream);
    }
    for (id, entry) in fresh.by_id {
        self.by_id.insert(id, entry);
    }
    for (entity, newest) in fresh.latest {
        self.latest.insert(entity, newest);
    }
    // `by_id` iteration order is unspecified, so `scan` receives the entries
    // in arbitrary order.
    // NOTE(review): confirm `ScanIndex::insert` is order-insensitive.
    for slot in self.by_id.iter() {
        self.scan.insert(slot.value());
    }

    let allocated = fresh.sequence.allocated();
    let visible = fresh.sequence.visible();
    let cancelled = fresh.sequence.cancelled_ranges_snapshot();
    self.sequence.restore_allocator(allocated);
    // NOTE(review): cancelled ranges are restored only after this publish;
    // whether a reader can observe the gap depends on whether queries take
    // `swap_gate` — confirm.
    self.sequence
        .publish(visible, "replace_contents_from_fresh")?;
    self.sequence.restore_cancelled_ranges(cancelled);

    self.len.store(self.by_id.len(), Ordering::Relaxed);
    Ok(())
}
/// Returns the scan index's topology name (delegates to `ScanIndex::topology_name`).
pub(crate) fn topology_name(&self) -> &'static str {
self.scan.topology_name()
}
/// Returns the scan index's tile count (delegates to `ScanIndex::tile_count`).
pub(crate) fn tile_count(&self) -> usize {
self.scan.tile_count()
}
/// Begins a visibility fence via `SequenceGate::begin_fence`.
///
/// On success the returned `u64` is the value the other `*_visibility_fence`
/// methods accept as `token`.
///
/// # Errors
/// Returns whatever error `SequenceGate::begin_fence` reports.
pub(crate) fn begin_visibility_fence(&self) -> Result<u64, crate::store::StoreError> {
self.sequence.begin_fence()
}
/// Returns the sequence gate's active fence token, or `None` when the gate
/// reports none (delegates to `SequenceGate::active_fence_token`).
pub(crate) fn active_visibility_fence(&self) -> Option<u64> {
self.sequence.active_fence_token()
}
/// Finishes the fence identified by `token` via `SequenceGate::finish_fence`.
///
/// `publish_to` is forwarded unchanged.
/// NOTE(review): presumably `Some(seq)` also publishes up to `seq` and `None`
/// finishes without publishing — confirm against `SequenceGate::finish_fence`.
///
/// # Errors
/// Returns whatever error `SequenceGate::finish_fence` reports.
pub(crate) fn finish_visibility_fence(
&self,
token: u64,
publish_to: Option<u64>,
) -> Result<(), crate::store::StoreError> {
self.sequence.finish_fence(token, publish_to)
}
/// Records progress for the fence identified by `token` via
/// `SequenceGate::note_fence_progress`.
///
/// `start` and `end` are forwarded unchanged.
/// NOTE(review): whether the range is inclusive or half-open is decided by
/// `SequenceGate` — confirm there.
///
/// # Errors
/// Returns whatever error `SequenceGate::note_fence_progress` reports.
pub(crate) fn note_visibility_fence_progress(
&self,
token: u64,
start: u64,
end: u64,
) -> Result<(), crate::store::StoreError> {
self.sequence.note_fence_progress(token, start, end)
}
/// Cancels the fence identified by `token` via `SequenceGate::cancel_fence`.
///
/// # Errors
/// Returns whatever error `SequenceGate::cancel_fence` reports.
pub(crate) fn cancel_visibility_fence(
&self,
token: u64,
) -> Result<(), crate::store::StoreError> {
self.sequence.cancel_fence(token)
}
/// Returns a snapshot of the sequence gate's cancelled ranges as
/// `(u64, u64)` pairs (delegates to `SequenceGate::cancelled_ranges_snapshot`).
pub(crate) fn cancelled_visibility_ranges(&self) -> Vec<(u64, u64)> {
self.sequence.cancelled_ranges_snapshot()
}
/// Hands `ranges` to `SequenceGate::restore_cancelled_ranges`.
///
/// The test in this file restores `(1, 2)` and then expects the entry with
/// global sequence 1 to be hidden while 0 and 2 stay visible.
pub(crate) fn restore_cancelled_visibility_ranges(&self, ranges: Vec<(u64, u64)>) {
self.sequence.restore_cancelled_ranges(ranges);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coordinate::{Coordinate, Region};
use crate::event::{EventKind, HashChain};
/// Test fixture: builds an `IndexEntry` whose fields are all derived from `seq`.
///
/// `event_id` and `correlation_id` are `seq + 1`, `wall_ms` / `clock` /
/// `global_sequence` equal `seq`, and the disk position is a 16-byte record
/// at offset `seq * 16` in segment 0. Interner ids are left at the sentinel;
/// callers overwrite them when they matter.
fn make_entry(seq: u64, entity: &str, scope: &str) -> IndexEntry {
    let coord = Coordinate::new(entity, scope).expect("coord");
    let id = u128::from(seq) + 1;
    IndexEntry {
        event_id: id,
        correlation_id: id,
        causation_id: None,
        entity_id: self::interner::InternId::sentinel(),
        scope_id: self::interner::InternId::sentinel(),
        coord,
        kind: EventKind::custom(0xF, 1),
        wall_ms: seq,
        clock: u32::try_from(seq).expect("small seq"),
        dag_lane: 0,
        dag_depth: 0,
        hash_chain: HashChain::default(),
        disk_pos: DiskPos {
            segment_id: 0,
            offset: seq * 16,
            length: 16,
        },
        global_sequence: seq,
        receipt_extensions: BTreeMap::new(),
    }
}
/// Sorting `ClockKey`s must compare `wall_ms`, then `clock`, then `uuid`.
#[test]
fn clock_key_orders_by_wall_then_clock_then_uuid() {
    let key = |wall_ms: u64, clock: u32, uuid: u128| ClockKey {
        wall_ms,
        clock,
        uuid,
    };

    // Deliberately shuffled input covering a tie on `wall_ms` and on `clock`.
    let mut keys = [key(10, 3, 9), key(9, 99, 1), key(10, 2, 99), key(10, 3, 4)];
    keys.sort();

    let expected = [key(9, 99, 1), key(10, 2, 99), key(10, 3, 4), key(10, 3, 9)];
    assert_eq!(
        keys,
        expected,
        "PROPERTY: ClockKey ordering must be wall_ms first, then clock, then uuid as the deterministic tiebreaker"
    );
}
/// A bulk restore must not expose any entry (watermark or query results)
/// until the restore has published; afterwards all three entries are visible.
#[test]
fn bulk_restore_keeps_entries_invisible_until_publish() {
    let index = StoreIndex::new();
    let entity_id = index.interner.intern("entity:bulk");
    let scope_id = index.interner.intern("scope:bulk");

    let mut entries = Vec::with_capacity(3);
    for seq in 0..3 {
        let mut entry = make_entry(seq, "entity:bulk", "scope:bulk");
        entry.entity_id = entity_id;
        entry.scope_id = scope_id;
        entries.push(entry);
    }

    let outcome = index.restore_sorted_entries_with_before_publish(entries, 3, |restoring| {
        // Runs after every view is rebuilt but before publish.
        assert_eq!(
            restoring.visible_sequence(),
            0,
            "visibility watermark must not advance until every view is rebuilt"
        );
        assert!(
            restoring.query(&Region::all()).is_empty(),
            "PROPERTY: reads must observe neither base maps nor overlays before publish"
        );
    });
    outcome.expect("bulk restore publish must succeed");

    assert_eq!(index.query(&Region::all()).len(), 3);
    assert_eq!(index.visible_sequence(), 3);
}
/// Entries inside a cancelled range must be skipped by query-hit collection
/// and must also be rejected when a hit is upgraded with the same snapshot.
#[test]
fn upgrade_with_visibility_snapshot_rejects_cancelled_ranges() {
    let index = StoreIndex::new();
    let entity_id = index.interner.intern("entity:visibility");
    let scope_id = index.interner.intern("scope:visibility");
    for seq in 0..3 {
        let mut entry = make_entry(seq, "entity:visibility", "scope:visibility");
        entry.entity_id = entity_id;
        entry.scope_id = scope_id;
        index.insert(entry);
    }
    index
        .publish(3, "test-publish")
        .expect("publish test entries");
    index.restore_cancelled_visibility_ranges(vec![(1, 2)]);

    // Hand-built hit that mirrors the entry `make_entry(1, ..)` produced.
    let hidden = QueryHit {
        event_id: 2,
        global_sequence: 1,
        disk_pos: DiskPos::new(0, 16, 16),
        kind: EventKind::custom(0xF, 1),
        clock: 1,
    };

    let (hits, visibility) = index.query_hits_with_snapshot(&Region::all());
    let visible_sequences: Vec<_> = hits.iter().map(|hit| hit.global_sequence).collect();
    assert_eq!(
        visible_sequences,
        vec![0, 2],
        "PROPERTY: query-hit collection must skip cancelled hidden ranges below the visible watermark"
    );

    let upgraded = index.upgrade_hit_with_visibility(hidden, &visibility);
    assert!(
        upgraded.is_none(),
        "PROPERTY: hit upgrade must use the same hidden-range visibility predicate as query collection"
    );
}
}