use std::io::{Read, Seek, SeekFrom};
use std::sync::{Arc, Condvar, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{Context, Result};
const PREBUFFER_BYTES: usize = 256 * 1024;
struct StreamInner {
buf: Mutex<Vec<u8>>,
cond: Condvar,
done: AtomicBool,
}
pub struct StreamingReader {
inner: Arc<StreamInner>,
pos: u64,
}
impl Read for StreamingReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
let pos = self.pos as usize;
let buf = {
let guard = self.inner.buf.lock().unwrap();
self.inner
.cond
.wait_while(guard, |b| b.len() <= pos && !self.inner.done.load(Ordering::Acquire))
.unwrap()
};
let available = buf.len().saturating_sub(pos);
if available == 0 {
return Ok(0); }
let n = out.len().min(available);
out[..n].copy_from_slice(&buf[pos..pos + n]);
drop(buf);
self.pos += n as u64;
Ok(n)
}
}
impl Seek for StreamingReader {
fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> {
let new_pos: i64 = match from {
SeekFrom::Start(off) => off as i64,
SeekFrom::Current(off) => self.pos as i64 + off,
SeekFrom::End(off) => {
let buf = {
let guard = self.inner.buf.lock().unwrap();
self.inner
.cond
.wait_while(guard, |_| !self.inner.done.load(Ordering::Acquire))
.unwrap()
};
let len = buf.len() as i64;
drop(buf);
len + off
}
};
if new_pos < 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek before start of stream",
));
}
self.pos = new_pos as u64;
Ok(self.pos)
}
}
pub fn open_stream(url: &str) -> Result<StreamingReader> {
let inner = Arc::new(StreamInner {
buf: Mutex::new(Vec::new()),
cond: Condvar::new(),
done: AtomicBool::new(false),
});
let inner_dl = inner.clone();
let url = url.to_owned();
std::thread::Builder::new()
.name("playterm-stream".into())
.spawn(move || download_thread(&url, inner_dl))
.context("failed to spawn stream thread")?;
{
let guard = inner.buf.lock().unwrap();
let _guard = inner
.cond
.wait_while(guard, |b| {
b.len() < PREBUFFER_BYTES && !inner.done.load(Ordering::Acquire)
})
.unwrap();
}
Ok(StreamingReader { inner, pos: 0 })
}
fn download_thread(url: &str, inner: Arc<StreamInner>) {
let _ = download_into(url, &inner);
inner.done.store(true, Ordering::Release);
inner.cond.notify_all();
}
fn download_into(url: &str, inner: &StreamInner) -> Result<()> {
use std::io::Read as _;
let mut response = reqwest::blocking::get(url).context("HTTP request failed")?;
let mut chunk = vec![0u8; 32 * 1024]; loop {
let n = response.read(&mut chunk).context("stream read error")?;
if n == 0 {
break;
}
{
let mut buf = inner.buf.lock().unwrap();
buf.extend_from_slice(&chunk[..n]);
}
inner.cond.notify_all();
}
Ok(())
}