use std::io::{Seek, SeekFrom, Write};
use std::sync::mpsc::{Receiver, SyncSender, TrySendError, sync_channel};
use std::thread::{self, JoinHandle};
use crate::error::{Error, Result};
use super::mapfile::{MapStats, Mapfile, SectorStatus};
const CHANNEL_DEPTH: usize = 4;
const ZERO_CHUNK: usize = 65 * 1024;
pub(super) enum WorkItem {
Good { pos: u64, buf: Vec<u8> },
BisectGood { pos: u64, buf: Box<[u8; 2048]> },
BisectBad { pos: u64 },
SkipFill { pos: u64, len: u64 },
GapFill { pos: u64, len: u64 },
StatsRequest,
Finish,
}
pub(super) struct ProgressSnapshot {
pub stats: MapStats,
pub bad_ranges: Vec<(u64, u64)>,
}
pub(super) struct ConsumerSummary {
pub stats: MapStats,
pub error: Option<Error>,
}
pub(super) struct ConsumerInputs {
pub file: crate::io::Writer,
pub map: Mapfile,
pub is_regular: bool,
}
pub(super) fn spawn_consumer(
inputs: ConsumerInputs,
) -> (
SyncSender<WorkItem>,
Receiver<ProgressSnapshot>,
JoinHandle<ConsumerSummary>,
) {
let (work_tx, work_rx) = sync_channel::<WorkItem>(CHANNEL_DEPTH);
let (prog_tx, prog_rx) = sync_channel::<ProgressSnapshot>(1);
let handle = thread::Builder::new()
.name("freemkv-sweep-consumer".into())
.spawn(move || consumer_loop(inputs, work_rx, prog_tx))
.expect("spawning sweep consumer thread should not fail");
(work_tx, prog_rx, handle)
}
pub(super) fn send_or_abort(tx: &SyncSender<WorkItem>, item: WorkItem) -> Result<()> {
tx.send(item).map_err(|_| Error::IoError {
source: std::io::Error::other("sweep consumer terminated unexpectedly"),
})
}
pub(super) fn try_request_stats(tx: &SyncSender<WorkItem>) {
if let Err(TrySendError::Full(_)) = tx.try_send(WorkItem::StatsRequest) {
}
}
pub(super) fn try_recv_progress(rx: &Receiver<ProgressSnapshot>) -> Option<ProgressSnapshot> {
let mut latest = None;
while let Ok(snap) = rx.try_recv() {
latest = Some(snap);
}
latest
}
fn consumer_loop(
mut inputs: ConsumerInputs,
work_rx: Receiver<WorkItem>,
prog_tx: SyncSender<ProgressSnapshot>,
) -> ConsumerSummary {
let zero = [0u8; ZERO_CHUNK];
let mut first_error: Option<Error> = None;
while let Ok(item) = work_rx.recv() {
if first_error.is_some() {
if matches!(item, WorkItem::Finish) {
break;
}
continue;
}
match apply_item(&mut inputs, item, &zero, &prog_tx) {
Ok(true) => {}
Ok(false) => break, Err(e) => first_error = Some(e),
}
}
if first_error.is_none() {
if let Err(e) = inputs.file.sync_all() {
if inputs.is_regular {
first_error = Some(Error::IoError { source: e });
}
}
if let Err(e) = inputs.map.flush() {
first_error = Some(Error::IoError { source: e });
}
}
ConsumerSummary {
stats: inputs.map.stats(),
error: first_error,
}
}
fn apply_item(
inputs: &mut ConsumerInputs,
item: WorkItem,
zero: &[u8; ZERO_CHUNK],
prog_tx: &SyncSender<ProgressSnapshot>,
) -> Result<bool> {
match item {
WorkItem::Good { pos, buf } => {
let len = buf.len() as u64;
inputs
.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
inputs
.file
.write_all(&buf)
.map_err(|e| Error::IoError { source: e })?;
inputs
.map
.record(pos, len, SectorStatus::Finished)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::BisectGood { pos, buf } => {
inputs
.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
inputs
.file
.write_all(&buf[..])
.map_err(|e| Error::IoError { source: e })?;
inputs
.map
.record(pos, 2048, SectorStatus::Finished)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::BisectBad { pos } => {
inputs
.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
inputs
.file
.write_all(&zero[..2048])
.map_err(|e| Error::IoError { source: e })?;
inputs
.map
.record(pos, 2048, SectorStatus::NonTrimmed)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::SkipFill { pos, len } | WorkItem::GapFill { pos, len } => {
inputs
.file
.seek(SeekFrom::Start(pos))
.map_err(|e| Error::IoError { source: e })?;
let mut filled = 0u64;
while filled < len {
let chunk = (len - filled).min(zero.len() as u64) as usize;
inputs
.file
.write_all(&zero[..chunk])
.map_err(|e| Error::IoError { source: e })?;
filled += chunk as u64;
}
inputs
.map
.record(pos, len, SectorStatus::NonTrimmed)
.map_err(|e| Error::IoError { source: e })?;
}
WorkItem::StatsRequest => {
let stats = inputs.map.stats();
let bad_ranges = inputs.map.ranges_with(&[
SectorStatus::NonTrimmed,
SectorStatus::Unreadable,
SectorStatus::NonScraped,
SectorStatus::NonTried,
]);
let _ = prog_tx.try_send(ProgressSnapshot { stats, bad_ranges });
}
WorkItem::Finish => return Ok(false),
}
Ok(true)
}