use humansize::BINARY;
use std::time::Instant;
use teamy_uom_extensions::HumanInformationRateExt;
use teamy_uom_extensions::HumanTimeExt;
use teamy_uom_extensions::InformationOverExt;
use thousands::Separable;
use tracing::debug;
use tracing::debug_span;
use tracing::instrument;
use tracing::trace;
use uom::si::f64::Information;
use uom::si::f64::Time;
use uom::si::information::byte;
use uom::si::time::second;
/// Running tally of per-entry fixup outcomes.
///
/// There is one counter per [`FixupState`] variant; [`FixupStats::record`]
/// bumps the counter that matches the state it is given. The `Default`
/// value has every counter at zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct FixupStats {
    /// Number of entries recorded as [`FixupState::Applied`].
    pub applied: u64,
    /// Number of entries recorded as [`FixupState::AlreadyApplied`].
    pub already_applied: u64,
    /// Number of entries recorded as [`FixupState::Invalid`].
    pub invalid: u64,
}
/// Outcome of attempting a fixup on a single entry, as reported by
/// `apply_fixup_in_place`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FixupState {
    /// At least one sector tail still held the update sequence value and was
    /// replaced with its stored original bytes.
    Applied,
    /// Nothing had to be rewritten: every inspected sector tail already held
    /// its stored original bytes, or the entry declares fewer than two
    /// update-sequence-array words.
    AlreadyApplied,
    /// The entry could not be processed: the header is too short, the
    /// update sequence array runs past the entry, or a sector tail matched
    /// neither the update sequence value nor its stored original bytes.
    Invalid,
}
impl FixupStats {
    /// Counts one outcome by incrementing the counter that corresponds to
    /// `state`.
    #[inline]
    pub fn record(&mut self, state: FixupState) {
        // Pick the counter first so the increment is written exactly once.
        let counter = match state {
            FixupState::Applied => &mut self.applied,
            FixupState::AlreadyApplied => &mut self.already_applied,
            FixupState::Invalid => &mut self.invalid,
        };
        *counter += 1;
    }
}
/// Byte stride used by the fixup routines in this file: the last two bytes of
/// every `SECTOR`-sized span of an entry are compared against the entry's
/// update sequence value.
// NOTE(review): presumably the 512-byte sector size that NTFS multi-sector
// fixups are defined over — confirm against the on-disk format reference.
const SECTOR: usize = 512;
/// Reads the little-endian `u32` stored at bytes `0x1C..0x20` of `entry0`.
///
/// Returns `None` when `entry0` is shorter than `0x20` bytes or when the
/// stored value is zero; otherwise returns `Some(value)`.
// NOTE(review): presumably this field is the allocated record size in the
// header of the first table entry — confirm against the caller.
#[inline]
#[must_use]
pub fn detect_entry_size(entry0: &[u8]) -> Option<u32> {
    match entry0.get(0x1C..0x20) {
        // `get` yields exactly four bytes or nothing, so the pattern is total
        // for the `Some` case; a zero size is treated as "not detected".
        Some(&[b0, b1, b2, b3]) => {
            Some(u32::from_le_bytes([b0, b1, b2, b3])).filter(|&size| size != 0)
        }
        _ => None,
    }
}
/// Extracts the two little-endian `u16` header fields stored at bytes `4..6`
/// and `6..8` of `entry`, widened to `usize`.
///
/// The pair is returned as `(usa_offset, usa_size)`. Returns `None` when
/// `entry` has fewer than eight bytes. The values are not validated here.
#[inline]
fn read_update_sequence_array_fields(entry: &[u8]) -> Option<(usize, usize)> {
    match entry {
        // The first four bytes are skipped; anything after byte 7 is ignored.
        [_, _, _, _, offset_lo, offset_hi, size_lo, size_hi, ..] => {
            let usa_offset = usize::from(u16::from_le_bytes([*offset_lo, *offset_hi]));
            let usa_size = usize::from(u16::from_le_bytes([*size_lo, *size_hi]));
            Some((usa_offset, usa_size))
        }
        _ => None,
    }
}
/// Reports whether `entry` still carries an unapplied fixup in its first
/// sector.
///
/// Returns `true` only when all of the following hold:
/// - the header fields can be read,
/// - the update sequence array declares at least two words,
/// - the whole array lies inside `entry`,
/// - `entry` is at least `SECTOR` bytes long, and
/// - the last two bytes of the first sector equal the update sequence value
///   (the first word of the array).
///
/// Only the first sector is inspected; later sectors are not checked.
#[inline]
#[must_use]
pub fn needs_fixup(entry: &[u8]) -> bool {
    let (usa_offset, usa_size) = match read_update_sequence_array_fields(entry) {
        Some(fields) => fields,
        None => return false,
    };
    // Both fields come from `u16`s, so this sum cannot overflow `usize`.
    let usa_end = usa_offset + usa_size * 2;
    let well_formed = usa_size >= 2 && usa_end <= entry.len() && entry.len() >= SECTOR;
    // `&&` short-circuits, so the slices are only taken once bounds are known.
    well_formed && entry[SECTOR - 2..SECTOR] == entry[usa_offset..usa_offset + 2]
}
/// Applies the update-sequence fixup to `entry` in place.
///
/// The header fields at bytes `4..8` give the offset and word count of the
/// update sequence array. Its first word is the update sequence value; each
/// following word holds the original last two bytes of one `SECTOR`-sized
/// span. For every such span, a tail that still equals the update sequence
/// value is overwritten with its stored original bytes.
///
/// # Returns
///
/// - [`FixupState::Applied`] if at least one sector tail was rewritten. This
///   includes the case where the array declares more sectors than fit in
///   `entry` but an earlier sector was rewritten.
/// - [`FixupState::AlreadyApplied`] if the array declares fewer than two
///   words, or every sector tail already equals its stored original bytes.
/// - [`FixupState::Invalid`] if the header is shorter than eight bytes, the
///   array extends past `entry`, a declared sector lies past `entry` before
///   anything was rewritten, or a tail matches neither the update sequence
///   value nor its stored original bytes.
///
/// NOTE(review): `Invalid` can be returned after earlier sectors of the same
/// entry have already been rewritten; the buffer is not rolled back.
#[inline]
pub fn apply_fixup_in_place(entry: &mut [u8]) -> FixupState {
    let Some((usa_offset, usa_size)) = read_update_sequence_array_fields(entry) else {
        return FixupState::Invalid;
    };
    if usa_size < 2 {
        return FixupState::AlreadyApplied;
    }
    // Both fields originate from `u16`s, so this cannot overflow `usize`.
    let usa_end = usa_offset + usa_size * 2;
    if usa_end > entry.len() {
        return FixupState::Invalid;
    }

    // The update sequence value is two bytes: copy it to the stack instead of
    // heap-allocating. `usa_size >= 2` guarantees both indices are in bounds.
    let update_sequence = [entry[usa_offset], entry[usa_offset + 1]];
    // The replacement words must be snapshotted before any tail is rewritten:
    // in a malformed entry the array itself may overlap a sector tail, and the
    // result must not depend on bytes this function has already modified.
    let original_bytes = entry[usa_offset + 2..usa_end].to_vec();

    let mut any_applied = false;
    // One two-byte replacement word per sector (`usa_size - 1` sectors).
    for (sector_index, original) in original_bytes.chunks_exact(2).enumerate() {
        let sector_end = (sector_index + 1) * SECTOR;
        if sector_end > entry.len() {
            // The array declares more sectors than the entry holds.
            return if any_applied {
                FixupState::Applied
            } else {
                FixupState::Invalid
            };
        }
        // `sector_end >= SECTOR`, so the subtraction cannot underflow.
        let tail = &mut entry[sector_end - 2..sector_end];
        if *tail == update_sequence {
            tail.copy_from_slice(original);
            any_applied = true;
        } else if *tail != *original {
            // Neither the marker nor the original bytes.
            return FixupState::Invalid;
        }
    }

    if any_applied {
        FixupState::Applied
    } else {
        FixupState::AlreadyApplied
    }
}
/// Runs `apply_fixup_in_place` over every `entry_size`-byte chunk of `buf`
/// using rayon, and returns the per-outcome tally.
///
/// If `entry_size` is zero or `buf.len()` is not a multiple of `entry_size`,
/// a debug event is emitted, `buf` is left untouched and an all-zero
/// [`FixupStats`] is returned.
///
/// Tracing spans wrap each phase (validation, entry counting, the parallel
/// pass, telemetry), and a trace event reports elapsed time, throughput and
/// the resulting counters.
#[instrument(level = "debug", skip_all)]
pub fn apply_fixups_parallel(buf: &mut [u8], entry_size: usize) -> FixupStats {
    use rayon::prelude::*;

    {
        let _span = debug_span!("validate_entry_alignment").entered();
        let aligned = entry_size != 0 && buf.len().is_multiple_of(entry_size);
        if !aligned {
            debug!(
                "Invalid/unaligned entry size: entry_size={} buf_len={}",
                entry_size,
                buf.len()
            );
            return FixupStats::default();
        }
    }

    let entry_count = {
        let _span = debug_span!("compute_entry_count").entered();
        buf.len() / entry_size
    };
    trace!(
        "Detected entry size: {} bytes, total entries: {}",
        entry_size.separate_with_commas(),
        entry_count.separate_with_commas()
    );

    let start = Instant::now();
    let stats = {
        let _span = debug_span!("parallel_apply_fixups").entered();
        buf.par_chunks_mut(entry_size)
            .map(|entry| {
                #[cfg(feature = "extended_observability_per_record")]
                let _span = debug_span!("apply_fixup_to_entry").entered();
                // A short chunk is reported as invalid rather than processed.
                if entry.len() >= entry_size {
                    apply_fixup_in_place(entry)
                } else {
                    FixupState::Invalid
                }
            })
            // Per-worker tallies ...
            .fold(FixupStats::default, |mut tally, state| {
                #[cfg(feature = "extended_observability_per_record")]
                let _span = debug_span!("fold_fixup_state").entered();
                tally.record(state);
                tally
            })
            // ... merged field by field into one total.
            .reduce(FixupStats::default, |lhs, rhs| {
                #[cfg(feature = "extended_observability_per_record")]
                let _span = debug_span!("reduce_fixup_stats").entered();
                FixupStats {
                    applied: lhs.applied + rhs.applied,
                    already_applied: lhs.already_applied + rhs.already_applied,
                    invalid: lhs.invalid + rhs.invalid,
                }
            })
    };

    let (elapsed, rate) = {
        let _span = debug_span!("compute_fixup_telemetry").entered();
        let elapsed = Time::new::<second>(start.elapsed().as_secs_f64());
        #[allow(
            clippy::cast_precision_loss,
            reason = "precision loss is acceptable for size estimation"
        )]
        let total_size = Information::new::<byte>(buf.len() as f64);
        let throughput = total_size.over(elapsed);
        (elapsed, throughput)
    };
    trace!(
        "Took {elapsed} to process {count} records ({rate}) - fixup stats: applied={applied} already-applied={already_applied} invalid={invalid}",
        elapsed = elapsed.format_human(),
        count = entry_count.separate_with_commas(),
        rate = rate.format_human(BINARY),
        applied = stats.applied.separate_with_commas(),
        already_applied = stats.already_applied.separate_with_commas(),
        invalid = stats.invalid.separate_with_commas()
    );

    stats
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A `0x20`-byte header whose size field holds 1024 is reported as
    /// `Some(1024)`.
    #[test]
    fn detect_entry_size_basic() {
        let mut header = [0u8; 0x20];
        header[..4].copy_from_slice(b"FILE");
        header[0x1C..].copy_from_slice(&1024u32.to_le_bytes());
        assert_eq!(detect_entry_size(&header), Some(1024));
    }
}