use crate::error::Result;
use crate::event::{Event, EventKind};
use crate::halt::Halt;
use crate::sector::SectorSource;
use crossbeam_channel::{Receiver, Sender, bounded};
use std::thread::JoinHandle;
/// Boxed progress callback. The producer thread invokes it with an
/// [`Event`] (currently `EventKind::BytesRead`) after each successful read,
/// so it must be `Send` and `'static`.
pub type EventFn = Box<dyn Fn(Event) + Send + 'static>;
/// Capacity of the channel carrying filled batches from the producer to the
/// consumer. The buffer pool is created with one more buffer than this.
const PREFETCH_CHANNEL_DEPTH: usize = 2;
/// Batches of at least this many sectors are rounded down to a multiple of it.
// NOTE(review): presumably matches a 3-sector (6144-byte) aligned unit of the
// on-disc format — confirm against the caller.
const SECTOR_ALIGNMENT: u16 = 3;
/// One message on the prefetch channel: a filled buffer, or the I/O error
/// that terminated the producer.
pub type Batch = std::result::Result<Vec<u8>, std::io::Error>;
/// A [`SectorSource`] whose data is read ahead by a background producer
/// thread and handed over through a bounded channel.
pub struct PrefetchedSectorSource {
/// Receives filled batches (or the terminating error) from the producer.
rx: Receiver<Batch>,
/// Returns spent buffers to the producer so they can be reused.
recycle_tx: Sender<Vec<u8>>,
/// Handle of the producer thread; `None` once it has been taken for joining
/// or moved into a [`PrefetchShell`].
producer: Option<JoinHandle<()>>,
/// Sum of `sector_count` over all extents passed to the constructor.
total_sectors: u32,
}
impl PrefetchedSectorSource {
    /// Starts a prefetching reader over `extents` without progress reporting.
    ///
    /// Equivalent to [`Self::new_with_events`] with `event_fn` set to `None`.
    ///
    /// # Panics
    ///
    /// Panics if the producer thread cannot be spawned.
    pub fn new<S>(
        reader: S,
        extents: Vec<crate::disc::Extent>,
        batch_sectors: u16,
        halt: Option<Halt>,
    ) -> Self
    where
        S: SectorSource + Send + 'static,
    {
        Self::new_with_events(reader, extents, batch_sectors, halt, None)
    }

    /// Starts a prefetching reader over `extents`, optionally reporting
    /// progress through `event_fn`.
    ///
    /// A thread named `freemkv-prefetch` walks the extents in order, reads up
    /// to `batch_sectors` sectors at a time from `reader`, and sends each
    /// filled buffer over a channel of depth `PREFETCH_CHANNEL_DEPTH`.
    /// Buffers come from a pool of `PREFETCH_CHANNEL_DEPTH + 1` allocations;
    /// the producer waits for one to be returned through the recycle sender
    /// before each read.
    ///
    /// * `reader` - underlying source; moved into the producer thread.
    /// * `extents` - regions to read, consumed sequentially.
    /// * `batch_sectors` - maximum sectors per batch. A value of `0` is
    ///   treated as `1`, because a zero-sized batch could never make progress.
    /// * `halt` - when present and cancelled, the producer stops before
    ///   starting its next read.
    /// * `event_fn` - called on the producer thread after each successful
    ///   read with the cumulative and total byte counts.
    ///
    /// The producer stops on the first read error (after forwarding it), when
    /// the consumer side of either channel disconnects, or once every extent
    /// has been read.
    ///
    /// # Panics
    ///
    /// Panics if the producer thread cannot be spawned.
    pub fn new_with_events<S>(
        mut reader: S,
        extents: Vec<crate::disc::Extent>,
        batch_sectors: u16,
        halt: Option<Halt>,
        event_fn: Option<EventFn>,
    ) -> Self
    where
        S: SectorSource + Send + 'static,
    {
        // With zero sectors per batch `offset` would never advance and the
        // producer would spin forever emitting empty batches.
        let batch_sectors = batch_sectors.max(1);
        let total_sectors: u32 = extents.iter().map(|e| e.sector_count).sum();
        let bytes_total_extents: u64 = extents.iter().map(|e| e.sector_count as u64 * 2048).sum();
        let (tx, rx) = bounded::<Batch>(PREFETCH_CHANNEL_DEPTH);
        let (recycle_tx, recycle_rx) = bounded::<Vec<u8>>(PREFETCH_CHANNEL_DEPTH + 1);
        let batch_bytes = batch_sectors as usize * 2048;
        // Seed the pool. The channel capacity equals the number of buffers,
        // so these sends cannot block.
        for _ in 0..(PREFETCH_CHANNEL_DEPTH + 1) {
            let _ = recycle_tx.send(vec![0u8; batch_bytes]);
        }
        let producer = std::thread::Builder::new()
            .name("freemkv-prefetch".into())
            .spawn(move || {
                let mut ext_idx = 0usize;
                let mut offset: u32 = 0;
                let mut bytes_read_total: u64 = 0;
                while ext_idx < extents.len() {
                    if halt.as_ref().map(|h| h.is_cancelled()).unwrap_or(false) {
                        return;
                    }
                    let extent = &extents[ext_idx];
                    let remaining = extent.sector_count.saturating_sub(offset);
                    if remaining == 0 {
                        ext_idx += 1;
                        offset = 0;
                        continue;
                    }
                    // Fits in u16: bounded above by `batch_sectors`.
                    let mut sectors = remaining.min(batch_sectors as u32) as u16;
                    // Round down to the alignment unit; a tail shorter than
                    // one unit is read as-is.
                    if sectors >= SECTOR_ALIGNMENT {
                        sectors -= sectors % SECTOR_ALIGNMENT;
                    }
                    let bytes = sectors as usize * 2048;
                    let mut buf = match recycle_rx.recv() {
                        Ok(b) => b,
                        // Every recycle sender is gone: the consumer went away.
                        Err(_) => return,
                    };
                    // Recycled buffers come back truncated to the previous
                    // read length; `resize` both grows (zero-filled) and
                    // shrinks, so no `unsafe` length fiddling is required.
                    buf.resize(bytes, 0);
                    let lba = extent.start_lba + offset;
                    match reader.read_sectors(lba, sectors, &mut buf[..bytes], false) {
                        Ok(n) => {
                            buf.truncate(n);
                            bytes_read_total = bytes_read_total.saturating_add(n as u64);
                            if let Some(ref f) = event_fn {
                                f(Event {
                                    kind: EventKind::BytesRead {
                                        bytes: bytes_read_total,
                                        total: bytes_total_extents,
                                    },
                                });
                            }
                            if tx.send(Ok(buf)).is_err() {
                                // Receiver dropped; nobody wants more data.
                                return;
                            }
                            // NOTE(review): advances by the sectors requested,
                            // not by `n`; assumes the reader does not return
                            // short reads — confirm against `SectorSource`.
                            offset += sectors as u32;
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e.into()));
                            return;
                        }
                    }
                }
            })
            .expect("freemkv-prefetch producer spawn failed");
        Self {
            rx,
            recycle_tx,
            producer: Some(producer),
            total_sectors,
        }
    }

    /// Splits the source into its raw parts: the batch receiver, the buffer
    /// recycle sender, and a [`PrefetchShell`] that owns the producer thread.
    ///
    /// The shell joins the producer when dropped. The producer only exits
    /// early once the returned receiver and sender have been dropped, so drop
    /// those (or drain the receiver to completion) before dropping the shell.
    pub fn into_channels(self) -> (Receiver<Batch>, Sender<Vec<u8>>, PrefetchShell) {
        let total = self.total_sectors;
        let mut me = self;
        let producer = me.producer.take();
        let rx = me.rx.clone();
        let recycle = me.recycle_tx.clone();
        // Drop `me` instead of forgetting it: `producer` is already `None`,
        // so `Drop` will not join, and the original channel endpoints are
        // released. Leaking them would keep both channels permanently
        // connected, so the producer could never observe the consumer going
        // away and joining it could hang forever.
        drop(me);
        (rx, recycle, PrefetchShell { producer, total })
    }
}
/// Owner of the producer thread after a source has been split into raw
/// channels; joins the thread when dropped.
pub struct PrefetchShell {
    /// Producer thread handle, taken exactly once by `Drop`.
    producer: Option<JoinHandle<()>>,
    // Stored but not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    total: u32,
}

impl Drop for PrefetchShell {
    /// Waits for the producer thread to finish, if there is one.
    fn drop(&mut self) {
        let Some(worker) = self.producer.take() else {
            return;
        };
        // The join result (including a producer panic) is deliberately
        // discarded.
        let _ = worker.join();
    }
}
impl Drop for PrefetchedSectorSource {
fn drop(&mut self) {
if let Some(h) = self.producer.take() {
let _ = h.join();
}
}
}
impl SectorSource for PrefetchedSectorSource {
    /// Total number of sectors across all extents given to the constructor.
    fn capacity_sectors(&self) -> u32 {
        self.total_sectors
    }

    /// Copies the next prefetched batch into `buf`.
    ///
    /// Data is delivered strictly in the order the producer read it; `_lba`,
    /// `_count` and `_recovery` are ignored. At most `buf.len()` bytes are
    /// copied, and any excess in the batch is discarded, so callers should
    /// pass a buffer at least one batch in size.
    ///
    /// Returns the number of bytes copied, or `Ok(0)` once the producer has
    /// finished or stopped and the channel is drained.
    ///
    /// # Errors
    ///
    /// Returns `Error::IoError` wrapping the I/O error forwarded by the
    /// producer when its underlying read failed.
    fn read_sectors(
        &mut self,
        _lba: u32,
        _count: u16,
        buf: &mut [u8],
        _recovery: bool,
    ) -> Result<usize> {
        match self.rx.recv() {
            Ok(Ok(filled)) => {
                let n = filled.len().min(buf.len());
                buf[..n].copy_from_slice(&filled[..n]);
                // Hand the buffer back to the pool. The producer blocks until
                // a buffer is available, so if buffers were dropped here the
                // pipeline would stall for good once the initial pool ran out.
                // `try_send` never blocks; failure just means the producer has
                // already exited.
                let _ = self.recycle_tx.try_send(filled);
                Ok(n)
            }
            Ok(Err(e)) => Err(crate::error::Error::IoError { source: e }),
            // Channel disconnected and drained: end of stream.
            Err(_) => Ok(0),
        }
    }
}