use crate::coordinate::Coordinate;
use crate::store::cold_start::{ColdStartPolicy, ReservedKindFallbackStats, WatermarkInfo};
use crate::store::index::interner::StringInterner;
use crate::store::index::{DiskPos, IndexEntry, RoutingSummary, StoreIndex};
use crate::store::segment::scan::{FrameScanTailPolicy, Reader, ScannedIndexEntry};
use crate::store::StoreError;
use rayon::prelude::*;
use std::path::Path;
mod load_status;
mod report;
mod topology;
pub use load_status::OpenIndexLoadStatus;
use load_status::SnapshotLoadDiagnostics;
pub use report::{OpenIndexPath, OpenIndexReport};
pub(crate) use topology::{
clear_pending_compaction, write_pending_compaction, COMPACTION_MARKER_FILENAME,
};
use topology::{load_pending_compaction, segment_paths};
/// Result of `open_index`: the diagnostic report for this open, plus the cumulative
/// unknown-reserved-kind fallback stats (the persisted cumulative stats combined via `add`
/// with the stats observed during this reopen).
#[derive(Debug, Clone)]
pub(crate) struct OpenIndexOutcome {
/// Restore path, entry counts, per-phase timings, snapshot-load diagnostics and fallback stats.
pub(crate) report: OpenIndexReport,
/// Persisted cumulative fallback stats `add`ed to this reopen's fallback stats.
pub(crate) cumulative_reserved_kind_fallbacks: ReservedKindFallbackStats,
}
/// Which strategy produced a `RestorePlan`.
///
/// `open_index` reports `Mmap` and `Checkpoint` as-is and collapses both rebuild variants
/// into `OpenIndexPath::Rebuild`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RestoreSource {
/// Loaded from the mmap snapshot, plus any tail frames written after its watermark.
Mmap,
/// Loaded from the checkpoint snapshot, plus any tail frames written after its watermark.
Checkpoint,
/// Rebuilt from the sealed segments' SIDX footers (read in parallel), plus a frame scan of
/// the active segment when one is configured.
SealedSidxRebuild,
/// Rebuilt by frame-scanning segments directly: used when there are no sealed segments or
/// when the parallel SIDX read failed.
FrameScanFallback,
}
/// Everything `open_index` needs to populate a `StoreIndex`, as chosen by `RestorePlanner::build`.
struct RestorePlan {
/// Strategy that produced this plan.
source: RestoreSource,
/// Index entries sorted by `global_sequence`.
entries: Vec<IndexEntry>,
/// Full interner snapshot (leading empty string, then the interner's strings) matching `entries`.
interner_strings: Vec<String>,
/// Allocator hint passed to `restore_sorted_entries_with_routing`; when `entries` is
/// non-empty it is at least one past the highest `global_sequence` in it.
allocator_hint: u64,
/// Routing summary computed over the sorted `entries`.
routing: RoutingSummary,
/// Number of entries that came from the snapshot (or from the rebuild, for rebuild plans).
restored_entries: usize,
/// Number of entries collected after the snapshot watermark; always 0 for rebuild plans.
tail_entries: usize,
/// Unknown-reserved-kind fallbacks observed while producing this plan.
reopen_reserved_kind_fallbacks: ReservedKindFallbackStats,
/// Cumulative fallback stats carried by the loaded snapshot; default for rebuild plans.
persisted_cumulative_reserved_kind_fallbacks: ReservedKindFallbackStats,
/// Outcome of each mmap/checkpoint load that was attempted.
snapshot_loads: SnapshotLoadDiagnostics,
}
/// Snapshot contents (from the mmap or checkpoint loader) handed to
/// `RestorePlanner::build_snapshot_plan`.
struct SnapshotPlanInput {
/// Entries stored in the snapshot.
entries: Vec<IndexEntry>,
/// Full interner snapshot used to seed a fresh interner before tail entries are interned.
interner_strings: Vec<String>,
/// Frames in segments before `watermark_segment_id`, and frames in that segment whose offset
/// is below `watermark_offset`, are treated as covered; later frames become tail entries.
watermark: WatermarkInfo,
/// Allocator value stored in the snapshot: floor for synthesized tail sequences and lower
/// bound for the plan's allocator hint.
stored_allocator: u64,
/// Snapshot routing summary; only its `chunk_count` is read when building the plan.
routing: RoutingSummary,
/// Fallback stats reported by the loader for this reopen (the checkpoint path passes default).
reopen_reserved_kind_fallbacks: ReservedKindFallbackStats,
/// Cumulative fallback stats persisted in the snapshot.
persisted_cumulative_reserved_kind_fallbacks: ReservedKindFallbackStats,
/// When false, every entry's receipt extensions are re-read from the segment reader.
receipt_extensions_hydrated: bool,
/// Diagnostics for the snapshot loads attempted so far.
snapshot_loads: SnapshotLoadDiagnostics,
}
/// Borrowed inputs used to decide how the index should be restored on open.
struct RestorePlanner<'a> {
/// Segment reader used for tail scans, receipt-extension hydration and rebuilds.
reader: &'a Reader,
/// Store data directory holding segments, snapshots and the pending-compaction marker.
data_dir: &'a Path,
/// Gates whether the mmap snapshot (`try_mmap_index`) and checkpoint (`try_checkpoint`) are tried.
policy: ColdStartPolicy,
/// Clock handed to the mmap snapshot loader.
clock: &'a dyn crate::store::Clock,
}
impl RestorePlanner<'_> {
    /// Chooses a restore strategy and produces the corresponding [`RestorePlan`].
    ///
    /// Sources are tried in this order, and the first one that loads wins:
    /// 1. the mmap snapshot, when `policy.try_mmap_index()` allows it;
    /// 2. the checkpoint snapshot, when `policy.try_checkpoint()` allows it;
    /// 3. a full rebuild from segment files via `collect_rebuild_entries`.
    ///
    /// Both snapshot sources are skipped when `load_pending_compaction` reports a pending
    /// compaction. Every attempted snapshot load is recorded in the plan's `snapshot_loads`,
    /// whether or not it produced `FileLoad::Loaded`.
    ///
    /// # Errors
    /// Propagates errors from reading the pending-compaction marker, from building a snapshot
    /// plan, and from the segment rebuild. A snapshot load that does not return
    /// `FileLoad::Loaded` is not an error; the next source is tried instead.
    fn build(&self) -> Result<RestorePlan, StoreError> {
        let snapshots_allowed = load_pending_compaction(self.data_dir)?.is_none();
        let mut diagnostics = SnapshotLoadDiagnostics::default();

        if snapshots_allowed && self.policy.try_mmap_index() {
            let attempt = super::mmap::load_mmap_snapshot(self.data_dir, self.clock);
            diagnostics.record_mmap(&attempt);
            if let super::FileLoad::Loaded(mmap) = attempt {
                let input = SnapshotPlanInput {
                    entries: mmap.entries,
                    interner_strings: mmap.interner_strings,
                    watermark: mmap.watermark,
                    stored_allocator: mmap.stored_allocator,
                    routing: mmap.routing,
                    reopen_reserved_kind_fallbacks: mmap.reopen_reserved_kind_fallbacks,
                    persisted_cumulative_reserved_kind_fallbacks: mmap
                        .cumulative_reserved_kind_fallbacks,
                    receipt_extensions_hydrated: mmap.receipt_extensions_hydrated,
                    snapshot_loads: diagnostics,
                };
                return self.build_snapshot_plan(RestoreSource::Mmap, input);
            }
        }

        if snapshots_allowed && self.policy.try_checkpoint() {
            let attempt = super::checkpoint::load_checkpoint_snapshot(self.data_dir);
            diagnostics.record_checkpoint(&attempt);
            if let super::FileLoad::Loaded(checkpoint) = attempt {
                let input = SnapshotPlanInput {
                    entries: checkpoint.entries,
                    interner_strings: checkpoint.interner_strings,
                    watermark: checkpoint.watermark,
                    stored_allocator: checkpoint.stored_allocator,
                    routing: checkpoint.routing,
                    // Checkpoints carry no per-reopen fallback stats of their own.
                    reopen_reserved_kind_fallbacks: ReservedKindFallbackStats::default(),
                    persisted_cumulative_reserved_kind_fallbacks: checkpoint
                        .cumulative_reserved_kind_fallbacks,
                    receipt_extensions_hydrated: checkpoint.receipt_extensions_hydrated,
                    snapshot_loads: diagnostics,
                };
                return self.build_snapshot_plan(RestoreSource::Checkpoint, input);
            }
        }

        // No usable snapshot: rebuild everything from the segment files.
        let (
            source,
            entries,
            interner_strings,
            allocator_hint,
            chunk_count,
            reopen_reserved_kind_fallbacks,
        ) = collect_rebuild_entries(self.reader, self.data_dir)?;
        let routing = RoutingSummary::from_sorted_entries(&entries, chunk_count.max(1));
        let restored_entries = entries.len();
        Ok(RestorePlan {
            source,
            entries,
            interner_strings,
            allocator_hint,
            routing,
            restored_entries,
            tail_entries: 0,
            reopen_reserved_kind_fallbacks,
            persisted_cumulative_reserved_kind_fallbacks: ReservedKindFallbackStats::default(),
            snapshot_loads: diagnostics,
        })
    }

    /// Turns a loaded snapshot into a [`RestorePlan`] by appending the frames written after its
    /// watermark.
    ///
    /// Steps, in order:
    /// - Receipt extensions are re-read from `reader` first when the snapshot did not carry them.
    /// - Tail entries are interned into a fresh interner seeded from the snapshot's strings.
    /// - The tail entries are merged with the snapshot entries and stably sorted by
    ///   `global_sequence`.
    /// - The routing summary is recomputed over the merged entries. It uses the snapshot's
    ///   chunk count (minimum 1), plus one extra chunk when any tail entries were found.
    /// - The allocator hint is the larger of the stored allocator and one past the highest
    ///   merged `global_sequence`.
    ///
    /// # Errors
    /// Propagates errors from receipt-extension hydration and from the tail scan.
    #[allow(clippy::needless_pass_by_value)]
    fn build_snapshot_plan(
        &self,
        source: RestoreSource,
        mut snapshot: SnapshotPlanInput,
    ) -> Result<RestorePlan, StoreError> {
        if !snapshot.receipt_extensions_hydrated {
            hydrate_receipt_extensions(self.reader, &mut snapshot.entries)?;
        }

        let interner = StringInterner::new();
        interner.replace_from_full_snapshot(&snapshot.interner_strings);
        let tail = collect_tail_entries(
            &interner,
            self.reader,
            self.data_dir,
            &snapshot.watermark,
            snapshot.stored_allocator,
        )?;

        let restored_entries = snapshot.entries.len();
        let tail_entries = tail.len();
        let mut entries = snapshot.entries;
        entries.extend(tail);
        entries.sort_by_key(|entry| entry.global_sequence);

        let snapshot_chunks = usize::try_from(snapshot.routing.chunk_count)
            .unwrap_or(1)
            .max(1);
        // Tail frames get their own routing chunk on top of the snapshot's chunks.
        let chunk_count = if tail_entries > 0 {
            snapshot_chunks + 1
        } else {
            snapshot_chunks
        };
        let routing = RoutingSummary::from_sorted_entries(&entries, chunk_count);

        let next_after_last = entries
            .last()
            .map_or(0, |entry| entry.global_sequence.saturating_add(1));
        let allocator_hint = snapshot.stored_allocator.max(next_after_last);

        Ok(RestorePlan {
            source,
            allocator_hint,
            interner_strings: full_interner_snapshot(&interner),
            routing,
            entries,
            restored_entries,
            tail_entries,
            reopen_reserved_kind_fallbacks: snapshot.reopen_reserved_kind_fallbacks,
            persisted_cumulative_reserved_kind_fallbacks: snapshot
                .persisted_cumulative_reserved_kind_fallbacks,
            snapshot_loads: snapshot.snapshot_loads,
        })
    }
}
/// Restores `index` from the best available source and reports how that went.
///
/// A [`RestorePlanner`] picks the source (mmap snapshot, checkpoint, or segment rebuild). Its
/// interner strings and sorted entries are loaded into `index`. Any cancelled visibility ranges
/// persisted in `data_dir` are then re-applied.
///
/// The returned report carries:
/// - the restore path and entry counts;
/// - the total elapsed time plus per-phase timings, in microseconds, measured with `clock`;
/// - the snapshot-load diagnostics;
/// - the unknown-reserved-kind fallback stats, both for this reopen and cumulatively.
///
/// # Errors
/// Propagates errors from building the restore plan, from restoring entries into `index`,
/// and from loading cancelled ranges.
pub(crate) fn open_index(
    index: &StoreIndex,
    reader: &Reader,
    data_dir: &Path,
    policy: ColdStartPolicy,
    clock: &dyn crate::store::Clock,
) -> Result<OpenIndexOutcome, StoreError> {
    let started_ns = clock.now_mono_ns();
    let planner = RestorePlanner {
        reader,
        data_dir,
        policy,
        clock,
    };

    // Each phase is bracketed by its own start reading and an `elapsed_us` reading.
    let plan_started_ns = clock.now_mono_ns();
    let plan = planner.build()?;
    let phase_plan_build_us = elapsed_us(clock, plan_started_ns);

    let interner_started_ns = clock.now_mono_ns();
    index
        .interner
        .replace_from_full_snapshot(&plan.interner_strings);
    let phase_interner_us = elapsed_us(clock, interner_started_ns);

    let restore_started_ns = clock.now_mono_ns();
    index.restore_sorted_entries_with_routing(plan.entries, plan.allocator_hint, &plan.routing)?;
    let phase_restore_index_us = elapsed_us(clock, restore_started_ns);

    let hidden_started_ns = clock.now_mono_ns();
    let cancelled_ranges = crate::store::hidden_ranges::load_cancelled_ranges(data_dir)?;
    if let Some(ranges) = cancelled_ranges {
        index.restore_cancelled_visibility_ranges(ranges);
    }
    let phase_hidden_ranges_us = elapsed_us(clock, hidden_started_ns);

    // Both rebuild flavours surface to callers as a single "rebuild" path.
    let path = match plan.source {
        RestoreSource::Mmap => OpenIndexPath::Mmap,
        RestoreSource::Checkpoint => OpenIndexPath::Checkpoint,
        RestoreSource::SealedSidxRebuild | RestoreSource::FrameScanFallback => {
            OpenIndexPath::Rebuild
        }
    };
    let reopen = &plan.reopen_reserved_kind_fallbacks;
    let cumulative = plan
        .persisted_cumulative_reserved_kind_fallbacks
        .add(reopen);
    let total_elapsed_us = elapsed_us(clock, started_ns);

    let report = OpenIndexReport {
        path,
        restored_entries: plan.restored_entries,
        tail_entries: plan.tail_entries,
        elapsed_us: total_elapsed_us,
        phase_plan_build_us,
        phase_interner_us,
        phase_restore_index_us,
        phase_hidden_ranges_us,
        mmap_load_status: plan.snapshot_loads.mmap_status,
        mmap_invalid_reason: plan.snapshot_loads.mmap_invalid_reason.clone(),
        checkpoint_load_status: plan.snapshot_loads.checkpoint_status,
        checkpoint_invalid_reason: plan.snapshot_loads.checkpoint_invalid_reason.clone(),
        unknown_reserved_system_kind_fallbacks: reopen.system,
        unknown_reserved_system_kind_histogram: reopen.system_histogram.clone(),
        unknown_reserved_effect_kind_fallbacks: reopen.effect,
        unknown_reserved_effect_kind_histogram: reopen.effect_histogram.clone(),
        cumulative_unknown_reserved_system_kind_fallbacks: cumulative.system,
        cumulative_unknown_reserved_system_kind_histogram: cumulative.system_histogram.clone(),
        cumulative_unknown_reserved_effect_kind_fallbacks: cumulative.effect,
        cumulative_unknown_reserved_effect_kind_histogram: cumulative.effect_histogram.clone(),
    };
    Ok(OpenIndexOutcome {
        report,
        cumulative_reserved_kind_fallbacks: cumulative,
    })
}
/// Whole microseconds elapsed on `clock`'s monotonic nanosecond counter since `start_ns`.
///
/// If the counter now reads below `start_ns` (a negative interval), the result is 0.
fn elapsed_us(clock: &dyn crate::store::Clock, start_ns: i64) -> u64 {
    let delta_ns = clock.now_mono_ns().saturating_sub(start_ns);
    // `try_from` fails exactly for negative deltas, which clamp to zero.
    u64::try_from(delta_ns).map_or(0, |ns| ns / 1_000)
}
/// Reads the SIDX footer of every sealed segment in parallel and flattens the results.
///
/// Returns the scanned entries stably sorted by `global_sequence` (a missing sequence sorts as
/// 0), together with the unknown-reserved-kind fallback stats summed across all segments.
///
/// Returns `None` when any segment's footer fails to read or convert. That error is logged
/// under `batpak::rebuild` so the caller can fall back to a sequential frame scan. This is a
/// deliberate best-effort path, not a hard failure.
fn read_sealed_sidx_entries_parallel(
    reader: &Reader,
    sealed_segments: &[(u64, std::path::PathBuf)],
) -> Option<(Vec<ScannedIndexEntry>, ReservedKindFallbackStats)> {
    let per_segment: Result<Vec<_>, StoreError> = sealed_segments
        .par_iter()
        .map(|(segment_id, path)| scanned_entries_from_sidx_footer(reader, *segment_id, path))
        .collect();
    let batches = match per_segment {
        Ok(batches) => batches,
        Err(error) => {
            tracing::warn!(
                target: "batpak::rebuild",
                error = %error,
                "parallel SIDX rebuild unavailable; falling back to sequential scan"
            );
            return None;
        }
    };

    // The total is known up front; size the output once instead of regrowing it per batch.
    let total_entries: usize = batches.iter().map(|(batch, _)| batch.len()).sum();
    let mut flat = Vec::with_capacity(total_entries);
    let mut reserved_kind_fallbacks = ReservedKindFallbackStats::default();
    // Consume the batches directly; `batches` is not needed afterwards.
    for (batch, counts) in batches {
        flat.extend(batch);
        reserved_kind_fallbacks = reserved_kind_fallbacks.add(&counts);
    }
    flat.sort_by_key(|entry| entry.global_sequence.unwrap_or(0));
    Some((flat, reserved_kind_fallbacks))
}
/// Converts one sealed segment's SIDX footer into scanned index entries.
///
/// Batch begin/commit marker rows are dropped. Their rows are still converted first, so any
/// reserved-kind fallbacks they trigger are counted. Each kept entry has its receipt
/// extensions read from `reader`.
///
/// # Errors
/// - Fails when the footer cannot be read.
/// - Fails when the segment has no SIDX footer at all.
/// - Fails when a row cannot be converted or its receipt extensions cannot be read.
fn scanned_entries_from_sidx_footer(
    reader: &Reader,
    segment_id: u64,
    path: &Path,
) -> Result<(Vec<ScannedIndexEntry>, ReservedKindFallbackStats), StoreError> {
    let Some((entries, strings)) = crate::store::segment::sidx::read_footer(path)? else {
        return Err(StoreError::ser_msg(
            "sealed segment missing SIDX footer during parallel rebuild",
        ));
    };

    let mut fallbacks = ReservedKindFallbackStats::default();
    let mut kept = Vec::with_capacity(entries.len());
    for entry in entries {
        let row = entry.to_cold_start_row_counted(segment_id, &mut fallbacks);
        let is_batch_marker = row.kind == crate::event::EventKind::SYSTEM_BATCH_BEGIN
            || row.kind == crate::event::EventKind::SYSTEM_BATCH_COMMIT;
        if is_batch_marker {
            continue;
        }
        let mut converted = ScannedIndexEntry::from_cold_start_row(&row, &strings)?;
        converted.receipt_extensions = reader.read_receipt_extensions(&row.disk_pos)?;
        kept.push(converted);
    }
    Ok((kept, fallbacks))
}
/// Returns `interner`'s contents in the "full snapshot" layout this module feeds to
/// `replace_from_full_snapshot`: an empty string first, then `interner.to_snapshot()`.
///
/// NOTE(review): the leading empty string presumably occupies a reserved id 0; confirm
/// against `StringInterner`.
fn full_interner_snapshot(interner: &StringInterner) -> Vec<String> {
    std::iter::once(String::new())
        .chain(interner.to_snapshot())
        .collect()
}
/// Test-only sequential counterpart of `read_sealed_sidx_entries_parallel`.
///
/// Reads each sealed segment's SIDX footer in order, discards the fallback stats, and returns
/// all entries stably sorted by `global_sequence` (a missing sequence sorts as 0).
///
/// # Errors
/// Returns the first error produced by any segment, in segment order.
#[cfg(test)]
fn read_sealed_sidx_entries_sequential(
    reader: &Reader,
    sealed_segments: &[(u64, std::path::PathBuf)],
) -> Result<Vec<ScannedIndexEntry>, StoreError> {
    let batches = sealed_segments
        .iter()
        .map(|(segment_id, path)| {
            scanned_entries_from_sidx_footer(reader, *segment_id, path).map(|(batch, _)| batch)
        })
        .collect::<Result<Vec<_>, _>>()?;
    let mut all_entries: Vec<ScannedIndexEntry> = batches.into_iter().flatten().collect();
    all_entries.sort_by_key(|entry| entry.global_sequence.unwrap_or(0));
    Ok(all_entries)
}
/// Overwrites each entry's `receipt_extensions` with what `reader` reads at the entry's
/// `disk_pos`.
///
/// # Errors
/// Stops at, and returns, the first read error. Entries before it have already been updated.
fn hydrate_receipt_extensions(
    reader: &Reader,
    entries: &mut [IndexEntry],
) -> Result<(), StoreError> {
    entries
        .iter_mut()
        .try_for_each(|entry| -> Result<(), StoreError> {
            entry.receipt_extensions = reader.read_receipt_extensions(&entry.disk_pos)?;
            Ok(())
        })
}
/// Tracks the highest `global_sequence` observed so far, so that frames without a stored
/// sequence can be assigned the next one.
#[derive(Default)]
struct SequenceTracker {
    /// Largest sequence recorded via `note_seen` (or seeded by the constructor).
    max_seen: u64,
    /// Whether any sequence has been recorded (or seeded); until then the next sequence is 0.
    inserted_any: bool,
}
impl SequenceTracker {
    /// Next sequence to hand out: 0 before anything was seen, otherwise one past `max_seen`
    /// (saturating at `u64::MAX`).
    fn synthesize_next(&self) -> u64 {
        if !self.inserted_any {
            return 0;
        }
        self.max_seen.saturating_add(1)
    }
    /// Records `global_sequence` as used, raising `max_seen` when it is larger.
    fn note_seen(&mut self, global_sequence: u64) {
        if global_sequence > self.max_seen {
            self.max_seen = global_sequence;
        }
        self.inserted_any = true;
    }
}
/// Converts a scanned frame into an [`IndexEntry`] with the given `global_sequence`.
///
/// Details of the conversion:
/// - Entity and scope strings are interned into `interner`, entity first.
/// - A causation id whose raw value is 0 is stored as `None`.
/// - The frame's position sequence becomes the entry's `clock`.
///
/// # Errors
/// Fails when `Coordinate::new` rejects the entity/scope pair. That check runs before anything
/// is interned.
fn entry_from_scan(
    interner: &StringInterner,
    se: ScannedIndexEntry,
    global_sequence: u64,
) -> Result<IndexEntry, StoreError> {
    use crate::id::EntityIdType;

    let coord = Coordinate::new(&se.entity, &se.scope)?;
    let entity_id = interner.intern(&se.entity);
    let scope_id = interner.intern(&se.scope);
    // A zero causation id is treated as "no causation".
    let causation_id = se.header.causation_id.and_then(|id| {
        let raw = id.as_u128();
        (raw != 0).then_some(raw)
    });
    let disk_pos = DiskPos::new(se.segment_id, se.offset, se.length);

    Ok(IndexEntry {
        event_id: se.header.event_id.as_u128(),
        correlation_id: se.header.correlation_id.as_u128(),
        causation_id,
        coord,
        entity_id,
        scope_id,
        kind: se.header.event_kind,
        wall_ms: se.header.position.wall_ms,
        clock: se.header.position.sequence,
        dag_lane: se.header.position.lane,
        dag_depth: se.header.position.depth,
        hash_chain: se.hash_chain,
        disk_pos,
        global_sequence,
        receipt_extensions: se.receipt_extensions,
    })
}
/// Collects the frames written after a snapshot's watermark as [`IndexEntry`] values.
///
/// Which frames are scanned:
/// - Segments with an id below `watermark.watermark_segment_id` are skipped entirely.
/// - In the watermark segment itself, frames whose offset is below `watermark.watermark_offset`
///   are skipped.
/// - Only the newest segment on disk may have a torn tail recovered; earlier segments are
///   scanned fail-closed.
/// - One batch-recovery state is shared across every scanned segment.
///
/// Entity and scope strings are interned into `interner`. A frame without a stored
/// `global_sequence` is given the next one from a tracker seeded so that it starts at
/// `allocator_floor` (or at 0 when the floor is 0).
///
/// # Errors
/// Propagates errors from listing segments, scanning a segment, or converting a frame.
fn collect_tail_entries(
    interner: &StringInterner,
    reader: &Reader,
    data_dir: &Path,
    watermark: &WatermarkInfo,
    allocator_floor: u64,
) -> Result<Vec<IndexEntry>, StoreError> {
    let segments = segment_paths(data_dir)?;
    let newest_segment_id = segments.last().map(|(segment_id, _)| *segment_id);
    let mut batch_state = crate::store::segment::scan::BatchRecoveryState::default();
    let mut tracker = SequenceTracker {
        max_seen: allocator_floor.saturating_sub(1),
        inserted_any: allocator_floor > 0,
    };
    let mut tail = Vec::new();

    let at_or_after_watermark = segments
        .iter()
        .filter(|(segment_id, _)| *segment_id >= watermark.watermark_segment_id);
    for (segment_id, path) in at_or_after_watermark {
        let is_watermark_segment = *segment_id == watermark.watermark_segment_id;
        let tail_policy = if newest_segment_id == Some(*segment_id) {
            FrameScanTailPolicy::RecoverTornTail
        } else {
            FrameScanTailPolicy::FailClosed
        };
        reader.scan_segment_index_into_with_tail_policy(
            path,
            Some(&mut batch_state),
            tail_policy,
            |se| {
                // Frames before the watermark offset are already in the snapshot.
                if is_watermark_segment && se.offset < watermark.watermark_offset {
                    return Ok(());
                }
                let global_sequence = se
                    .global_sequence
                    .unwrap_or_else(|| tracker.synthesize_next());
                let entry = entry_from_scan(interner, se, global_sequence)?;
                tracker.note_seen(global_sequence);
                tail.push(entry);
                Ok(())
            },
        )?;
    }
    Ok(tail)
}
/// Output of `collect_rebuild_entries`, in tuple order:
/// (restore source used, entries sorted by `global_sequence`, full interner snapshot,
/// allocator hint, routing chunk count, reserved-kind fallback stats).
type RebuildResult = (
RestoreSource,
Vec<IndexEntry>,
Vec<String>,
u64,
usize,
ReservedKindFallbackStats,
);
fn collect_rebuild_entries(reader: &Reader, data_dir: &Path) -> Result<RebuildResult, StoreError> {
let entries = segment_paths(data_dir)?;
let recoverable_tail_segment_id = entries.last().map(|(segment_id, _)| *segment_id);
let configured_active_segment = reader.active_segment_id();
let active_segment_id = (configured_active_segment != 0).then_some(configured_active_segment);
let interner = StringInterner::new();
let mut rebuilt_entries = Vec::new();
let mut tracker = SequenceTracker::default();
let sealed_segments: Vec<_> = entries
.iter()
.filter(|(segment_id, _)| active_segment_id.is_none_or(|active| *segment_id < active))
.cloned()
.collect();
let mut source = RestoreSource::SealedSidxRebuild;
let mut chunk_count = sealed_segments.len().max(1);
let mut reserved_kind_fallbacks = ReservedKindFallbackStats::default();
if !sealed_segments.is_empty() {
if let Some((scanned, counts)) = read_sealed_sidx_entries_parallel(reader, &sealed_segments)
{
reserved_kind_fallbacks = reserved_kind_fallbacks.add(&counts);
for se in scanned {
let global_sequence = se
.global_sequence
.unwrap_or_else(|| tracker.synthesize_next());
let entry = entry_from_scan(&interner, se, global_sequence)?;
tracker.note_seen(global_sequence);
rebuilt_entries.push(entry);
}
} else {
source = RestoreSource::FrameScanFallback;
chunk_count = 1;
let mut batch_state = crate::store::segment::scan::BatchRecoveryState::default();
for (segment_id, path) in &sealed_segments {
let tail_policy = if Some(*segment_id) == recoverable_tail_segment_id {
FrameScanTailPolicy::RecoverTornTail
} else {
FrameScanTailPolicy::FailClosed
};
reader.scan_segment_index_into_with_tail_policy(
path,
Some(&mut batch_state),
tail_policy,
|se| {
let global_sequence = se
.global_sequence
.unwrap_or_else(|| tracker.synthesize_next());
let entry = entry_from_scan(&interner, se, global_sequence)?;
tracker.note_seen(global_sequence);
rebuilt_entries.push(entry);
Ok(())
},
)?;
}
}
} else {
source = RestoreSource::FrameScanFallback;
}
let mut batch_state = crate::store::segment::scan::BatchRecoveryState::default();
for (segment_id, path) in &entries {
if Some(*segment_id) != active_segment_id {
continue;
}
reader.scan_segment_index_into_with_tail_policy(
path,
Some(&mut batch_state),
FrameScanTailPolicy::RecoverTornTail,
|se| {
let global_sequence = se
.global_sequence
.unwrap_or_else(|| tracker.synthesize_next());
let entry = entry_from_scan(&interner, se, global_sequence)?;
tracker.note_seen(global_sequence);
rebuilt_entries.push(entry);
Ok(())
},
)?;
}
rebuilt_entries.sort_by_key(|entry| entry.global_sequence);
let allocator_hint = if tracker.inserted_any {
tracker.max_seen.saturating_add(1)
} else {
0
};
Ok((
source,
rebuilt_entries,
full_interner_snapshot(&interner),
allocator_hint,
chunk_count,
reserved_kind_fallbacks,
))
}
/// Repopulates `index` from a full segment rebuild (see `collect_rebuild_entries`), ignoring
/// any snapshot.
///
/// The rebuilt interner snapshot is loaded via `replace_from_full_snapshot`. The sorted entries
/// are then restored with a routing summary computed over them. The restore source and the
/// fallback stats from the rebuild are discarded.
///
/// # Errors
/// Propagates errors from the rebuild and from restoring entries into `index`.
pub(crate) fn rebuild_from_segments(
    index: &StoreIndex,
    reader: &Reader,
    data_dir: &Path,
) -> Result<(), StoreError> {
    let (_, entries, interner_strings, allocator_hint, chunk_count, _) =
        collect_rebuild_entries(reader, data_dir)?;
    let routing = RoutingSummary::from_sorted_entries(&entries, chunk_count.max(1));
    index.interner.replace_from_full_snapshot(&interner_strings);
    index.restore_sorted_entries_with_routing(entries, allocator_hint, &routing)?;
    Ok(())
}
#[cfg(test)]
mod tests;