use crate::coordinate::Coordinate;
use crate::store::index::{DiskPos, IndexEntry, StoreIndex};
use crate::store::reader::Reader;
use crate::store::segment;
use crate::store::StoreError;
use std::path::Path;
/// Populates `index` from the on-disk state in `data_dir`.
///
/// When `enable_checkpoint` is set, a checkpoint is loaded first via
/// `checkpoint::try_load_checkpoint`. If one is found, it is restored into the index, and
/// then the segment data past its watermark is replayed. In every other case (checkpoints
/// disabled, or no checkpoint returned), the index is rebuilt from all segment files.
///
/// # Errors
/// Propagates any `StoreError` from checkpoint restoration, tail replay, or the
/// full rebuild.
pub(crate) fn open_index(
    index: &StoreIndex,
    reader: &Reader,
    data_dir: &Path,
    enable_checkpoint: bool,
) -> Result<(), StoreError> {
    // Checkpoints disabled: go straight to a full rebuild (no log line on this path).
    if !enable_checkpoint {
        return rebuild_from_segments(index, reader, data_dir);
    }

    match crate::store::checkpoint::try_load_checkpoint(data_dir) {
        Some((entries, interner_strings, watermark, stored_allocator)) => {
            tracing::info!(
                "checkpoint v2 loaded: {} entries, {} interner strings, watermark segment {} offset {}, allocator {}",
                entries.len(),
                interner_strings.len(),
                watermark.watermark_segment_id,
                watermark.watermark_offset,
                stored_allocator,
            );
            crate::store::checkpoint::restore_from_checkpoint(
                index,
                entries,
                &interner_strings,
                stored_allocator,
            )?;
            // The checkpoint only covers data up to the watermark; replay the rest.
            replay_tail_segments(index, reader, data_dir, &watermark)
        }
        None => {
            tracing::debug!("no valid checkpoint, performing full index rebuild");
            rebuild_from_segments(index, reader, data_dir)
        }
    }
}
/// Converts one scanned segment record into an [`IndexEntry`].
///
/// The entity/scope pair is validated through `Coordinate::new` before anything is
/// interned, so an invalid record fails without touching the interner. Entity, then
/// scope, are interned into `index.interner`. If the record carries no global
/// sequence, the next one is synthesized from `cursor`.
///
/// # Errors
/// Returns the error produced by `Coordinate::new` for an invalid entity/scope.
fn entry_from_scan(
    index: &StoreIndex,
    cursor: &mut crate::store::index::ReplayCursor<'_>,
    se: crate::store::reader::ScannedIndexEntry,
) -> Result<IndexEntry, StoreError> {
    let coordinate = Coordinate::new(&se.entity, &se.scope)?;

    let interned_entity = index.interner.intern(&se.entity);
    let interned_scope = index.interner.intern(&se.scope);

    let logical_clock = se.header.position.sequence;
    let wall_clock_ms = se.header.position.wall_ms;

    // Records without a stored global sequence get one assigned by the replay cursor.
    let sequence = match se.global_sequence {
        Some(stored) => stored,
        None => cursor.synthesize_next(),
    };

    let location = DiskPos {
        segment_id: se.segment_id,
        offset: se.offset,
        length: se.length,
    };

    Ok(IndexEntry {
        event_id: se.header.event_id,
        correlation_id: se.header.correlation_id,
        causation_id: se.header.causation_id,
        coord: coordinate,
        entity_id: interned_entity,
        scope_id: interned_scope,
        kind: se.header.event_kind,
        wall_ms: wall_clock_ms,
        clock: logical_clock,
        hash_chain: se.hash_chain,
        disk_pos: location,
        global_sequence: sequence,
    })
}
/// Replays segment records written after the checkpoint `watermark` into `index`.
///
/// Segment files in `data_dir` (extension equal to `segment::SEGMENT_EXTENSION`) are
/// visited in file-name order, and one `BatchRecoveryState` is shared across all of
/// them. Each segment's id is parsed from its file stem as a `u64`; stems that do not
/// parse are treated as segment 0. Whole segments with an id below
/// `watermark.watermark_segment_id` are skipped. Inside the watermark segment itself,
/// records at offsets below `watermark.watermark_offset` are skipped. Every remaining
/// record is converted with `entry_from_scan` and inserted through a replay cursor.
///
/// On success, the cursor is committed with the value of `index.global_sequence()`
/// read before scanning (the allocator state restored from the checkpoint). On any
/// scan/conversion failure, the cursor is aborted.
///
/// # Errors
/// Returns an error if the directory or any of its entries cannot be read, or if
/// scanning a segment or building an entry fails.
fn replay_tail_segments(
    index: &StoreIndex,
    reader: &Reader,
    data_dir: &Path,
    watermark: &crate::store::checkpoint::WatermarkInfo,
) -> Result<(), StoreError> {
    // Surface per-entry directory errors instead of silently dropping them: a skipped
    // entry could be a segment whose events would then be missing from the index.
    let mut entries = std::fs::read_dir(data_dir)?
        .collect::<Result<Vec<std::fs::DirEntry>, std::io::Error>>()?;
    entries.retain(|e| {
        e.path()
            .extension()
            .map(|ext| ext == segment::SEGMENT_EXTENSION)
            .unwrap_or(false)
    });
    // Cache the key so each file name is fetched once, not once per comparison.
    entries.sort_by_cached_key(|e| e.file_name());

    let mut batch_state = crate::store::reader::BatchRecoveryState::default();
    let mut cursor = index.begin_replay();
    let allocator_floor = index.global_sequence();

    // Immediately-invoked closure acts as a try block so the cursor can be
    // committed or aborted in exactly one place below.
    let scan_result = (|| -> Result<(), StoreError> {
        for dir_entry in &entries {
            let path = dir_entry.path();
            let seg_id = path
                .file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(0);
            if seg_id < watermark.watermark_segment_id {
                // Entirely covered by the checkpoint.
                continue;
            }
            reader.scan_segment_index_into(&path, Some(&mut batch_state), |se| {
                if seg_id == watermark.watermark_segment_id
                    && se.offset < watermark.watermark_offset
                {
                    // Before the watermark inside the watermark segment: already restored.
                    return Ok(());
                }
                let entry = entry_from_scan(index, &mut cursor, se)?;
                cursor.insert(entry);
                Ok(())
            })?;
        }
        Ok(())
    })();

    match scan_result {
        Ok(()) => {
            cursor.commit(allocator_floor);
            Ok(())
        }
        Err(e) => {
            cursor.abort();
            Err(e)
        }
    }
}
/// Rebuilds `index` from every segment file in `data_dir`.
///
/// Segment files (extension equal to `segment::SEGMENT_EXTENSION`) are visited in
/// file-name order, and one `BatchRecoveryState` is shared across all of them. Every
/// scanned record is converted with `entry_from_scan` and inserted through a replay
/// cursor.
///
/// On success, the cursor is committed with `0`. This presumably means "no allocator
/// floor from a checkpoint" — confirm against `ReplayCursor::commit`. On any
/// scan/conversion failure, the cursor is aborted.
///
/// # Errors
/// Returns an error if the directory or any of its entries cannot be read, or if
/// scanning a segment or building an entry fails.
pub(crate) fn rebuild_from_segments(
    index: &StoreIndex,
    reader: &Reader,
    data_dir: &Path,
) -> Result<(), StoreError> {
    // Surface per-entry directory errors instead of silently dropping them: a skipped
    // entry could be a segment whose events would then be missing from the index.
    let mut entries = std::fs::read_dir(data_dir)?
        .collect::<Result<Vec<std::fs::DirEntry>, std::io::Error>>()?;
    entries.retain(|e| {
        e.path()
            .extension()
            .map(|ext| ext == segment::SEGMENT_EXTENSION)
            .unwrap_or(false)
    });
    // Cache the key so each file name is fetched once, not once per comparison.
    entries.sort_by_cached_key(|e| e.file_name());

    let mut batch_state = crate::store::reader::BatchRecoveryState::default();
    let mut cursor = index.begin_replay();

    // Immediately-invoked closure acts as a try block so the cursor can be
    // committed or aborted in exactly one place below.
    let scan_result = (|| -> Result<(), StoreError> {
        for dir_entry in &entries {
            reader.scan_segment_index_into(&dir_entry.path(), Some(&mut batch_state), |se| {
                let entry = entry_from_scan(index, &mut cursor, se)?;
                cursor.insert(entry);
                Ok(())
            })?;
        }
        Ok(())
    })();

    match scan_result {
        Ok(()) => {
            cursor.commit(0);
            Ok(())
        }
        Err(e) => {
            cursor.abort();
            Err(e)
        }
    }
}