use std::io::{Seek, SeekFrom, Write};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
use crate::error::Error;
use crate::io::{Flow, Sink};
use super::mapfile::{MapStats, Mapfile, SectorStatus};
/// Size in bytes of the reusable all-zero buffer used when zero-filling the output.
///
/// Kept at 64 KiB so it is a whole multiple of the 2048-byte unit the sink writes and
/// records elsewhere; a full zero-fill chunk therefore never ends in the middle of a unit.
const ZERO_CHUNK: usize = 64 * 1024;
/// Unit of work consumed by [`SweepSink`] through its `Sink<WorkItem>` implementation.
///
/// Every data-carrying variant addresses the output file by the absolute byte offset `pos`.
pub(super) enum WorkItem {
/// Successfully read bytes: `buf` is written at `pos` and `buf.len()` bytes are
/// recorded in the mapfile as `Finished`.
Good { pos: u64, buf: Vec<u8> },
/// A single 2048-byte buffer that was read successfully: written at `pos` and
/// recorded as `Finished`. Boxed so the enum itself stays small.
BisectGood { pos: u64, buf: Box<[u8; 2048]> },
/// A 2048-byte range that could not be read: the sink writes 2048 zero bytes at
/// `pos` and records the range as `NonTrimmed`.
BisectBad { pos: u64 },
/// A range of `len` bytes to zero-fill starting at `pos`; recorded as `NonTrimmed`.
SkipFill { pos: u64, len: u64 },
/// Handled by the sink exactly like `SkipFill`: `len` zero bytes at `pos`, recorded
/// as `NonTrimmed`. The distinction presumably matters only to the producer — TODO confirm.
GapFill { pos: u64, len: u64 },
/// Asks the sink to publish a [`ProgressSnapshot`] on its progress channel.
/// The send is non-blocking, so the snapshot is dropped if the channel cannot take it.
StatsRequest,
}
/// Point-in-time view of the mapfile, published by [`SweepSink`] in response to
/// `WorkItem::StatsRequest`.
pub(super) struct ProgressSnapshot {
/// Result of `Mapfile::stats()` at the moment the request was handled.
pub stats: MapStats,
/// Ranges whose status is `NonTrimmed`, `Unreadable`, `NonScraped` or `NonTried`,
/// as returned by `Mapfile::ranges_with`.
/// NOTE(review): whether each tuple is `(pos, len)` or `(start, end)` is defined by
/// `Mapfile::ranges_with` — confirm there.
pub bad_ranges: Vec<(u64, u64)>,
}
/// Final result produced by `SweepSink::close` once the output has been synced and
/// the mapfile flushed.
pub(super) struct ConsumerSummary {
/// Result of `Mapfile::stats()` taken after the mapfile flush.
pub stats: MapStats,
}
/// Drains every snapshot currently queued on `rx` without blocking and returns the
/// most recent one.
///
/// Returns `None` when nothing is pending (or the sender side has gone away with an
/// empty queue). Older snapshots are discarded.
pub(super) fn try_recv_progress(rx: &Receiver<ProgressSnapshot>) -> Option<ProgressSnapshot> {
    // `try_iter` yields pending values until `try_recv` would fail; `last` keeps the newest.
    rx.try_iter().last()
}
/// Sink that applies [`WorkItem`]s to the output file and keeps the mapfile in step.
pub(super) struct SweepSink {
// Output target; every write is preceded by an explicit seek to the item's offset.
file: crate::io::WritebackFile,
// Per-range status bookkeeping, updated after each successful write.
map: Mapfile,
// When true, a failing `sync_all` in `close` is reported; otherwise it is ignored.
is_regular: bool,
// Sending half of the progress channel; snapshots are sent with `try_send`.
prog_tx: SyncSender<ProgressSnapshot>,
// Reusable all-zero buffer used for bad and skipped ranges.
zero: Box<[u8; ZERO_CHUNK]>,
}
impl SweepSink {
    /// Builds a sink over `file` and `map`, together with the receiving end of its
    /// progress channel.
    ///
    /// `is_regular` controls whether a failed `sync_all` is treated as an error when
    /// the sink is closed. The progress channel is bounded to a single slot, so at
    /// most one unread snapshot is ever queued.
    pub(super) fn new(
        file: crate::io::WritebackFile,
        map: Mapfile,
        is_regular: bool,
    ) -> (Self, Receiver<ProgressSnapshot>) {
        let (prog_tx, progress_rx) = sync_channel(1);
        // Allocated once and reused for every zero-fill write.
        let zero = Box::new([0u8; ZERO_CHUNK]);
        (
            Self {
                file,
                map,
                is_regular,
                prog_tx,
                zero,
            },
            progress_rx,
        )
    }
}
impl Sink<WorkItem> for SweepSink {
type Output = ConsumerSummary;
fn apply(&mut self, item: WorkItem) -> Result<Flow, Error> {
match item {
WorkItem::Good { pos, buf } => {
let len = buf.len() as u64;
self.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
self.file
.write_all(&buf)
.map_err(|e| Error::IoError { source: e })?;
self.map
.record(pos, len, SectorStatus::Finished)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::BisectGood { pos, buf } => {
self.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
self.file
.write_all(&buf[..])
.map_err(|e| Error::IoError { source: e })?;
self.map
.record(pos, 2048, SectorStatus::Finished)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::BisectBad { pos } => {
self.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
self.file
.write_all(&self.zero[..2048])
.map_err(|e| Error::IoError { source: e })?;
self.map
.record(pos, 2048, SectorStatus::NonTrimmed)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::SkipFill { pos, len } | WorkItem::GapFill { pos, len } => {
self.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
let mut filled = 0u64;
while filled < len {
let chunk = (len - filled).min(self.zero.len() as u64) as usize;
self.file
.write_all(&self.zero[..chunk])
.map_err(|e| Error::IoError { source: e })?;
filled += chunk as u64;
}
self.map
.record(pos, len, SectorStatus::NonTrimmed)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::StatsRequest => {
let stats = self.map.stats();
let bad_ranges = self.map.ranges_with(&[
SectorStatus::NonTrimmed,
SectorStatus::Unreadable,
SectorStatus::NonScraped,
SectorStatus::NonTried,
]);
let _ = self
.prog_tx
.try_send(ProgressSnapshot { stats, bad_ranges });
}
}
Ok(Flow::Continue)
}
fn close(mut self) -> Result<Self::Output, Error> {
if let Err(e) = self.file.sync_all() {
if self.is_regular {
return Err(Error::IoError { source: e });
}
}
self.map.flush().map_err(|e| Error::IoError { source: e })?;
Ok(ConsumerSummary {
stats: self.map.stats(),
})
}
}