use crate::halt::Halt;
use crate::sector::SectorSource;
use crossbeam_channel::{Receiver, Sender, bounded};
use std::thread::JoinHandle;
const DEMUX_CHANNEL_DEPTH: usize = 2;
pub enum DemuxBatch {
Ts(Vec<super::ts::PesPacket>),
Ps(Vec<super::ps::PsPacket>),
Err(std::io::Error),
}
pub struct DemuxThread {
handle: Option<JoinHandle<()>>,
#[allow(dead_code)]
producer_shell: Option<Box<dyn Send>>,
}
impl DemuxThread {
pub fn spawn<S: SectorSource + Send + 'static>(
mut reader: S,
extents: Vec<crate::disc::Extent>,
batch_sectors: u16,
halt: Option<Halt>,
ts: Option<super::ts::TsDemuxer>,
ps: Option<super::ps::PsDemuxer>,
) -> (Self, Receiver<DemuxBatch>) {
let (tx, rx) = bounded::<DemuxBatch>(DEMUX_CHANNEL_DEPTH);
let mut ts = ts;
let mut ps = ps;
let handle = std::thread::Builder::new()
.name("freemkv-demux".into())
.spawn(move || {
let mut buf = vec![0u8; batch_sectors as usize * 2048];
let mut ext_idx = 0usize;
let mut offset: u32 = 0;
let prof = std::env::var_os("FREEMKV_PROFILE").is_some();
let mut prof_started = std::time::Instant::now();
let mut prof_last_dump = prof_started;
let mut prof_read_ns: u128 = 0;
let mut prof_feed_ns: u128 = 0;
let mut prof_send_ns: u128 = 0;
let mut prof_bytes: u64 = 0;
while ext_idx < extents.len() {
if halt.as_ref().map(|h| h.is_cancelled()).unwrap_or(false) {
return;
}
let ext = &extents[ext_idx];
let remaining = ext.sector_count.saturating_sub(offset);
if remaining == 0 {
ext_idx += 1;
offset = 0;
continue;
}
let mut sectors = remaining.min(batch_sectors as u32) as u16;
if sectors >= 3 {
sectors -= sectors % 3;
}
let bytes = sectors as usize * 2048;
if buf.len() < bytes {
buf.resize(bytes, 0);
}
let lba = ext.start_lba + offset;
let t0 = if prof {
Some(std::time::Instant::now())
} else {
None
};
let n = match reader.read_sectors(lba, sectors, &mut buf[..bytes], false) {
Ok(n) => n,
Err(e) => {
let _ = tx.send(DemuxBatch::Err(e.into()));
return;
}
};
let t1 = if prof {
Some(std::time::Instant::now())
} else {
None
};
offset += sectors as u32;
if let Some(ref mut d) = ts {
let pkts = d.feed(&buf[..n]);
let t2 = if prof {
Some(std::time::Instant::now())
} else {
None
};
if !pkts.is_empty() && tx.send(DemuxBatch::Ts(pkts)).is_err() {
return; }
let t3 = if prof {
Some(std::time::Instant::now())
} else {
None
};
if prof {
prof_read_ns += t1.unwrap().duration_since(t0.unwrap()).as_nanos();
prof_feed_ns += t2.unwrap().duration_since(t1.unwrap()).as_nanos();
prof_send_ns += t3.unwrap().duration_since(t2.unwrap()).as_nanos();
prof_bytes += n as u64;
let now = t3.unwrap();
if now.duration_since(prof_last_dump)
>= std::time::Duration::from_secs(5)
{
let el = now.duration_since(prof_started).as_millis().max(1);
let mbps = prof_bytes as u128 * 1000 / 1_000_000 / el;
eprintln!(
"[demux] elapsed={}ms in={}MB/s read={}% feed={}% send={}%",
el,
mbps,
prof_read_ns / 10_000 / el,
prof_feed_ns / 10_000 / el,
prof_send_ns / 10_000 / el,
);
prof_last_dump = now;
prof_started = now;
prof_read_ns = 0;
prof_feed_ns = 0;
prof_send_ns = 0;
prof_bytes = 0;
}
}
} else if let Some(ref mut d) = ps {
let pkts = d.feed(&buf[..n]);
if !pkts.is_empty() && tx.send(DemuxBatch::Ps(pkts)).is_err() {
return;
}
}
}
if let Some(ref mut d) = ts {
let tail = d.flush();
if !tail.is_empty() {
let _ = tx.send(DemuxBatch::Ts(tail));
}
} else if let Some(ref mut d) = ps {
let tail = d.flush();
if !tail.is_empty() {
let _ = tx.send(DemuxBatch::Ps(tail));
}
}
})
.expect("freemkv-demux thread spawn failed");
(
Self {
handle: Some(handle),
producer_shell: None,
},
rx,
)
}
pub fn spawn_zero_copy<S: Send + 'static>(
prefetch_rx: Receiver<std::io::Result<Vec<u8>>>,
recycle_tx: Sender<Vec<u8>>,
producer_shell: S,
halt: Option<Halt>,
ts: Option<super::ts::TsDemuxer>,
ps: Option<super::ps::PsDemuxer>,
) -> (Self, Receiver<DemuxBatch>) {
let (tx, rx) = bounded::<DemuxBatch>(DEMUX_CHANNEL_DEPTH);
let mut ts = ts;
let mut ps = ps;
let handle = std::thread::Builder::new()
.name("freemkv-demux".into())
.spawn(move || {
let prof = std::env::var_os("FREEMKV_PROFILE").is_some();
let mut prof_started = std::time::Instant::now();
let mut prof_last_dump = prof_started;
let mut prof_read_ns: u128 = 0;
let mut prof_feed_ns: u128 = 0;
let mut prof_bytes: u64 = 0;
loop {
if halt.as_ref().map(|h| h.is_cancelled()).unwrap_or(false) {
return;
}
let t0 = if prof {
Some(std::time::Instant::now())
} else {
None
};
let buf = match prefetch_rx.recv() {
Ok(Ok(b)) => b,
Ok(Err(e)) => {
let _ = tx.send(DemuxBatch::Err(e));
return;
}
Err(_) => break, };
let t1 = if prof {
Some(std::time::Instant::now())
} else {
None
};
let n = buf.len();
if let Some(ref mut d) = ts {
let pkts = d.feed(&buf);
let t2 = if prof {
Some(std::time::Instant::now())
} else {
None
};
let _ = recycle_tx.send(buf);
if !pkts.is_empty() && tx.send(DemuxBatch::Ts(pkts)).is_err() {
return;
}
if prof {
prof_read_ns += t1.unwrap().duration_since(t0.unwrap()).as_nanos();
prof_feed_ns += t2.unwrap().duration_since(t1.unwrap()).as_nanos();
prof_bytes += n as u64;
let now = std::time::Instant::now();
if now.duration_since(prof_last_dump)
>= std::time::Duration::from_secs(5)
{
let el = now.duration_since(prof_started).as_millis().max(1);
let mbps = prof_bytes as u128 * 1000 / 1_000_000 / el;
eprintln!(
"[demux] elapsed={}ms in={}MB/s read={}% feed={}%",
el,
mbps,
prof_read_ns / 10_000 / el,
prof_feed_ns / 10_000 / el,
);
prof_last_dump = now;
prof_started = now;
prof_read_ns = 0;
prof_feed_ns = 0;
prof_bytes = 0;
}
}
} else if let Some(ref mut d) = ps {
let pkts = d.feed(&buf);
let _ = recycle_tx.send(buf);
if !pkts.is_empty() && tx.send(DemuxBatch::Ps(pkts)).is_err() {
return;
}
} else {
let _ = recycle_tx.send(buf);
}
}
if let Some(ref mut d) = ts {
let tail = d.flush();
if !tail.is_empty() {
let _ = tx.send(DemuxBatch::Ts(tail));
}
} else if let Some(ref mut d) = ps {
let tail = d.flush();
if !tail.is_empty() {
let _ = tx.send(DemuxBatch::Ps(tail));
}
}
})
.expect("freemkv-demux thread spawn failed");
(
Self {
handle: Some(handle),
producer_shell: Some(Box::new(producer_shell)),
},
rx,
)
}
}
impl Drop for DemuxThread {
fn drop(&mut self) {
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}