use crate::halt::Halt;
use crossbeam_channel::{Receiver, Sender, bounded};
use std::io::Read;
use std::thread::JoinHandle;
pub type Batch = std::io::Result<Vec<u8>>;
const FORWARD_DEPTH: usize = 2;
const RECYCLE_DEPTH: usize = FORWARD_DEPTH + 1;
pub const DEFAULT_CHUNK_BYTES: usize = 16 * 1024 * 1024;
pub struct PrefetchShell {
producer: Option<JoinHandle<()>>,
}
impl Drop for PrefetchShell {
fn drop(&mut self) {
if let Some(h) = self.producer.take() {
let _ = h.join();
}
}
}
pub struct BytePrefetcher {
rx: Receiver<Batch>,
recycle_tx: Sender<Vec<u8>>,
producer: Option<JoinHandle<()>>,
}
impl BytePrefetcher {
pub fn new<R: Read + Send + 'static>(
mut reader: R,
chunk_bytes: usize,
halt: Option<Halt>,
) -> Self {
let (tx, rx) = bounded::<Batch>(FORWARD_DEPTH);
let (recycle_tx, recycle_rx) = bounded::<Vec<u8>>(RECYCLE_DEPTH);
for _ in 0..RECYCLE_DEPTH {
let _ = recycle_tx.send(vec![0u8; chunk_bytes]);
}
let producer = std::thread::Builder::new()
.name("freemkv-byte-prefetch".into())
.spawn(move || {
loop {
if halt.as_ref().map(|h| h.is_cancelled()).unwrap_or(false) {
return;
}
let mut buf = match recycle_rx.recv() {
Ok(b) => b,
Err(_) => return, };
if buf.len() < chunk_bytes {
buf.resize(chunk_bytes, 0);
} else {
unsafe { buf.set_len(chunk_bytes) };
}
let n = match reader.read(&mut buf[..]) {
Ok(0) => return, Ok(n) => n,
Err(e) => {
let _ = tx.send(Err(e));
return;
}
};
buf.truncate(n);
if tx.send(Ok(buf)).is_err() {
return; }
}
})
.expect("freemkv-byte-prefetch thread spawn failed");
Self {
rx,
recycle_tx,
producer: Some(producer),
}
}
pub fn into_channels(self) -> (Receiver<Batch>, Sender<Vec<u8>>, PrefetchShell) {
let mut me = self;
let producer = me.producer.take();
let rx = me.rx.clone();
let recycle = me.recycle_tx.clone();
std::mem::forget(me);
(rx, recycle, PrefetchShell { producer })
}
}
impl Drop for BytePrefetcher {
fn drop(&mut self) {
if let Some(h) = self.producer.take() {
let _ = h.join();
}
}
}