mod full_scan;
mod point_read;
mod recovery;
mod validate;
use crate::event::{Event, EventHeader, EventKind, HashChain};
use crate::store::cold_start::ColdStartIndexRow;
use crate::store::index::DiskPos;
use crate::store::platform::fs::StoreFs;
use crate::store::segment::{self, FramePayload};
use crate::store::{Clock, EncodedBytes, ExtensionKey, StoreError};
use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{Error, ErrorKind, Read};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Size in bytes of the fixed header read in front of every frame
/// (see `read_frame_header_or_clean_eof`).
const FRAME_HEADER_BYTES: usize = 8;
/// Upper bound on the number of items handled during batch recovery.
/// NOTE(review): not referenced in this file — presumably enforced by the
/// `recovery` submodule; confirm.
const MAX_BATCH_RECOVERY_ITEMS: u32 = 1_000_000;

/// How a frame scan treats an incomplete ("torn") frame at the tail of a segment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum FrameScanTailPolicy {
    /// Torn tails are not recoverable (`can_recover_torn_tail` returns `false`).
    FailClosed,
    /// Torn tails may be recovered (`can_recover_torn_tail` returns `true`).
    RecoverTornTail,
}

impl FrameScanTailPolicy {
    /// Reports whether this policy permits recovering a torn tail.
    fn can_recover_torn_tail(self) -> bool {
        self == Self::RecoverTornTail
    }
}
/// Reads one fixed-size frame header from `reader`.
///
/// Returns `Ok(Some(header))` when all `FRAME_HEADER_BYTES` bytes were read, and
/// `Ok(None)` when the reader hit end-of-file first. Any other I/O error is returned
/// unchanged.
///
/// NOTE(review): `Read::read_exact` reports a *partially* read header with the same
/// `UnexpectedEof` kind as a zero-byte read. A torn header at the tail therefore
/// also yields `Ok(None)`, and callers must detect torn tails some other way if
/// that distinction matters.
fn read_frame_header_or_clean_eof(
    reader: &mut impl Read,
) -> Result<Option<[u8; FRAME_HEADER_BYTES]>, Error> {
    let mut header_bytes = [0u8; FRAME_HEADER_BYTES];
    if let Err(read_error) = reader.read_exact(&mut header_bytes) {
        return if read_error.kind() == ErrorKind::UnexpectedEof {
            Ok(None)
        } else {
            Err(read_error)
        };
    }
    Ok(Some(header_bytes))
}
/// Reads frames from the segment files stored under `data_dir`.
///
/// Segments whose id is below `active_segment_id` are treated as sealed
/// (`Reader::is_sealed`). When mmap was admitted they can be served from
/// `sealed_maps`.
///
/// NOTE: Rust drops fields in declaration order, so the order below also fixes the
/// teardown order of the fd cache, buffers and memory maps.
#[doc(hidden)]
pub struct Reader {
    /// Directory holding the segment files; joined with `segment::segment_filename`.
    data_dir: PathBuf,
    /// Cache of open segment file handles (see `FdCache`).
    fd_cache: Mutex<FdCache>,
    /// Reusable scratch buffers handed out by `acquire_buffer`; `release_buffer`
    /// retains at most 16 of them.
    buffer_pool: Mutex<Vec<Vec<u8>>>,
    /// Memory maps of sealed segments, keyed by segment id.
    sealed_maps: DashMap<u64, memmap2::Mmap>,
    /// Segment ids strictly below this value are considered sealed.
    active_segment_id: AtomicU64,
    /// Admission token for mmapping sealed segments; `None` disables the mmap path.
    sealed_mmap_admission: Option<crate::store::platform::mmap::SealedSegmentMmapAdmission>,
    /// Filesystem abstraction; not used in this file — presumably consumed by the
    /// read submodules (confirm).
    fs: Arc<dyn StoreFs>,
}
/// Bookkeeping for open segment file handles, keyed by segment id.
struct FdCache {
    /// Open file handles, keyed by segment id.
    fds: HashMap<u64, File>,
    /// Segment ids tracked alongside `fds`; `Reader::evict_segment` removes an id from
    /// both collections. NOTE(review): presumably records use order for eviction —
    /// confirm.
    order: Vec<u64>,
    /// Handle budget passed as `fd_budget` to `Reader::new`; enforcement is not
    /// visible in this file.
    budget: usize,
}
/// An event produced by a scan, with its payload decoded to a JSON value.
///
/// NOTE(review): presumably built by the scan paths in the submodules; the
/// constructor is not visible in this file.
pub(crate) struct ScannedEntry {
    /// The event with its payload decoded to `serde_json::Value`.
    pub event: Event<serde_json::Value>,
    /// Entity name carried by the frame.
    pub entity: String,
    /// Scope carried by the frame.
    pub scope: String,
    /// Receipt extensions carried by the frame.
    pub receipt_extensions: BTreeMap<ExtensionKey, EncodedBytes>,
    /// Payload bytes. Presumably the raw form of `event.payload`, as returned by
    /// `Reader::decode_frame_payload_value_with_raw_payload` — confirm.
    pub payload_bytes: Vec<u8>,
}
/// Index metadata for one event plus the on-disk location of its frame.
pub(crate) struct ScannedIndexEntry {
    /// Event header (payload not included).
    pub header: EventHeader,
    /// Entity name of the event.
    pub entity: String,
    /// Scope of the event.
    pub scope: String,
    /// Hash chain stored with the event.
    pub hash_chain: HashChain,
    /// Segment containing the frame (mirrors `DiskPos::segment_id`).
    pub segment_id: u64,
    /// Offset of the frame in the segment (mirrors `DiskPos::offset`).
    pub offset: u64,
    /// Length of the frame (mirrors `DiskPos::length`).
    pub length: u32,
    /// Receipt extensions; always empty when built by `from_cold_start_row`.
    pub receipt_extensions: BTreeMap<ExtensionKey, EncodedBytes>,
    /// Global sequence number; `Some` when built by `from_cold_start_row`. Other
    /// constructors are not visible here.
    pub global_sequence: Option<u64>,
}
impl ScannedIndexEntry {
    /// Builds an index entry from a cold-start index row.
    ///
    /// Entity and scope are resolved through `interner_strings`. The header, hash
    /// chain, disk position and global sequence are taken from `row`.
    /// `receipt_extensions` is always left empty.
    ///
    /// # Errors
    /// Propagates any error returned by `ColdStartIndexRow::resolve_strings`.
    pub(crate) fn from_cold_start_row(
        row: &ColdStartIndexRow,
        interner_strings: &[String],
    ) -> Result<Self, StoreError> {
        let (entity, scope) = row.resolve_strings(interner_strings)?;
        let header = row.to_event_header();
        let hash_chain = row.hash_chain.clone();
        let disk = &row.disk_pos;
        Ok(Self {
            header,
            entity,
            scope,
            hash_chain,
            segment_id: disk.segment_id,
            offset: disk.offset,
            length: disk.length,
            receipt_extensions: BTreeMap::new(),
            global_sequence: Some(row.global_sequence),
        })
    }
}
pub(crate) use recovery::{BatchRecoveryState, IndexScanEvent};
impl Reader {
    /// Deserializes a frame body into a [`FramePayload`] whose event payload is left
    /// as the raw, still-encoded byte vector.
    ///
    /// # Errors
    /// Returns [`StoreError::Serialization`] if `msgpack` cannot be decoded.
    fn decode_frame_payload_raw(msgpack: &[u8]) -> Result<FramePayload<Vec<u8>>, StoreError> {
        crate::encoding::from_bytes(msgpack).map_err(|e| StoreError::Serialization(Box::new(e)))
    }

    /// Deserializes a frame body and decodes its event payload to a JSON value,
    /// discarding the raw payload bytes.
    ///
    /// # Errors
    /// Same as [`Self::decode_frame_payload_value_with_raw_payload`].
    fn decode_frame_payload_value(
        msgpack: &[u8],
    ) -> Result<FramePayload<serde_json::Value>, StoreError> {
        Self::decode_frame_payload_value_with_raw_payload(msgpack).map(|(payload, _raw)| payload)
    }

    /// Decodes plaintext event payload bytes into a JSON value.
    ///
    /// Batch marker events (`SYSTEM_BATCH_BEGIN` / `SYSTEM_BATCH_COMMIT`) map to
    /// `Value::Null` without their bytes being inspected. All other kinds are
    /// deserialized.
    ///
    /// # Errors
    /// Returns [`StoreError::Serialization`] when a non-marker payload fails to decode.
    fn plaintext_value_from_bytes(
        event_kind: EventKind,
        raw_payload_bytes: &[u8],
    ) -> Result<serde_json::Value, StoreError> {
        match event_kind {
            EventKind::SYSTEM_BATCH_BEGIN | EventKind::SYSTEM_BATCH_COMMIT => {
                Ok(serde_json::Value::Null)
            }
            _ => crate::encoding::from_bytes(raw_payload_bytes)
                .map_err(|e| StoreError::Serialization(Box::new(e))),
        }
    }

    /// Replaces the raw byte payload of `payload` with `decoded_payload`.
    ///
    /// Returns the rebuilt frame together with the original raw payload bytes.
    fn value_framepayload_from_raw(
        payload: FramePayload<Vec<u8>>,
        decoded_payload: serde_json::Value,
    ) -> (FramePayload<serde_json::Value>, Vec<u8>) {
        let FramePayload {
            event,
            entity,
            scope,
            receipt_extensions,
        } = payload;
        let Event {
            header,
            payload: raw_payload_bytes,
            hash_chain,
        } = event;
        (
            FramePayload {
                event: Event {
                    header,
                    payload: decoded_payload,
                    hash_chain,
                },
                entity,
                scope,
                receipt_extensions,
            },
            raw_payload_bytes,
        )
    }

    /// Shared tail of the value-decoding paths.
    ///
    /// Decodes the plaintext payload of an already-deserialized frame and pairs the
    /// resulting frame with its raw payload bytes.
    ///
    /// # Errors
    /// Propagates [`Self::plaintext_value_from_bytes`] failures.
    fn plaintext_framepayload_from_raw(
        payload: FramePayload<Vec<u8>>,
    ) -> Result<(FramePayload<serde_json::Value>, Vec<u8>), StoreError> {
        let decoded_payload = Self::plaintext_value_from_bytes(
            payload.event.header.event_kind,
            &payload.event.payload,
        )?;
        Ok(Self::value_framepayload_from_raw(payload, decoded_payload))
    }

    /// Deserializes a frame body, decodes its payload to JSON, and also returns the
    /// raw payload bytes.
    ///
    /// # Errors
    /// - Serialization errors from frame or payload decoding.
    /// - With the `payload-encryption` feature, an error for encrypted payloads,
    ///   because this path has no key.
    fn decode_frame_payload_value_with_raw_payload(
        msgpack: &[u8],
    ) -> Result<(FramePayload<serde_json::Value>, Vec<u8>), StoreError> {
        let payload = Self::decode_frame_payload_raw(msgpack)?;
        // Encrypted payloads cannot be decoded without a key; refuse instead of
        // trying to deserialize ciphertext.
        #[cfg(feature = "payload-encryption")]
        if payload.event.header.payload_encryption.is_some() {
            return Err(StoreError::ser_msg(
                "encrypted payload cannot be decoded on this read path without its key; use the \
                 key-aware read surface (Store::get / Store::get_shreddable)",
            ));
        }
        Self::plaintext_framepayload_from_raw(payload)
    }

    /// Like [`Self::decode_frame_payload_value_with_raw_payload`], but tolerant of
    /// encrypted payloads.
    ///
    /// With the `payload-encryption` feature, an encrypted frame yields a
    /// `Value::Null` placeholder plus its untouched (still encrypted) raw bytes.
    ///
    /// # Errors
    /// Serialization errors from frame decoding, or from decoding a plaintext payload.
    fn decode_frame_payload_value_for_compaction(
        msgpack: &[u8],
    ) -> Result<(FramePayload<serde_json::Value>, Vec<u8>), StoreError> {
        let payload = Self::decode_frame_payload_raw(msgpack)?;
        #[cfg(feature = "payload-encryption")]
        if payload.event.header.payload_encryption.is_some() {
            return Ok(Self::value_framepayload_from_raw(
                payload,
                serde_json::Value::Null,
            ));
        }
        Self::plaintext_framepayload_from_raw(payload)
    }

    /// Maps a frame decoding failure at `segment_id`/`offset` to a [`StoreError`].
    ///
    /// - CRC mismatches become [`StoreError::CrcMismatch`].
    /// - Short or truncated frames become [`StoreError::CorruptSegment`], with the
    ///   offset and the underlying error in the detail message.
    fn frame_decode_error(
        segment_id: u64,
        offset: u64,
        error: segment::FrameDecodeError,
    ) -> StoreError {
        match error {
            segment::FrameDecodeError::CrcMismatch { .. } => {
                StoreError::CrcMismatch { segment_id, offset }
            }
            error @ (segment::FrameDecodeError::TooShort
            | segment::FrameDecodeError::Truncated { .. }) => StoreError::CorruptSegment {
                segment_id,
                detail: format!(
                    "frame at offset {offset} is corrupt after full payload read: {error}"
                ),
            },
        }
    }

    /// [`Self::frame_decode_error`] taking the location from a [`DiskPos`].
    fn frame_decode_error_for_pos(pos: &DiskPos, error: segment::FrameDecodeError) -> StoreError {
        Self::frame_decode_error(pos.segment_id, pos.offset, error)
    }

    /// Creates a reader over `data_dir` with an fd cache budget of `fd_budget`.
    ///
    /// Sealed-segment mmap admission is decided once, from platform evidence
    /// collected for `data_dir`. If admission fails, the error is discarded and the
    /// mmap path stays disabled: `get_or_map_sealed` returns `Ok(None)`.
    /// The active segment id starts at 0, so no segment counts as sealed until
    /// [`Self::set_active_segment`] is called.
    pub(crate) fn new(
        data_dir: PathBuf,
        fd_budget: usize,
        clock: &Arc<dyn Clock>,
        fs: Arc<dyn StoreFs>,
    ) -> Self {
        let sealed_mmap_admission = crate::store::platform::mmap::admit_sealed_segment_mmap(
            crate::store::platform::evidence::collect_for_store_path(&data_dir, &**clock)
                .store_path
                .sealed_segment_mmap,
        )
        .ok();
        Self {
            data_dir,
            fd_cache: Mutex::new(FdCache {
                fds: HashMap::new(),
                order: Vec::new(),
                budget: fd_budget,
            }),
            buffer_pool: Mutex::new(Vec::new()),
            sealed_maps: DashMap::new(),
            active_segment_id: AtomicU64::new(0),
            sealed_mmap_admission,
            fs,
        }
    }

    /// Records `id` as the active segment; every lower id is treated as sealed.
    pub(crate) fn set_active_segment(&self, id: u64) {
        self.active_segment_id.store(id, Ordering::Release);
    }

    /// Returns the id last stored by [`Self::set_active_segment`] (0 initially).
    pub(crate) fn active_segment_id(&self) -> u64 {
        self.active_segment_id.load(Ordering::Acquire)
    }

    /// Returns `true` when `segment_id` is below the active segment id.
    fn is_sealed(&self, segment_id: u64) -> bool {
        segment_id < self.active_segment_id.load(Ordering::Acquire)
    }

    /// Returns the memory map for `segment_id`, mapping the file on first use.
    ///
    /// Returns `Ok(None)` when sealed-segment mmap was not admitted; the caller must
    /// then use another read path. This function does not check `is_sealed` itself.
    ///
    /// The returned `Ref` holds a read guard on a `sealed_maps` shard. Holding it
    /// while writing to `sealed_maps` from the same thread (for example via
    /// `evict_segment`) can deadlock.
    ///
    /// # Errors
    /// Returns [`StoreError::Io`] when:
    /// - the segment file cannot be opened or mapped, or
    /// - the entry is gone again when re-read after insert (for example, evicted
    ///   concurrently).
    fn get_or_map_sealed(
        &self,
        segment_id: u64,
    ) -> Result<Option<dashmap::mapref::one::Ref<'_, u64, memmap2::Mmap>>, StoreError> {
        if let Some(entry) = self.sealed_maps.get(&segment_id) {
            return Ok(Some(entry));
        }
        let Some(admission) = self.sealed_mmap_admission else {
            return Ok(None);
        };
        let path = self.data_dir.join(segment::segment_filename(segment_id));
        let file = crate::store::platform::fs::open_file(&path).map_err(StoreError::Io)?;
        // SAFETY: NOTE(review) — a file mapping is only sound while the underlying
        // file is not modified or truncated. This presumably holds because callers
        // only pass sealed segments and the call is gated on `admission`. Confirm
        // against the contract of `platform::mmap::map_sealed_segment_file`.
        let mmap =
            unsafe { crate::store::platform::mmap::map_sealed_segment_file(&file, admission) }
                .map_err(StoreError::Io)?;
        // Two threads that miss concurrently may both map the file. The later insert
        // replaces the earlier mapping only after acquiring the shard write lock, so
        // no outstanding `Ref` can observe the dropped map; the cost is one redundant
        // mmap.
        self.sealed_maps.insert(segment_id, mmap);
        self.sealed_maps
            .get(&segment_id)
            .ok_or_else(|| {
                StoreError::Io(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    "mmap entry missing after insert",
                ))
            })
            .map(Some)
    }

    /// Returns a zero-filled buffer of exactly `min_size` bytes.
    ///
    /// Pooled allocations are reused when available.
    fn acquire_buffer(&self, min_size: usize) -> Vec<u8> {
        // Only the pop happens under the pool lock. Clearing and zero-filling
        // (possibly reallocating) a large buffer must not serialize other readers.
        let pooled = self.buffer_pool.lock().pop();
        match pooled {
            Some(mut buf) => {
                buf.clear();
                buf.resize(min_size, 0);
                buf
            }
            None => vec![0u8; min_size],
        }
    }

    /// Returns `buf` to the pool for reuse.
    ///
    /// If the pool is already full, the buffer is dropped instead.
    fn release_buffer(&self, buf: Vec<u8>) {
        /// Maximum number of idle buffers kept in the pool.
        const MAX_POOLED_BUFFERS: usize = 16;
        let mut pool = self.buffer_pool.lock();
        if pool.len() < MAX_POOLED_BUFFERS {
            pool.push(buf);
        }
    }

    /// Test hook: disables the sealed-segment mmap path.
    #[cfg(test)]
    pub(super) fn disable_sealed_mmap_for_test(&mut self) {
        self.sealed_mmap_admission = None;
    }

    /// Test hook: reports whether sealed-segment mmap was admitted.
    #[cfg(test)]
    pub(super) fn sealed_mmap_admitted_for_test(&self) -> bool {
        self.sealed_mmap_admission.is_some()
    }

    /// Drops any cached memory map and open file handle for `segment_id`.
    ///
    /// Also removes the id from the fd cache ordering.
    pub(crate) fn evict_segment(&self, segment_id: u64) {
        self.sealed_maps.remove(&segment_id);
        let mut cache = self.fd_cache.lock();
        cache.fds.remove(&segment_id);
        cache.order.retain(|&id| id != segment_id);
    }
}
#[cfg(test)]
mod tests;