playterm_player/
stream.rs1use std::io::{Read, Seek, SeekFrom};
15use std::sync::{Arc, Condvar, Mutex};
16use std::sync::atomic::{AtomicBool, Ordering};
17
18use anyhow::{Context, Result};
19
20const PREBUFFER_BYTES: usize = 256 * 1024;
24
25struct StreamInner {
28 buf: Mutex<Vec<u8>>,
30 cond: Condvar,
32 done: AtomicBool,
34}
35
36pub struct StreamingReader {
40 inner: Arc<StreamInner>,
41 pos: u64,
42}
43
44impl Read for StreamingReader {
45 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
46 let pos = self.pos as usize;
47 let buf = {
49 let guard = self.inner.buf.lock().unwrap();
50 self.inner
51 .cond
52 .wait_while(guard, |b| b.len() <= pos && !self.inner.done.load(Ordering::Acquire))
53 .unwrap()
54 };
55 let available = buf.len().saturating_sub(pos);
56 if available == 0 {
57 return Ok(0); }
59 let n = out.len().min(available);
60 out[..n].copy_from_slice(&buf[pos..pos + n]);
61 drop(buf);
62 self.pos += n as u64;
63 Ok(n)
64 }
65}
66
67impl Seek for StreamingReader {
68 fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> {
69 let new_pos: i64 = match from {
70 SeekFrom::Start(off) => off as i64,
71 SeekFrom::Current(off) => self.pos as i64 + off,
72 SeekFrom::End(off) => {
73 let buf = {
75 let guard = self.inner.buf.lock().unwrap();
76 self.inner
77 .cond
78 .wait_while(guard, |_| !self.inner.done.load(Ordering::Acquire))
79 .unwrap()
80 };
81 let len = buf.len() as i64;
82 drop(buf);
83 len + off
84 }
85 };
86 if new_pos < 0 {
87 return Err(std::io::Error::new(
88 std::io::ErrorKind::InvalidInput,
89 "seek before start of stream",
90 ));
91 }
92 self.pos = new_pos as u64;
93 Ok(self.pos)
94 }
95}
96
97pub fn open_stream(url: &str) -> Result<StreamingReader> {
105 let inner = Arc::new(StreamInner {
106 buf: Mutex::new(Vec::new()),
107 cond: Condvar::new(),
108 done: AtomicBool::new(false),
109 });
110
111 let inner_dl = inner.clone();
113 let url = url.to_owned();
114 std::thread::Builder::new()
115 .name("playterm-stream".into())
116 .spawn(move || download_thread(&url, inner_dl))
117 .context("failed to spawn stream thread")?;
118
119 {
121 let guard = inner.buf.lock().unwrap();
122 let _guard = inner
123 .cond
124 .wait_while(guard, |b| {
125 b.len() < PREBUFFER_BYTES && !inner.done.load(Ordering::Acquire)
126 })
127 .unwrap();
128 }
129
130 Ok(StreamingReader { inner, pos: 0 })
131}
132
133fn download_thread(url: &str, inner: Arc<StreamInner>) {
136 let _ = download_into(url, &inner);
137 inner.done.store(true, Ordering::Release);
138 inner.cond.notify_all();
139}
140
141fn download_into(url: &str, inner: &StreamInner) -> Result<()> {
142 use std::io::Read as _;
143 let mut response = reqwest::blocking::get(url).context("HTTP request failed")?;
144 let mut chunk = vec![0u8; 32 * 1024]; loop {
146 let n = response.read(&mut chunk).context("stream read error")?;
147 if n == 0 {
148 break;
149 }
150 {
151 let mut buf = inner.buf.lock().unwrap();
152 buf.extend_from_slice(&chunk[..n]);
153 }
154 inner.cond.notify_all();
155 }
156 Ok(())
157}