use crate::read::physical_read_request::PhysicalReadRequest;
use crate::read::physical_read_results::PhysicalReadResults;
use crate::read::physical_reader::PhysicalReader;
use std::collections::BTreeSet;
use tracing::info_span;
use tracing::instrument;
use tracing::warn;
use uom::ConstZero;
use uom::si::information::byte;
use uom::si::usize::Information;
use windows::core::PCWSTR;
use windows::core::Param;
/// A set of physical read requests together with the policy applied when a
/// zero-length request is pushed.
///
/// Requests are held in a `BTreeSet`, so they are kept in their `Ord` order and
/// identical requests are stored only once.
#[derive(Clone, Debug, Default)]
pub struct PhysicalReadPlan {
    /// The pending requests, ordered and deduplicated by the set.
    requests: BTreeSet<PhysicalReadRequest>,
    /// What `push` does when handed a request whose length is zero.
    zero_length_behavior: ZeroLengthPushBehaviour,
}
/// Policy applied by `PhysicalReadPlan::push` when it receives a request whose
/// length is zero.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ZeroLengthPushBehaviour {
    /// Panic with "Zero-length push detected". This is the default.
    #[default]
    Panic,
    /// Drop the request and leave the plan unchanged.
    NoOp,
}
/// In-flight I/O limit used when `TEAMY_MFT_MAX_IN_FLIGHT_IO` is unset or
/// cannot be used.
const DEFAULT_MAX_IN_FLIGHT_IO: usize = 32;
/// Consumes the plan, yielding its requests in the set's iteration order.
impl IntoIterator for PhysicalReadPlan {
    type Item = PhysicalReadRequest;
    type IntoIter = std::collections::btree_set::IntoIter<PhysicalReadRequest>;

    fn into_iter(self) -> Self::IntoIter {
        // Only the requests are yielded; the zero-length policy is discarded.
        let Self { requests, .. } = self;
        requests.into_iter()
    }
}
impl FromIterator<PhysicalReadRequest> for PhysicalReadPlan {
fn from_iter<T: IntoIterator<Item = PhysicalReadRequest>>(iter: T) -> Self {
let mut plan = PhysicalReadPlan::new();
for req in iter {
plan.push(req);
}
plan
}
}
impl PhysicalReadPlan {
    /// Creates an empty plan whose zero-length behavior is the default
    /// ([`ZeroLengthPushBehaviour::Panic`]).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the policy [`Self::push`] applies to zero-length requests.
    ///
    /// Returns `self` for chaining.
    pub fn set_zero_length_behavior(&mut self, behavior: ZeroLengthPushBehaviour) -> &mut Self {
        self.zero_length_behavior = behavior;
        self
    }

    /// Adds `request` to the plan and returns `self` for chaining.
    ///
    /// Requests are stored in a `BTreeSet`, so pushing a request equal to one
    /// already present leaves the plan unchanged. A zero-length request is
    /// dropped when the behavior is [`ZeroLengthPushBehaviour::NoOp`].
    ///
    /// # Panics
    ///
    /// Panics with `"Zero-length push detected"` when `request.length` is zero
    /// and the behavior is [`ZeroLengthPushBehaviour::Panic`] (the default).
    pub fn push(&mut self, request: PhysicalReadRequest) -> &mut Self {
        if request.length == Information::ZERO {
            match self.zero_length_behavior {
                ZeroLengthPushBehaviour::Panic => panic!("Zero-length push detected"),
                ZeroLengthPushBehaviour::NoOp => return self,
            }
        }
        self.requests.insert(request);
        self
    }

    /// Coalesces requests where one ends exactly where the next begins, and
    /// returns `self` for chaining.
    ///
    /// Requests are visited in the set's iteration order. Each one is compared
    /// only with the most recently kept request. Overlapping (rather than
    /// exactly touching) requests are left as separate entries.
    pub fn merge_contiguous_reads(&mut self) -> &mut Self {
        if self.requests.is_empty() {
            return self;
        }
        // NOTE(review): this relies on `PhysicalReadRequest`'s `Ord` sorting by
        // offset first, so that contiguous requests are neighbours in iteration
        // order and `pop_last` returns the request with the highest offset so
        // far. Confirm against `PhysicalReadRequest`.
        let physical_requests = std::mem::take(&mut self.requests);
        let merged = &mut self.requests;
        for req in physical_requests {
            let Some(mut last) = merged.pop_last() else {
                merged.insert(req);
                continue;
            };
            if last.physical_end() == req.offset {
                // Extend the previous request in place; its offset (and thus its
                // position as the set's maximum) is unchanged.
                last.length += req.length;
                merged.insert(last);
            } else {
                merged.insert(last);
                merged.insert(req);
            }
        }
        self
    }

    /// Returns a new plan in which every request is split into consecutive
    /// segments of at most `chunk_size`.
    ///
    /// The last segment of each request carries the remainder. Segments are not
    /// re-merged. A zero `chunk_size` returns an unchanged clone. The returned
    /// plan keeps this plan's zero-length behavior.
    #[must_use]
    #[instrument(skip_all)]
    pub fn chunked(&self, chunk_size: Information) -> Self {
        if chunk_size == Information::ZERO {
            return self.clone();
        }
        let mut out = PhysicalReadPlan::new();
        // `new()` resets the policy to the default. Carry the caller's policy
        // over so this path agrees with the `self.clone()` path above.
        out.set_zero_length_behavior(self.zero_length_behavior);
        for req in &self.requests {
            let mut remaining = req.length;
            let mut current_physical_offset = req.offset;
            // `remaining > 0` and `chunk_size > 0`, so every segment is non-empty
            // and `push` never hits its zero-length path here.
            while remaining > Information::ZERO {
                let segment_size = if remaining > chunk_size {
                    chunk_size
                } else {
                    remaining
                };
                out.push(PhysicalReadRequest::new(
                    current_physical_offset,
                    segment_size,
                ));
                current_physical_offset += segment_size;
                remaining -= segment_size;
            }
        }
        out
    }

    /// Aligns every request with a 512-byte sector size, then merges contiguous
    /// requests. Returns `self` for chaining.
    ///
    /// Each request is passed through
    /// `PhysicalReadRequest::align_to_sector_size`, then [`Self::merge_contiguous_reads`]
    /// runs. Requests that become identical after alignment collapse into one
    /// set entry. Requests that overlap after alignment stay separate, because
    /// the merge only joins exactly-touching requests.
    ///
    /// # Panics
    ///
    /// Requests are re-inserted through [`Self::push`]. If alignment ever
    /// produced a zero-length request while the behavior is
    /// [`ZeroLengthPushBehaviour::Panic`], this would panic.
    pub fn align_512(&mut self) -> &mut Self {
        if self.requests.is_empty() {
            return self;
        }
        let sector_size = Information::new::<byte>(512);
        for mut req in std::mem::take(&mut self.requests) {
            req.align_to_sector_size(sector_size);
            self.push(req);
        }
        self.merge_contiguous_reads();
        self
    }

    /// Returns `true` when the plan holds no requests.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.requests.is_empty()
    }

    /// Returns the number of (deduplicated) requests in the plan.
    #[must_use]
    pub fn len(&self) -> usize {
        self.requests.len()
    }

    /// Returns the sum of the lengths of all requests.
    ///
    /// Overlapping requests are counted once per request, not once per byte.
    #[must_use]
    pub fn total_size(&self) -> Information {
        self.requests
            .iter()
            .map(|r| r.length)
            .fold(Information::ZERO, |a, b| a + b)
    }

    /// Reads every request in the plan from `filename`, consuming the plan.
    ///
    /// An empty plan returns empty results without using `filename`. Otherwise
    /// a [`PhysicalReader`] is created with an in-flight limit taken from the
    /// `TEAMY_MFT_MAX_IN_FLIGHT_IO` environment variable (falling back to a
    /// default), and its [`PhysicalReader::read_all`] result is returned.
    ///
    /// # Errors
    ///
    /// Returns any error from `PhysicalReader::try_new` or from
    /// [`PhysicalReader::read_all`].
    #[instrument(skip_all)]
    pub fn read(self, filename: impl Param<PCWSTR>) -> eyre::Result<PhysicalReadResults> {
        if self.is_empty() {
            return Ok(PhysicalReadResults::new());
        }
        let max_in_flight = max_in_flight_io();
        let request_count = self.requests.len();
        let total_size = self.total_size().get::<byte>();
        let reader = {
            let _span = info_span!(
                "create_physical_reader",
                request_count,
                total_physical_bytes = total_size,
                max_in_flight,
            )
            .entered();
            PhysicalReader::try_new(filename, self.requests, max_in_flight)?
        };
        reader.read_all()
    }
}
/// Returns the in-flight I/O limit configured by `TEAMY_MFT_MAX_IN_FLIGHT_IO`.
///
/// Surrounding whitespace in the value is ignored.
///
/// Falls back to [`DEFAULT_MAX_IN_FLIGHT_IO`]:
/// - silently, when the variable is unset;
/// - with a warning, when the value is not valid UTF-8, does not parse as a
///   `usize`, or is `0`.
fn max_in_flight_io() -> usize {
    let value = match std::env::var("TEAMY_MFT_MAX_IN_FLIGHT_IO") {
        Ok(value) => value,
        Err(std::env::VarError::NotPresent) => return DEFAULT_MAX_IN_FLIGHT_IO,
        // A non-UTF-8 value is a misconfiguration just like an unparsable one,
        // so report it rather than ignoring it silently.
        Err(std::env::VarError::NotUnicode(raw)) => {
            warn!(
                env_value = ?raw,
                default = DEFAULT_MAX_IN_FLIGHT_IO,
                "Ignoring non-UTF-8 TEAMY_MFT_MAX_IN_FLIGHT_IO; using default"
            );
            return DEFAULT_MAX_IN_FLIGHT_IO;
        }
    };
    // Tolerate stray whitespace (e.g. a trailing space from a shell `set`).
    match value.trim().parse::<usize>() {
        // Zero in-flight requests could never make progress.
        Ok(0) => {
            warn!(
                env_value = %value,
                default = DEFAULT_MAX_IN_FLIGHT_IO,
                "Ignoring TEAMY_MFT_MAX_IN_FLIGHT_IO=0; using default"
            );
            DEFAULT_MAX_IN_FLIGHT_IO
        }
        Ok(parsed) => parsed,
        Err(error) => {
            warn!(
                env_value = %value,
                %error,
                default = DEFAULT_MAX_IN_FLIGHT_IO,
                "Ignoring invalid TEAMY_MFT_MAX_IN_FLIGHT_IO; using default"
            );
            DEFAULT_MAX_IN_FLIGHT_IO
        }
    }
}
#[cfg(test)]
mod test {
    use crate::read::physical_read_plan::PhysicalReadPlan;
    use crate::read::physical_read_plan::ZeroLengthPushBehaviour;
    use crate::read::physical_read_request::PhysicalReadRequest;
    use uom::si::information::byte;
    use uom::si::usize::Information;

    /// Builds an `Information` quantity from a byte count.
    fn info(bytes: impl Into<usize>) -> Information {
        Information::new::<byte>(bytes.into())
    }

    #[test]
    fn merge_adjacent_pushes() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(0usize), info(100usize)));
        r.push(PhysicalReadRequest::new(info(100usize), info(50usize)));
        r.merge_contiguous_reads();
        assert_eq!(r.len(), 1usize, "Expected contiguous pushes to merge");
        let reqs: Vec<_> = r.clone().into_iter().collect();
        assert_eq!(reqs[0].offset.get::<byte>(), 0usize);
        assert_eq!(reqs[0].length.get::<byte>(), 150usize);
        assert_eq!(r.total_size().get::<byte>(), 150usize);
    }

    #[test]
    fn merge_chains_several_contiguous_requests() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(0usize), info(100usize)));
        r.push(PhysicalReadRequest::new(info(100usize), info(50usize)));
        r.push(PhysicalReadRequest::new(info(150usize), info(10usize)));
        r.merge_contiguous_reads();
        assert_eq!(r.len(), 1usize, "A chain of contiguous pushes should collapse");
        let reqs: Vec<_> = r.into_iter().collect();
        assert_eq!(reqs[0].offset.get::<byte>(), 0usize);
        assert_eq!(reqs[0].length.get::<byte>(), 160usize);
    }

    #[test]
    fn merge_on_empty_plan_is_noop() {
        let mut r = PhysicalReadPlan::new();
        r.merge_contiguous_reads();
        assert!(r.is_empty());
    }

    #[test]
    fn non_adjacent_does_not_merge() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(0usize), info(100usize)));
        r.push(PhysicalReadRequest::new(info(101usize), info(50usize)));
        r.merge_contiguous_reads();
        assert_eq!(r.len(), 2usize, "Non-contiguous pushes must not merge");
    }

    #[test]
    fn chunking_splits_without_merging_chunks() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(0usize), info(300usize)));
        let chunked = r.chunked(info(128usize));
        assert_eq!(chunked.len(), 3usize, "Chunking should split into 3 parts");
        let reqs: Vec<_> = chunked.clone().into_iter().collect();
        assert_eq!(reqs[0].offset.get::<byte>(), 0usize);
        assert_eq!(reqs[0].length.get::<byte>(), 128usize);
        assert_eq!(reqs[1].offset.get::<byte>(), 128usize);
        assert_eq!(reqs[1].length.get::<byte>(), 128usize);
        assert_eq!(reqs[2].offset.get::<byte>(), 256usize);
        assert_eq!(reqs[2].length.get::<byte>(), 44usize);
        assert_eq!(
            chunked.total_size().get::<byte>(),
            300usize,
            "Total requested should remain constant"
        );
    }

    #[test]
    fn chunking_respects_exact_division() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(4096usize), info(4096usize)));
        let c = r.chunked(info(1024usize));
        assert_eq!(c.len(), 4usize);
        let reqs: Vec<_> = c.into_iter().collect();
        for (i, req) in reqs.iter().enumerate() {
            assert_eq!(req.offset.get::<byte>(), 4096usize + i * 1024usize);
            assert_eq!(req.length.get::<byte>(), 1024usize);
        }
    }

    #[test]
    fn chunking_with_zero_size_returns_unchanged_plan() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(0usize), info(300usize)));
        let c = r.chunked(info(0usize));
        assert_eq!(c.len(), 1usize);
        assert_eq!(c.total_size().get::<byte>(), 300usize);
    }

    #[test]
    fn collecting_deduplicates_identical_requests() {
        let plan: PhysicalReadPlan = vec![
            PhysicalReadRequest::new(info(0usize), info(10usize)),
            PhysicalReadRequest::new(info(0usize), info(10usize)),
        ]
        .into_iter()
        .collect();
        assert_eq!(plan.len(), 1usize);
    }

    #[test]
    #[should_panic(expected = "Zero-length push detected")]
    fn collecting_zero_length_request_panics() {
        let _plan: PhysicalReadPlan = vec![PhysicalReadRequest::new(info(0usize), info(0usize))]
            .into_iter()
            .collect();
    }

    #[test]
    fn zero_length_push_ignored() {
        let mut r = PhysicalReadPlan::new();
        r.set_zero_length_behavior(ZeroLengthPushBehaviour::NoOp);
        r.push(PhysicalReadRequest::new(info(0usize), info(0usize)));
        assert!(r.is_empty());
    }

    #[test]
    #[should_panic(expected = "Zero-length push detected")]
    fn zero_length_push_panics_by_default() {
        let mut r = PhysicalReadPlan::new();
        r.push(PhysicalReadRequest::new(info(0usize), info(0usize)));
    }
}