pub(crate) mod checkpoint;
pub(crate) mod mmap;
pub(crate) mod rebuild;
use crate::coordinate::Coordinate;
use crate::event::{EventHeader, HashChain};
use crate::store::index::interner::InternId;
use crate::store::index::{DiskPos, IndexEntry};
use crate::store::StoreError;
use std::path::{Path, PathBuf};
/// Identifies which cold-start source produced a [`ColdStartIndexRow`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ColdStartSource {
    Checkpoint,
    MmapIndex,
    Sidx,
}
impl ColdStartSource {
    /// Short human-readable name for this source, used to prefix error messages.
    fn label(self) -> &'static str {
        match self {
            ColdStartSource::Sidx => "SIDX",
            ColdStartSource::MmapIndex => "mmap index",
            ColdStartSource::Checkpoint => "checkpoint",
        }
    }
}
/// Artifact kind chosen by [`ColdStartPolicy::write_target`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ColdStartArtifactKind {
    MmapIndex,
    Checkpoint,
}
/// Flags controlling which cold-start artifacts are tried, and which one is written.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct ColdStartPolicy {
    try_checkpoint: bool,
    try_mmap_index: bool,
}
impl ColdStartPolicy {
    /// Builds a policy from the two enable flags.
    pub(crate) fn new(enable_checkpoint: bool, enable_mmap_index: bool) -> Self {
        let try_checkpoint = enable_checkpoint;
        let try_mmap_index = enable_mmap_index;
        Self {
            try_checkpoint,
            try_mmap_index,
        }
    }
    /// Whether the checkpoint artifact is enabled.
    pub(crate) fn try_checkpoint(self) -> bool {
        let Self { try_checkpoint, .. } = self;
        try_checkpoint
    }
    /// Whether the mmap index artifact is enabled.
    pub(crate) fn try_mmap_index(self) -> bool {
        let Self { try_mmap_index, .. } = self;
        try_mmap_index
    }
    /// The single artifact kind to write.
    ///
    /// The mmap index takes precedence when both are enabled. Returns `None`
    /// when neither is enabled.
    pub(crate) fn write_target(self) -> Option<ColdStartArtifactKind> {
        match (self.try_mmap_index, self.try_checkpoint) {
            (true, _) => Some(ColdStartArtifactKind::MmapIndex),
            (false, true) => Some(ColdStartArtifactKind::Checkpoint),
            (false, false) => None,
        }
    }
}
/// Why [`validate_watermark_segment`] rejected a `(segment id, offset)` watermark.
#[derive(Debug)]
pub(crate) enum WatermarkValidationError {
    /// `std::fs::metadata` failed for the watermark's segment file.
    ///
    /// NOTE(review): every metadata error (e.g. permission denied) is reported
    /// as this variant, not only "file not found".
    MissingSegment {
        /// Segment file path that was probed.
        path: PathBuf,
    },
    /// The segment file exists but is shorter than the watermark offset.
    OffsetPastTail {
        /// Segment file path that was probed.
        path: PathBuf,
        /// Current length of the segment file, in bytes.
        file_len: u64,
        /// Offset recorded in the watermark.
        watermark_offset: u64,
    },
}
/// Finds the highest-numbered segment file in `data_dir` and reports its current length.
///
/// Only entries with the segment extension whose file stem parses as a `u64`
/// are considered; everything else is skipped. If several files parse to the
/// same id, the first one encountered wins.
///
/// Returns `(segment_id, file_len)`, or `(0, 0)` when no segment file exists.
///
/// # Errors
/// Returns `StoreError::Io` if the directory, any entry, or the chosen
/// segment's metadata cannot be read.
pub(crate) fn latest_segment_watermark(data_dir: &Path) -> Result<(u64, u64), StoreError> {
    let mut newest: Option<(u64, PathBuf)> = None;
    for dir_entry in std::fs::read_dir(data_dir).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        let has_segment_ext = matches!(
            candidate.extension(),
            Some(ext) if ext == crate::store::segment::SEGMENT_EXTENSION
        );
        if !has_segment_ext {
            continue;
        }
        let parsed_id = candidate
            .file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.parse::<u64>().ok());
        if let Some(id) = parsed_id {
            // Strictly-greater comparison keeps the first file seen on ties.
            let is_newer = match &newest {
                Some((best, _)) => id > *best,
                None => true,
            };
            if is_newer {
                newest = Some((id, candidate));
            }
        }
    }
    let Some((segment_id, segment_path)) = newest else {
        return Ok((0, 0));
    };
    let tail = std::fs::metadata(&segment_path)
        .map_err(StoreError::Io)?
        .len();
    Ok((segment_id, tail))
}
/// Checks that the watermark's segment file exists and is at least
/// `watermark_offset` bytes long.
///
/// # Errors
/// - [`WatermarkValidationError::MissingSegment`] if the file's metadata cannot
///   be read, for any reason.
/// - [`WatermarkValidationError::OffsetPastTail`] if the file is shorter than
///   `watermark_offset`.
pub(crate) fn validate_watermark_segment(
    data_dir: &Path,
    watermark_segment_id: u64,
    watermark_offset: u64,
) -> Result<(), WatermarkValidationError> {
    let file_name = crate::store::segment::segment_filename(watermark_segment_id);
    let path = data_dir.join(file_name);
    let Ok(meta) = std::fs::metadata(&path) else {
        return Err(WatermarkValidationError::MissingSegment { path });
    };
    let file_len = meta.len();
    if file_len < watermark_offset {
        return Err(WatermarkValidationError::OffsetPastTail {
            path,
            file_len,
            watermark_offset,
        });
    }
    Ok(())
}
/// One index row decoded from a cold-start artifact.
///
/// Entity and scope are still in interned (`InternId`) form.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct ColdStartIndexRow {
    /// Artifact this row was read from; its label prefixes resolution errors.
    pub(crate) source: ColdStartSource,
    pub(crate) event_id: u128,
    pub(crate) correlation_id: u128,
    pub(crate) causation_id: Option<u128>,
    /// Interned entity id, used as an index into the interner string table.
    pub(crate) entity_id: InternId,
    /// Interned scope id, used as an index into the interner string table.
    pub(crate) scope_id: InternId,
    pub(crate) kind: crate::event::EventKind,
    /// Wall-clock time, presumably in milliseconds (`to_event_header` scales it
    /// by 1000 into a microsecond timestamp).
    pub(crate) wall_ms: u64,
    /// Becomes the sequence component of the header's DAG position.
    pub(crate) clock: u32,
    pub(crate) dag_lane: u32,
    pub(crate) dag_depth: u32,
    pub(crate) hash_chain: HashChain,
    pub(crate) disk_pos: DiskPos,
    pub(crate) global_sequence: u64,
}
impl ColdStartIndexRow {
    /// Looks up interned `id` in `interner_strings`.
    ///
    /// # Errors
    /// Returns a serialization error naming this row's source and `field` if
    /// `id` is past the end of the string table.
    fn resolve_part<'a>(
        &self,
        interner_strings: &'a [String],
        id: InternId,
        field: &str,
    ) -> Result<&'a str, StoreError> {
        interner_strings
            .get(id.to_usize())
            .map(String::as_str)
            .ok_or_else(|| {
                StoreError::ser_msg(&format!(
                    "{} {} is out of interner range",
                    self.source.label(),
                    field
                ))
            })
    }
    /// Resolves the entity and scope ids to owned strings, as `(entity, scope)`.
    ///
    /// # Errors
    /// Fails if either id is outside `interner_strings`.
    pub(crate) fn resolve_strings(
        &self,
        interner_strings: &[String],
    ) -> Result<(String, String), StoreError> {
        Ok((
            self.resolve_part(interner_strings, self.entity_id, "entity_id")?
                .to_owned(),
            self.resolve_part(interner_strings, self.scope_id, "scope_id")?
                .to_owned(),
        ))
    }
    /// Converts this row into an in-memory [`IndexEntry`].
    ///
    /// The entity and scope strings are resolved to build its [`Coordinate`].
    ///
    /// # Errors
    /// Fails if either id is outside `interner_strings`, or if
    /// `Coordinate::new` rejects the resolved strings.
    pub(crate) fn to_index_entry(
        &self,
        interner_strings: &[String],
    ) -> Result<IndexEntry, StoreError> {
        let entity = self.resolve_part(interner_strings, self.entity_id, "entity_id")?;
        let scope = self.resolve_part(interner_strings, self.scope_id, "scope_id")?;
        let coord = Coordinate::new(entity, scope)?;
        Ok(IndexEntry {
            event_id: self.event_id,
            correlation_id: self.correlation_id,
            causation_id: self.causation_id,
            coord,
            entity_id: self.entity_id,
            scope_id: self.scope_id,
            kind: self.kind,
            wall_ms: self.wall_ms,
            clock: self.clock,
            dag_lane: self.dag_lane,
            dag_depth: self.dag_depth,
            hash_chain: self.hash_chain.clone(),
            disk_pos: self.disk_pos,
            global_sequence: self.global_sequence,
        })
    }
    /// Builds an [`EventHeader`] from the row's metadata.
    ///
    /// Payload size and flags are set to zero. The timestamp is `wall_ms`
    /// scaled by 1000, saturating at `i64::MAX`.
    pub(crate) fn to_event_header(&self) -> EventHeader {
        // The former `(wall_ms * 1000) as i64` had two problems for large values:
        // - the u64 multiply panicked in debug builds and wrapped silently in release;
        // - `as i64` turned results above i64::MAX into negative values.
        // Saturate instead. `i64::MAX as u64` is exact, so the final cast cannot wrap.
        let timestamp_us = self.wall_ms.saturating_mul(1000).min(i64::MAX as u64) as i64;
        EventHeader::new(
            self.event_id,
            self.correlation_id,
            self.causation_id,
            timestamp_us,
            crate::coordinate::DagPosition::with_hlc(
                self.wall_ms,
                0,
                self.dag_depth,
                self.dag_lane,
                self.clock,
            ),
            0,
            self.kind,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::{ColdStartIndexRow, ColdStartSource};
    use crate::event::{EventKind, HashChain};
    use crate::store::index::interner::InternId;
    use crate::store::DiskPos;

    /// A SIDX-sourced row whose fields all hold distinct, recognisable values.
    fn sample_row() -> ColdStartIndexRow {
        ColdStartIndexRow {
            source: ColdStartSource::Sidx,
            event_id: 1,
            correlation_id: 2,
            causation_id: Some(3),
            entity_id: InternId(1),
            scope_id: InternId(2),
            kind: EventKind::DATA,
            wall_ms: 1_700_000_000_000,
            clock: 9,
            dag_lane: 4,
            dag_depth: 2,
            hash_chain: HashChain::default(),
            disk_pos: DiskPos::new(7, 64, 32),
            global_sequence: 11,
        }
    }

    #[test]
    fn cold_start_row_to_event_header_preserves_lane_depth_and_ids() {
        let hdr = sample_row().to_event_header();

        // Identity and kind are copied through unchanged.
        assert_eq!(hdr.event_id, 1);
        assert_eq!(hdr.correlation_id, 2);
        assert_eq!(hdr.causation_id, Some(3));
        assert_eq!(hdr.event_kind, EventKind::DATA);

        // Millisecond wall clock becomes a microsecond timestamp.
        assert_eq!(hdr.timestamp_us, 1_700_000_000_000_000);

        // DAG position carries wall clock, clock-as-sequence, lane and depth.
        let pos = &hdr.position;
        assert_eq!(pos.wall_ms, 1_700_000_000_000);
        assert_eq!(pos.sequence, 9);
        assert_eq!(pos.lane, 4);
        assert_eq!(pos.depth, 2);

        // Fields the row does not carry come back zeroed.
        assert_eq!(hdr.payload_size, 0);
        assert_eq!(hdr.flags, 0);
        assert_eq!(hdr.content_hash, [0u8; 32]);
    }
}