use crate::error::DbError;
use crate::file_format::FORMAT_MINOR_V6;
use crate::segments::header::{SegmentType, SEGMENT_HEADER_LEN};
use crate::segments::reader::{read_segment_header_at, read_segment_payload, SegmentMeta};
use crate::storage::Store;
use crate::txn::decode_txn_payload_v0;
/// Walks the segment chain beginning at `start` and returns the metadata of
/// every segment that is completely present, stopping quietly at the first
/// sign of a torn tail instead of failing.
///
/// The scan ends without an error when:
/// - fewer than `SEGMENT_HEADER_LEN` bytes remain after the current offset,
/// - the payload length declared in a header would run past the end of the
///   store (including a length so large that the end offset does not fit in
///   a `u64`), or
/// - the CRC32C computed over the payload differs from the header's
///   `payload_crc32c`, for any segment type other than `Checkpoint` and
///   `Temp` (the CRC is computed but not enforced for those two types).
///
/// # Errors
/// Propagates any error returned by `Store::len`, `read_segment_header_at`
/// or `Store::read_exact_at`.
pub(crate) fn scan_segments_allow_tail_tear(
    store: &mut impl Store,
    start: u64,
) -> Result<Vec<SegmentMeta>, DbError> {
    use crate::checksum::crc32c_append;

    let mut out = Vec::new();
    let mut offset = start;
    let file_len = store.len()?;
    // Scratch buffer shared by all segments; only the freshly read prefix
    // `chunk[..to_read]` is ever consumed, so stale bytes are harmless.
    let mut chunk = [0u8; 8192];

    while offset < file_len {
        if file_len - offset < SEGMENT_HEADER_LEN as u64 {
            break;
        }
        let (_, header) = read_segment_header_at(store, offset)?;
        // Cannot overflow: the check above guarantees
        // `offset + SEGMENT_HEADER_LEN <= file_len`.
        let payload_start = offset + SEGMENT_HEADER_LEN as u64;
        // A garbage length from a torn header must not wrap around: a wrapped
        // end offset could pass the bounds check below and even move `offset`
        // backwards. Treat an unrepresentable end as a torn tail.
        let Some(payload_end) = payload_start.checked_add(header.payload_len) else {
            break;
        };
        if payload_end > file_len {
            break;
        }

        let mut remaining = header.payload_len;
        let mut cursor = payload_start;
        let mut crc = 0u32;
        while remaining > 0 {
            // Clamp in `u64` first and narrow afterwards. Narrowing first
            // (`remaining as usize`) can truncate to 0 where `usize` is
            // narrower than `u64`, which would stall this loop forever.
            let to_read = remaining.min(chunk.len() as u64) as usize;
            store.read_exact_at(cursor, &mut chunk[..to_read])?;
            crc = crc32c_append(crc, &chunk[..to_read]);
            cursor += to_read as u64;
            remaining -= to_read as u64;
        }

        // A CRC mismatch marks the start of the torn tail, except for
        // Checkpoint and Temp segments, which are accepted regardless.
        if header.segment_type != SegmentType::Checkpoint
            && header.segment_type != SegmentType::Temp
            && crc != header.payload_crc32c
        {
            break;
        }

        out.push(SegmentMeta { offset, header });
        offset = payload_end;
    }
    Ok(out)
}
/// Computes the end offset of the recoverable prefix of the store, together
/// with an optional tag naming why the prefix is shorter than the file.
///
/// Segments are first collected with `scan_segments_allow_tail_tear`, starting
/// at `segment_start`.
///
/// * For `format_minor` below `FORMAT_MINOR_V6` the prefix ends after the last
///   scanned segment (or at `segment_start` when none was found); the tag is
///   `"torn_tail_pre_v6"` if that lies before the end of the file.
/// * Otherwise transaction markers are tracked as well:
///   - a `TxnBegin` while another transaction is still open, or a transaction
///     still open after the last segment, yields the open transaction's begin
///     offset with `"uncommitted_transaction"`;
///   - a `TxnCommit` with no open transaction yields the commit's own offset
///     with `"orphan_txn_commit"`;
///   - a `TxnCommit` whose id differs from the open transaction's id yields
///     the open transaction's begin offset with `"txn_id_mismatch"`;
///   - if the resulting prefix ends before the end of the file the tag is
///     `"torn_tail"`.
///
/// When nothing needs to be cut, `(file_len, None)` is returned.
///
/// # Errors
/// Propagates errors from `Store::len`, the segment scan,
/// `read_segment_payload` and `decode_txn_payload_v0`.
pub(crate) fn truncate_end_for_recovery(
    store: &mut impl Store,
    segment_start: u64,
    format_minor: u16,
) -> Result<(u64, Option<&'static str>), DbError> {
    let file_len = store.len()?;
    let metas = scan_segments_allow_tail_tear(store, segment_start)?;

    if format_minor < FORMAT_MINOR_V6 {
        // Older formats carry no transaction framing: keep everything scanned.
        let scanned_end = match metas.last() {
            Some(last) => last.offset + SEGMENT_HEADER_LEN as u64 + last.header.payload_len,
            None => segment_start,
        };
        return Ok(if scanned_end < file_len {
            (scanned_end, Some("torn_tail_pre_v6"))
        } else {
            (file_len, None)
        });
    }

    // `(offset of the TxnBegin segment, transaction id)` while a transaction
    // is open; `None` otherwise.
    let mut open_txn: Option<(u64, u64)> = None;
    let mut durable_end = segment_start;

    for meta in &metas {
        let segment_end = meta.offset + SEGMENT_HEADER_LEN as u64 + meta.header.payload_len;
        match meta.header.segment_type {
            SegmentType::TxnBegin => {
                // A second begin before the first one finished: cut at the
                // earlier begin.
                if let Some((begin_offset, _)) = open_txn {
                    return Ok((begin_offset, Some("uncommitted_transaction")));
                }
                let payload = read_segment_payload(store, meta)?;
                let txn_id = decode_txn_payload_v0(&payload)?;
                open_txn = Some((meta.offset, txn_id));
                durable_end = meta.offset;
            }
            SegmentType::TxnCommit => {
                // The payload is decoded before the open-transaction check, so
                // a decode failure surfaces as an error even for an orphan.
                let payload = read_segment_payload(store, meta)?;
                let txn_id = decode_txn_payload_v0(&payload)?;
                match open_txn {
                    None => return Ok((meta.offset, Some("orphan_txn_commit"))),
                    Some((begin_offset, expected_id)) if expected_id != txn_id => {
                        return Ok((begin_offset, Some("txn_id_mismatch")));
                    }
                    Some(_) => {
                        open_txn = None;
                        durable_end = segment_end;
                    }
                }
            }
            SegmentType::TxnAbort => {
                // The payload must decode, but the id itself is not compared.
                let payload = read_segment_payload(store, meta)?;
                let _ = decode_txn_payload_v0(&payload)?;
                open_txn = None;
                durable_end = segment_end;
            }
            SegmentType::Manifest
            | SegmentType::Checkpoint
            | SegmentType::Temp
            | SegmentType::Schema
            | SegmentType::Record
            | SegmentType::Index => {
                // Segments inside an open transaction only count once the
                // transaction is closed.
                if open_txn.is_none() {
                    durable_end = segment_end;
                }
            }
        }
    }

    Ok(match open_txn {
        Some((begin_offset, _)) => (begin_offset, Some("uncommitted_transaction")),
        None if durable_end < file_len => (durable_end, Some("torn_tail")),
        None => (file_len, None),
    })
}
// Unit tests for this file are kept in a separate source file under the
// crate's `tests/unit/` directory. `include!` pastes that file in here, so it
// is compiled as this file's `tests` child module, and only in test builds.
#[cfg(test)]
mod tests {
include!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/unit/src_db_recover_tests.rs"
));
}