use std::collections::HashMap;
use std::io::{Seek, Write};
use crate::constants::SECTOR_SIZE;
use crate::error::{Error, Result};
use crate::log::{Descriptor, Entry, Log};
use crate::medium::write_all_at;
use crate::types::Guid;
/// A run of log entries selected by `detect_active_sequence`, together with
/// the two file offsets read from the header of the run's last (head) entry.
#[derive(Debug)]
pub struct ActiveSequence<'a> {
/// Entries of the run in the order they were scanned; the last one is the head.
entries: Vec<LocatedEntry<'a>>,
/// `flushed_file_offset` value copied from the head entry's header.
flushed_file_offset: u64,
/// `last_file_offset` value copied from the head entry's header.
last_file_offset: u64,
}
/// A parsed log entry paired with the log offset it was parsed from.
#[derive(Debug)]
pub struct LocatedEntry<'a> {
/// The parsed entry itself.
pub(super) entry: Entry<'a>,
/// Offset within the log at which `entry` was read. It is stored when the
/// sequence is built but not read back by the code visible in this file.
_offset: usize,
}
impl<'a> ActiveSequence<'a> {
    /// Borrows the entries of the sequence, head entry last.
    pub fn entries(&self) -> &[LocatedEntry<'a>] {
        self.entries.as_slice()
    }

    /// Returns the `flushed_file_offset` stored with the sequence.
    pub fn flushed_file_offset(&self) -> u64 {
        self.flushed_file_offset
    }

    /// Returns the `last_file_offset` stored with the sequence.
    pub fn last_file_offset(&self) -> u64 {
        self.last_file_offset
    }

    /// Number of entries in the sequence (test builds only).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.entries().len()
    }
}
/// In-memory description of the changes carried by a log sequence, so they
/// can be applied to reads without writing to the underlying file.
#[derive(Debug)]
pub struct ReplayOverlay {
/// Data-sector payloads keyed by the file offset they apply to.
sectors: HashMap<u64, Vec<u8>>,
/// Zeroed ranges as `(file_offset, length)` pairs.
zeros: Vec<(u64, u64)>,
/// `last_file_offset` of the sequence the overlay was built from.
last_file_offset: u64,
}
impl ReplayOverlay {
pub fn last_file_offset(&self) -> u64 {
self.last_file_offset
}
pub fn read(&self, offset: u64, buf: &mut [u8]) -> usize {
if buf.is_empty() {
return 0;
}
for (§or_offset, sector_data) in &self.sectors {
let sector_end = sector_offset
+ u64::try_from(sector_data.len()).expect("sector data length fits u64");
if offset >= sector_offset && offset < sector_end {
let in_sector = usize::try_from(offset - sector_offset)
.expect("sector-relative offset fits usize");
let available = sector_data.len() - in_sector;
let to_copy = available.min(buf.len());
buf[..to_copy].copy_from_slice(§or_data[in_sector..in_sector + to_copy]);
return to_copy;
}
}
for &(zero_offset, zero_length) in &self.zeros {
let zero_end = zero_offset + zero_length;
if offset >= zero_offset && offset < zero_end {
let remaining = usize::try_from(zero_length - (offset - zero_offset))
.expect("remaining zero length fits usize");
let to_fill = remaining.min(buf.len());
buf[..to_fill].fill(0);
return to_fill;
}
}
0
}
pub fn apply_to_region(&self, region_data: &mut [u8], region_offset: u64) {
let region_end =
region_offset + u64::try_from(region_data.len()).expect("region length fits u64");
let mut touched = vec![false; region_data.len()];
for (§or_offset, sector_data) in &self.sectors {
let sector_end = sector_offset
+ u64::try_from(sector_data.len()).expect("sector data length fits u64");
if sector_end > region_offset && sector_offset < region_end {
let overlap_start = sector_offset.max(region_offset);
let overlap_end = sector_end.min(region_end);
let region_start = usize::try_from(overlap_start - region_offset)
.expect("region overlap start fits usize");
let sector_start = usize::try_from(overlap_start - sector_offset)
.expect("sector overlap start fits usize");
let len = usize::try_from(overlap_end - overlap_start)
.expect("overlap length fits usize");
region_data[region_start..region_start + len]
.copy_from_slice(§or_data[sector_start..sector_start + len]);
for touched_byte in touched.iter_mut().skip(region_start).take(len) {
*touched_byte = true;
}
}
}
for &(zero_offset, zero_length) in &self.zeros {
let zero_end = zero_offset + zero_length;
if zero_end > region_offset && zero_offset < region_end {
let overlap_start = zero_offset.max(region_offset);
let overlap_end = zero_end.min(region_end);
let region_start = usize::try_from(overlap_start - region_offset)
.expect("region overlap start fits usize");
let len = usize::try_from(overlap_end - overlap_start)
.expect("overlap length fits usize");
for i in region_start..region_start + len {
if !touched[i] {
region_data[i] = 0;
}
}
}
}
}
#[cfg(test)]
pub(super) fn sectors(&self) -> &HashMap<u64, Vec<u8>> {
&self.sectors
}
#[cfg(test)]
pub(super) fn zeros(&self) -> &[(u64, u64)] {
&self.zeros
}
#[cfg(test)]
pub(crate) fn from_raw(sectors: HashMap<u64, Vec<u8>>, zeros: Vec<(u64, u64)>) -> Self {
Self {
sectors,
zeros,
last_file_offset: 0,
}
}
}
/// Scans the circular log for the run of entries that should be replayed.
///
/// Starting at offset 0, each candidate tail position is walked forward while
/// `try_validate_entry` accepts the entry and sequence numbers increase by
/// exactly one. A walk also stops when the next position wraps to offset 0.
/// A run counts as valid when the tail offset recorded in its last (head)
/// entry equals the offset of one of the run's entries. Among valid runs the
/// one with the highest head sequence number is kept.
///
/// After an invalid or empty run the scan advances by `SECTOR_SIZE`; after a
/// valid run it resumes right behind that run. Scanning ends once the scan
/// position stops increasing, i.e. it wrapped around the log.
///
/// # Errors
///
/// Returns `Error::LogEntryCorrupted` when the log is empty or no valid run
/// exists, and propagates any error from `Log::entry_at` while materialising
/// the chosen entries.
pub fn detect_active_sequence<'a>(log: &'a Log<'a>, log_guid: &Guid) -> Result<ActiveSequence<'a>> {
    let log_size = log.len();
    if log_size == 0 {
        return Err(Error::LogEntryCorrupted(
            "log buffer is empty, no active sequence".into(),
        ));
    }

    // Best run found so far as (log offset, sequence number) pairs.
    let mut candidate_entries: Vec<(usize, u64)> = Vec::new();
    let mut candidate_head_seq: u64 = 0;
    let mut current_tail: usize = 0;
    let mut old_tail: usize = 0;

    loop {
        let mut current_entries: Vec<(usize, u64)> = Vec::new();
        let mut head: usize = current_tail;
        let mut current_seq: u64 = 0;
        // Tail offset recorded by the most recently accepted entry.
        let mut head_tail: Option<usize> = None;

        while let Some((entry, seq)) = try_validate_entry(log, head, log_guid) {
            // `checked_add` treats a sequence number of `u64::MAX` as the end
            // of a run instead of overflowing.
            if !current_entries.is_empty() && current_seq.checked_add(1) != Some(seq) {
                break;
            }
            let entry_len = entry.header().entry_length() as usize;
            head_tail = Some(entry.header().tail() as usize);
            current_entries.push((head, seq));
            current_seq = seq;
            head = (head + entry_len) % log_size;
            if head == 0 {
                // Wrapped exactly to the start of the buffer: the run ends here.
                break;
            }
        }

        let is_valid = match head_tail {
            Some(tail) => current_entries.iter().any(|&(offset, _)| offset == tail),
            None => false,
        };

        if is_valid {
            if current_seq > candidate_head_seq {
                candidate_entries = current_entries;
                candidate_head_seq = current_seq;
            }
            // `head` already points just past the last accepted entry.
            current_tail = head;
        } else {
            current_tail = (current_tail + SECTOR_SIZE as usize) % log_size;
        }

        if current_tail <= old_tail {
            break;
        }
        old_tail = current_tail;
    }

    let head_entry_offset = match candidate_entries.last() {
        Some(&(offset, _)) => offset,
        None => {
            return Err(Error::LogEntryCorrupted(
                "no valid active log sequence found".into(),
            ));
        }
    };
    let head_entry = log.entry_at(head_entry_offset)?;
    let flushed_file_offset = head_entry.header().flushed_file_offset();
    let last_file_offset = head_entry.header().last_file_offset();

    let entries = candidate_entries
        .iter()
        .map(|&(offset, _seq)| {
            log.entry_at(offset).map(|entry| LocatedEntry {
                entry,
                _offset: offset,
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ActiveSequence {
        entries,
        flushed_file_offset,
        last_file_offset,
    })
}
/// Reports whether `log` contains a replayable sequence for `log_guid`.
///
/// An all-zero GUID yields `false` without scanning the log; otherwise the
/// answer is whether `detect_active_sequence` succeeds.
pub fn has_pending_log(log: &Log<'_>, log_guid: &Guid) -> bool {
    let guid_is_zero = log_guid.to_bytes() == [0u8; 16];
    !guid_is_zero && detect_active_sequence(log, log_guid).is_ok()
}
pub fn build_replay_overlay(active: &ActiveSequence<'_>) -> Result<ReplayOverlay> {
let mut sectors: HashMap<u64, Vec<u8>> = HashMap::new();
let mut zeros: Vec<(u64, u64)> = Vec::new();
for located in active.entries() {
let entry = &located.entry;
let desc_count = entry.header().descriptor_count();
let data_sectors: Vec<_> = entry.data().collect();
let mut data_idx = 0usize;
for di in 0..desc_count as usize {
let desc = entry.descriptor(di)?;
match desc {
Descriptor::Data(data_desc) => {
if data_idx >= data_sectors.len() {
return Err(Error::LogEntryCorrupted(format!(
"data descriptor {di} has no matching data sector ({} descriptors, {} sectors)",
desc_count,
data_sectors.len()
)));
}
let sector = &data_sectors[data_idx];
let file_offset = data_desc.file_offset();
sectors.insert(file_offset, sector.data().to_vec());
data_idx += 1;
}
Descriptor::Zero(zero_desc) => {
let file_offset = zero_desc.file_offset();
let zero_length = zero_desc.zero_length();
zeros.push((file_offset, zero_length));
}
}
}
}
Ok(ReplayOverlay {
sectors,
zeros,
last_file_offset: active.last_file_offset(),
})
}
pub fn replay_to_file<T>(file: &mut T, active: &ActiveSequence<'_>) -> Result<()>
where
T: Seek + Write + crate::medium::SetLen,
{
for located in active.entries() {
let entry = &located.entry;
let desc_count = entry.header().descriptor_count();
let data_sectors: Vec<_> = entry.data().collect();
let mut data_idx = 0usize;
for di in 0..desc_count as usize {
let desc = entry.descriptor(di)?;
match desc {
Descriptor::Data(data_desc) => {
if data_idx >= data_sectors.len() {
return Err(Error::LogEntryCorrupted(format!(
"data descriptor {di} has no matching data sector"
)));
}
let sector = &data_sectors[data_idx];
let file_offset = data_desc.file_offset();
write_all_at(file, file_offset, §or.data())?;
data_idx += 1;
}
Descriptor::Zero(zero_desc) => {
let file_offset = zero_desc.file_offset();
let zero_length = usize::try_from(zero_desc.zero_length())
.expect("zero descriptor length fits usize");
let zero_buf = vec![0u8; (SECTOR_SIZE as usize).min(zero_length)];
let mut written: usize = 0;
while written < zero_length {
let chunk = zero_buf.len().min(zero_length - written);
write_all_at(
file,
file_offset + u64::try_from(written).expect("written count fits u64"),
&zero_buf[..chunk],
)?;
written += chunk;
}
}
}
}
}
crate::medium::SetLen::set_len(file, active.last_file_offset())?;
Ok(())
}
/// Parses the entry at `offset` and returns it with its sequence number if it
/// passes every acceptance check.
///
/// Returns `None` when the entry cannot be parsed, its log GUID differs from
/// `log_guid`, its checksum does not verify, or its sequence number is zero.
fn try_validate_entry<'a>(
    log: &'a Log<'a>,
    offset: usize,
    log_guid: &Guid,
) -> Option<(Entry<'a>, u64)> {
    let entry = log.entry_at(offset).ok()?;

    // The GUID is compared first so the checksum is only verified for entries
    // that belong to this log.
    let guid_matches = entry.header().log_guid() == *log_guid;
    if !guid_matches || entry.verify_checksum().is_err() {
        return None;
    }

    let seq = entry.header().sequence_number();
    if seq == 0 {
        None
    } else {
        Some((entry, seq))
    }
}