pub(crate) mod checkpoint;
pub(crate) mod mmap;
pub mod rebuild;
pub(crate) mod row;
#[cfg(test)]
pub(crate) use row::raw_to_kind;
pub(crate) use row::{
kind_to_raw, raw_to_kind_counted, ColdStartIndexRow, ColdStartSource,
ReservedKindFallbackStats, WatermarkInfo,
};
use crate::store::{file_classification::StoreFileKind, platform, StoreError};
use std::path::{Path, PathBuf};
/// One of the two cold-start artifact kinds.
///
/// [`ColdStartPolicy::write_target`] returns one of these to name the artifact that should be
/// written.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ColdStartArtifactKind {
/// The mmap index artifact (presumably handled by the `mmap` submodule).
MmapIndex,
/// The checkpoint artifact (presumably handled by the `checkpoint` submodule).
Checkpoint,
}
/// Outcome of trying to load one on-disk file, generic over the decoded payload `T`.
///
/// NOTE(review): the loaders that construct this enum are not in this file (likely the
/// `checkpoint` / `mmap` submodules). The variant descriptions below follow the variant and
/// field names; confirm them against those loaders.
#[derive(Debug)]
pub(crate) enum FileLoad<T> {
/// Presumably: there was no file to load.
Missing,
/// The load succeeded and produced the payload.
Loaded(T),
/// Presumably: a file was present but rejected; `reason` carries the explanation.
Invalid {
reason: String,
},
/// Presumably: the file declares a format version newer than this build handles.
FutureVersion {
/// Version recorded in the file.
found: u16,
/// Version this build supports.
supported: u16,
},
}
/// Which cold-start artifacts are enabled.
///
/// Build with [`ColdStartPolicy::new`] and query it through its accessor methods; the fields
/// are private.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct ColdStartPolicy {
/// Set from `enable_checkpoint` in `new`.
try_checkpoint: bool,
/// Set from `enable_mmap_index` in `new`.
try_mmap_index: bool,
}
impl ColdStartPolicy {
    /// Builds a policy from the two feature switches.
    ///
    /// * `enable_checkpoint` — allow the checkpoint artifact.
    /// * `enable_mmap_index` — allow the mmap index artifact.
    pub(crate) fn new(enable_checkpoint: bool, enable_mmap_index: bool) -> Self {
        let try_mmap_index = enable_mmap_index;
        let try_checkpoint = enable_checkpoint;
        Self {
            try_mmap_index,
            try_checkpoint,
        }
    }

    /// Returns `true` when the checkpoint artifact is enabled.
    pub(crate) fn try_checkpoint(self) -> bool {
        let Self { try_checkpoint, .. } = self;
        try_checkpoint
    }

    /// Returns `true` when the mmap index artifact is enabled.
    pub(crate) fn try_mmap_index(self) -> bool {
        let Self { try_mmap_index, .. } = self;
        try_mmap_index
    }

    /// Chooses the single artifact kind to write, or `None` when both are disabled.
    ///
    /// The mmap index takes precedence. It is chosen whenever it is enabled. The checkpoint is
    /// chosen only when it is the sole enabled artifact.
    pub(crate) fn write_target(self) -> Option<ColdStartArtifactKind> {
        match (self.try_mmap_index, self.try_checkpoint) {
            (true, _) => Some(ColdStartArtifactKind::MmapIndex),
            (false, true) => Some(ColdStartArtifactKind::Checkpoint),
            (false, false) => None,
        }
    }
}
/// Why [`validate_watermark_segment`] rejected a watermark.
#[derive(Debug)]
pub(crate) enum WatermarkValidationError {
/// Metadata for the watermark's segment file could not be read. Any error from
/// `platform::fs::metadata` ends up here, not only "file not found".
MissingSegment {
/// Segment path that was checked.
path: PathBuf,
},
/// The watermark offset is strictly greater than the segment file's length.
/// An offset equal to the length is accepted.
OffsetPastTail {
/// Segment path that was checked.
path: PathBuf,
/// File length reported by the metadata call.
file_len: u64,
/// The rejected offset.
watermark_offset: u64,
},
}
/// Scans `data_dir` for segment files and returns `(segment_id, byte_length)` of the
/// highest-numbered one, or `(0, 0)` when no parseable segment is present.
///
/// Filenames classified as `MalformedSegment` are logged at `warn` and skipped. Every other
/// non-segment file kind is ignored silently.
///
/// If two entries parse to the same maximal id, the one yielded first by `read_dir` is kept,
/// because replacement requires a strictly larger id. The returned length then depends on
/// directory iteration order.
///
/// # Errors
///
/// Returns `StoreError::Io` if the directory, any entry while iterating, or the winning
/// segment's metadata cannot be read.
pub(crate) fn latest_segment_watermark(data_dir: &Path) -> Result<(u64, u64), StoreError> {
    let mut best: Option<(u64, PathBuf)> = None;
    let entries = platform::fs::read_dir(data_dir).map_err(StoreError::Io)?;
    for dir_entry in entries {
        let path = dir_entry.map_err(StoreError::Io)?.path();
        let candidate_id = match StoreFileKind::from_path(&path) {
            StoreFileKind::Segment(segment_id) => segment_id.as_u64(),
            StoreFileKind::MalformedSegment(error) => {
                tracing::warn!(
                    path = %path.display(),
                    %error,
                    "skipping malformed segment filename"
                );
                continue;
            }
            // Listed explicitly (no `_` arm) so a new file kind forces a decision here.
            StoreFileKind::VisibilityRanges
            | StoreFileKind::Checkpoint
            | StoreFileKind::MmapIndex
            | StoreFileKind::IdempotencyStore
            | StoreFileKind::PendingCompactionMarker
            | StoreFileKind::CompactSource
            | StoreFileKind::CursorDirectory
            | StoreFileKind::Keyset
            | StoreFileKind::Other => continue,
        };
        // Strict `>`: on a duplicate id the earlier directory entry stays the winner.
        let replaces_best = match &best {
            Some((best_id, _)) => candidate_id > *best_id,
            None => true,
        };
        if replaces_best {
            best = Some((candidate_id, path));
        }
    }
    let Some((segment_id, winner)) = best else {
        return Ok((0, 0));
    };
    let offset = platform::fs::metadata(&winner)
        .map_err(StoreError::Io)?
        .len();
    Ok((segment_id, offset))
}
pub(crate) fn validate_watermark_segment(
data_dir: &Path,
watermark_segment_id: u64,
watermark_offset: u64,
) -> Result<(), WatermarkValidationError> {
let watermark_segment_path = data_dir.join(crate::store::segment::segment_filename(
watermark_segment_id,
));
match platform::fs::metadata(&watermark_segment_path) {
Ok(meta) if meta.len() >= watermark_offset => Ok(()),
Ok(meta) => Err(WatermarkValidationError::OffsetPastTail {
path: watermark_segment_path,
file_len: meta.len(),
watermark_offset,
}),
Err(_) => Err(WatermarkValidationError::MissingSegment {
path: watermark_segment_path,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Pins the tie rule of `latest_segment_watermark`. When two filenames map to the same maximal
/// segment id, the entry yielded first by `read_dir` supplies the returned offset.
#[test]
fn latest_segment_watermark_keeps_the_first_directory_entry_on_a_duplicate_id_tie() {
let dir = tempfile::TempDir::new().expect("create temp dir");
// Writes `bytes` to `name` inside the temp dir via the platform atomic-write helper.
let write_segment = |name: &str, bytes: &'static [u8]| {
platform::fs::write_file_atomically(dir.path(), &dir.path().join(name), name, |file| {
use std::io::Write;
file.write_all(bytes).map_err(StoreError::Io)
})
.expect("write segment fixture through the platform seam");
};
// Fixtures have distinct lengths (5, 3 and 9 bytes), so the returned offset identifies which
// file won. `7.fbat` and `000007.fbat` are meant to collide on id 7.
// NOTE(review): this assumes `StoreFileKind::from_path` accepts the unpadded name; confirm.
write_segment("000001.fbat", b"aaaaa");
write_segment("7.fbat", b"xyz");
write_segment("000007.fbat", b"123456789");
let (segment_id, offset) =
latest_segment_watermark(dir.path()).expect("watermark scan succeeds");
assert_eq!(segment_id, 7, "the max id wins regardless of tie order");
// Oracle: `read_dir` order is not fixed, so replay the same strict-`>` max scan over the
// actual directory order to learn which id-7 file should win on this filesystem.
let mut expected: Option<(u64, PathBuf)> = None;
for entry in platform::fs::read_dir(dir.path()).expect("read dir for the oracle") {
let entry = entry.expect("oracle dir entry");
let path = entry.path();
let StoreFileKind::Segment(id) = StoreFileKind::from_path(&path) else {
continue;
};
let id = id.as_u64();
if expected
.as_ref()
.map(|(current, _)| id > *current)
.unwrap_or(true)
{
expected = Some((id, path));
}
}
let (expected_id, expected_path) = expected.expect("oracle saw the segment files");
assert_eq!(expected_id, 7, "oracle sanity: the tie sits at the max id");
let expected_offset = platform::fs::metadata(&expected_path)
.expect("oracle metadata")
.len();
assert_eq!(
offset, expected_offset,
"on a duplicate-id tie the FIRST directory entry's length is the watermark offset"
);
}
/// Boundary checks for `validate_watermark_segment` against a 5-byte segment 3:
/// offsets 0 and 5 are accepted, offset 6 is rejected as `OffsetPastTail`, and an
/// absent segment id is rejected as `MissingSegment`.
#[test]
fn r4_validate_watermark_rejects_an_offset_past_the_segment_tail() {
let dir = tempfile::TempDir::new().expect("create temp dir");
let name = crate::store::segment::segment_filename(3);
platform::fs::write_file_atomically(dir.path(), &dir.path().join(&name), &name, |file| {
use std::io::Write;
file.write_all(b"12345").map_err(StoreError::Io)
})
.expect("write the 5-byte segment fixture through the platform seam");
// Inclusive upper bound: offset == file length is valid.
validate_watermark_segment(dir.path(), 3, 5)
.expect("offset == file length is a valid watermark (`>=` is inclusive)");
validate_watermark_segment(dir.path(), 3, 0).expect("offset 0 is always within the tail");
// One byte past the end must fail and report both the length and the offending offset.
let past_tail = validate_watermark_segment(dir.path(), 3, 6)
.expect_err("an offset one byte past the file length must be rejected");
assert!(
matches!(
&past_tail,
WatermarkValidationError::OffsetPastTail {
path,
file_len: 5,
watermark_offset: 6,
} if *path == dir.path().join(&name)
),
"exact OffsetPastTail evidence (path, file_len 5, offset 6) expected, got {past_tail:?}"
);
// Segment 9 was never written, so its metadata lookup fails.
let missing = validate_watermark_segment(dir.path(), 9, 0)
.expect_err("a watermark naming an absent segment must be rejected");
assert!(
matches!(
&missing,
WatermarkValidationError::MissingSegment { path }
if *path == dir.path().join(crate::store::segment::segment_filename(9))
),
"exact MissingSegment evidence expected, got {missing:?}"
);
}
}