Skip to main content

playterm_player/
stream.rs

1//! Streaming HTTP → growing-buffer reader for the audio pipeline.
2//!
3//! `open_stream` starts a background thread that downloads `url` via
4//! `reqwest::blocking`, appending chunks to a shared `Vec<u8>`.  The
5//! `StreamingReader` returned immediately exposes that buffer as a `Read +
6//! Seek` handle that blocks only when the read position overtakes the download.
7//!
8//! Keeping all bytes (never freeing) means backward seeks always succeed
9//! without re-fetching.  Trade-off: memory grows with the file (~14–30 MB for
10//! a typical song), which is acceptable for a music player.
11//!
12//! Playback starts as soon as 256 KB have been buffered (PREBUFFER_BYTES).
13
14use std::io::{Read, Seek, SeekFrom};
15use std::sync::{Arc, Condvar, Mutex};
16use std::sync::atomic::{AtomicBool, Ordering};
17
18use anyhow::{Context, Result};
19
20// ── Constants ─────────────────────────────────────────────────────────────────
21
22/// Minimum bytes buffered before `open_stream` returns.
23const PREBUFFER_BYTES: usize = 256 * 1024;
24
25// ── Shared inner state ────────────────────────────────────────────────────────
26
27struct StreamInner {
28    /// Append-only byte buffer.  Never shrinks.
29    buf: Mutex<Vec<u8>>,
30    /// Signalled after every chunk appended (and after download completes).
31    cond: Condvar,
32    /// Set to `true` once the download thread has finished (success or error).
33    done: AtomicBool,
34}
35
36// ── Public reader ─────────────────────────────────────────────────────────────
37
38/// A `Read + Seek` handle to a streaming HTTP response.
39pub struct StreamingReader {
40    inner: Arc<StreamInner>,
41    pos: u64,
42}
43
44impl Read for StreamingReader {
45    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
46        let pos = self.pos as usize;
47        // Block until there's data at `pos` or the download is done.
48        let buf = {
49            let guard = self.inner.buf.lock().unwrap();
50            self.inner
51                .cond
52                .wait_while(guard, |b| b.len() <= pos && !self.inner.done.load(Ordering::Acquire))
53                .unwrap()
54        };
55        let available = buf.len().saturating_sub(pos);
56        if available == 0 {
57            return Ok(0); // EOF
58        }
59        let n = out.len().min(available);
60        out[..n].copy_from_slice(&buf[pos..pos + n]);
61        drop(buf);
62        self.pos += n as u64;
63        Ok(n)
64    }
65}
66
67impl Seek for StreamingReader {
68    fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> {
69        let new_pos: i64 = match from {
70            SeekFrom::Start(off) => off as i64,
71            SeekFrom::Current(off) => self.pos as i64 + off,
72            SeekFrom::End(off) => {
73                // Must wait until download is complete to know total length.
74                let buf = {
75                    let guard = self.inner.buf.lock().unwrap();
76                    self.inner
77                        .cond
78                        .wait_while(guard, |_| !self.inner.done.load(Ordering::Acquire))
79                        .unwrap()
80                };
81                let len = buf.len() as i64;
82                drop(buf);
83                len + off
84            }
85        };
86        if new_pos < 0 {
87            return Err(std::io::Error::new(
88                std::io::ErrorKind::InvalidInput,
89                "seek before start of stream",
90            ));
91        }
92        self.pos = new_pos as u64;
93        Ok(self.pos)
94    }
95}
96
97// ── Public constructor ────────────────────────────────────────────────────────
98
99/// Start streaming `url` in a background thread and return a reader that
100/// blocks only when the read position overtakes the download.
101///
102/// Returns after `PREBUFFER_BYTES` have been buffered (or the download
103/// completes, whichever comes first).
104pub fn open_stream(url: &str) -> Result<StreamingReader> {
105    let inner = Arc::new(StreamInner {
106        buf: Mutex::new(Vec::new()),
107        cond: Condvar::new(),
108        done: AtomicBool::new(false),
109    });
110
111    // Spawn download thread.
112    let inner_dl = inner.clone();
113    let url = url.to_owned();
114    std::thread::Builder::new()
115        .name("playterm-stream".into())
116        .spawn(move || download_thread(&url, inner_dl))
117        .context("failed to spawn stream thread")?;
118
119    // Wait until PREBUFFER_BYTES are ready (or download finishes early).
120    {
121        let guard = inner.buf.lock().unwrap();
122        let _guard = inner
123            .cond
124            .wait_while(guard, |b| {
125                b.len() < PREBUFFER_BYTES && !inner.done.load(Ordering::Acquire)
126            })
127            .unwrap();
128    }
129
130    Ok(StreamingReader { inner, pos: 0 })
131}
132
133// ── Download thread ───────────────────────────────────────────────────────────
134
135fn download_thread(url: &str, inner: Arc<StreamInner>) {
136    let _ = download_into(url, &inner);
137    inner.done.store(true, Ordering::Release);
138    inner.cond.notify_all();
139}
140
141fn download_into(url: &str, inner: &StreamInner) -> Result<()> {
142    use std::io::Read as _;
143    let mut response = reqwest::blocking::get(url).context("HTTP request failed")?;
144    let mut chunk = vec![0u8; 32 * 1024]; // 32 KB read buffer
145    loop {
146        let n = response.read(&mut chunk).context("stream read error")?;
147        if n == 0 {
148            break;
149        }
150        {
151            let mut buf = inner.buf.lock().unwrap();
152            buf.extend_from_slice(&chunk[..n]);
153        }
154        inner.cond.notify_all();
155    }
156    Ok(())
157}