use crate::event::EventKind;
use crate::store::segment::scan::{Reader, ScannedIndexEntry};
use crate::store::segment::{self, sidx::SidxEntry};
use crate::store::StoreError;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::sync::atomic::Ordering;
/// Result of comparing a segment file's 16-byte trailer against the frame
/// extents described by a set of SIDX entries (see
/// `Reader::sidx_covers_segment_tail`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum SidxTailCoverage {
/// The largest `frame_offset + frame_length` among the entries equals the
/// offset stored in the trailer.
Complete,
/// The file is shorter than the 16-byte trailer, or the largest frame end
/// does not equal the offset stored in the trailer.
Incomplete,
/// The file's metadata could not be read, the file could not be opened,
/// seeking to or reading the trailer failed, or the trailer does not end
/// with the SIDX magic bytes.
Unreadable,
}
impl Reader {
    /// Compares the segment file's trailer with the extents of `sidx_entries`.
    ///
    /// The last 16 bytes of the file at `path` are read as a trailer: bytes
    /// `0..8` hold a little-endian `u64` offset and bytes `12..16` must equal
    /// `SIDX_MAGIC` (bytes `8..12` are not inspected here). The largest
    /// `frame_offset + frame_length` over `sidx_entries` (0 when the slice is
    /// empty) is then compared with that offset.
    ///
    /// Returns:
    /// * [`SidxTailCoverage::Complete`] when the two values are equal,
    /// * [`SidxTailCoverage::Incomplete`] when they differ or the file is
    ///   shorter than 16 bytes,
    /// * [`SidxTailCoverage::Unreadable`] when metadata lookup, opening,
    ///   seeking or reading fails, or the magic bytes do not match.
    pub(super) fn sidx_covers_segment_tail(
        path: &Path,
        sidx_entries: &[SidxEntry],
    ) -> SidxTailCoverage {
        // Metadata is queried before the file is opened, and the length check
        // happens only after a successful open, so each failure maps to the
        // same variant regardless of which step breaks first.
        let file_len = match crate::store::platform::fs::metadata(path).map(|meta| meta.len()) {
            Ok(len) => len,
            Err(_) => return SidxTailCoverage::Unreadable,
        };
        let mut segment_file = match crate::store::platform::fs::open_file(path) {
            Ok(handle) => handle,
            Err(_) => return SidxTailCoverage::Unreadable,
        };
        if file_len < 16 {
            return SidxTailCoverage::Incomplete;
        }

        // Pull the fixed-size trailer from the very end of the file.
        let mut trailer = [0u8; 16];
        let trailer_read_failed = segment_file.seek(SeekFrom::End(-16)).is_err()
            || segment_file.read_exact(&mut trailer).is_err();
        if trailer_read_failed {
            return SidxTailCoverage::Unreadable;
        }
        if &trailer[12..] != segment::sidx::SIDX_MAGIC {
            return SidxTailCoverage::Unreadable;
        }

        // The first eight trailer bytes are the little-endian offset.
        let mut offset_bytes = [0u8; 8];
        offset_bytes.copy_from_slice(&trailer[..8]);
        let sidx_start = u64::from_le_bytes(offset_bytes);

        // Furthest byte reached by any indexed frame; stays 0 with no entries.
        let furthest_frame_end = sidx_entries.iter().fold(0u64, |furthest, entry| {
            furthest.max(
                entry
                    .frame_offset
                    .saturating_add(u64::from(entry.frame_length)),
            )
        });

        if furthest_frame_end != sidx_start {
            return SidxTailCoverage::Incomplete;
        }
        SidxTailCoverage::Complete
    }

    /// Attempts to populate `sink` from the segment's SIDX footer instead of a
    /// full scan.
    ///
    /// Returns `Ok(false)` without calling `sink` when `segment_id` is the
    /// active segment, when `batch_in_progress` is set, when `read_footer`
    /// yields no footer or an error, or when
    /// [`Self::sidx_covers_segment_tail`] reports anything other than
    /// [`SidxTailCoverage::Complete`].
    ///
    /// Otherwise every footer entry whose kind is neither
    /// `EventKind::SYSTEM_BATCH_BEGIN` nor `EventKind::SYSTEM_BATCH_COMMIT` is
    /// converted to a [`ScannedIndexEntry`] and handed to `sink`, and
    /// `Ok(true)` is returned.
    ///
    /// # Errors
    ///
    /// Propagates the first error from `ScannedIndexEntry::from_cold_start_row`
    /// or from `sink`. Entries delivered to `sink` before that point are not
    /// rolled back.
    pub(super) fn try_sidx_fast_path<F>(
        &self,
        path: &Path,
        segment_id: u64,
        batch_in_progress: bool,
        sink: &mut F,
    ) -> Result<bool, StoreError>
    where
        F: FnMut(ScannedIndexEntry) -> Result<(), StoreError>,
    {
        let segment_is_active = segment_id == self.active_segment_id.load(Ordering::Acquire);
        if segment_is_active || batch_in_progress {
            return Ok(false);
        }

        // A missing or unreadable footer is not an error here: the caller is
        // told the fast path did not apply.
        let (sidx_entries, strings) = match segment::sidx::read_footer(path) {
            Ok(Some(footer)) => footer,
            Ok(None) | Err(_) => return Ok(false),
        };
        if Self::sidx_covers_segment_tail(path, &sidx_entries) != SidxTailCoverage::Complete {
            return Ok(false);
        }

        for se in sidx_entries {
            let row = se.to_cold_start_row(segment_id);
            let is_batch_marker = row.kind == EventKind::SYSTEM_BATCH_BEGIN
                || row.kind == EventKind::SYSTEM_BATCH_COMMIT;
            // Batch begin/commit markers are skipped rather than forwarded.
            if !is_batch_marker {
                let scanned = ScannedIndexEntry::from_cold_start_row(&row, &strings)?;
                sink(scanned)?;
            }
        }
        Ok(true)
    }
}