Skip to main content

playterm_player/
engine.rs

1//! Audio playback engine.
2//!
3//! Runs entirely on a dedicated `std::thread` — no tokio inside this module.
4//! The TUI communicates via two `std::sync::mpsc` channels:
5//!
6//! - `PlayerCommand` (TUI → engine): play a URL, pause, resume, stop, set volume.
7//! - `PlayerEvent`  (engine → TUI): progress ticks, track-ended, errors.
8
9use std::collections::VecDeque;
10use std::sync::{Arc, Mutex, mpsc};
11use std::time::Duration;
12
13use anyhow::Result;
14use rodio::{Decoder, DeviceSinkBuilder, Player};
15
16use crate::tap::SampleTap;
17
18type SampleBuffer = Arc<Mutex<VecDeque<f32>>>;
19
20// ── Public channel types ──────────────────────────────────────────────────────
21
/// Requests the TUI can issue to the player thread.
#[derive(Debug)]
pub enum PlayerCommand {
    /// Replace whatever is playing with the track at `url`.
    ///
    /// `duration` is the expected total length (from Subsonic metadata) and
    /// is only used for progress reporting.  `gen` is a monotonically
    /// increasing counter supplied by the TUI so the engine can recognise
    /// and discard downloads made stale by rapid skipping.
    PlayUrl {
        url: String,
        duration: Option<Duration>,
        gen: u64,
    },
    /// Queue the following track behind the current one for gapless playback.
    ///
    /// Must only be sent in response to `PlayerEvent::AboutToFinish`.
    /// Current playback is NOT interrupted.
    EnqueueNext {
        url: String,
        duration: Option<Duration>,
    },
    /// Pause playback.
    Pause,
    /// Resume playback after a pause.
    Resume,
    /// Stop playback and clear the engine's per-track state.
    Stop,
    /// Set the output volume; the value is passed straight to the player.
    SetVolume(f32),
    /// Jump to an absolute position within the current track.
    Seek(Duration),
    /// Stop playback and shut the player thread down cleanly.
    Quit,
}
44
/// Notifications the player thread reports back to the TUI.
#[derive(Debug)]
pub enum PlayerEvent {
    /// A track requested via `PlayerCommand::PlayUrl` has been queued on the
    /// player and playback has been started.
    TrackStarted,
    /// Emitted roughly every 500 ms while audio is playing (not while
    /// paused).  `total` is `None` when the track length is unknown.
    Progress {
        elapsed: Duration,
        total: Option<Duration>,
    },
    /// Emitted once per track when about 10 s or less remain.  The TUI should
    /// answer with `PlayerCommand::EnqueueNext` to enable gapless playback.
    /// Never emitted when the track's total duration is unknown.
    AboutToFinish,
    /// A gaplessly enqueued track has begun playing; elapsed time restarts.
    TrackAdvanced,
    /// The player ran out of audio with no further track enqueued.
    TrackEnded,
    /// Something went wrong; the payload is a human-readable description.
    Error(String),
}
59
60// ── Engine spawn ──────────────────────────────────────────────────────────────
61
62/// Spawn the player thread.
63///
64/// Returns `(cmd_tx, evt_rx, join_handle, sample_buffer)`.  The caller should send
65/// `PlayerCommand::Quit` and then join the handle (with a timeout) on
66/// shutdown to ensure the audio device is released cleanly.
67///
68/// `sample_buffer` is a ring buffer of the most recent decoded f32 samples;
69/// the TUI reads it each frame to drive the visualizer FFT.
70pub fn spawn_player() -> (
71    mpsc::Sender<PlayerCommand>,
72    mpsc::Receiver<PlayerEvent>,
73    std::thread::JoinHandle<()>,
74    SampleBuffer,
75) {
76    let (cmd_tx, cmd_rx) = mpsc::channel::<PlayerCommand>();
77    let (evt_tx, evt_rx) = mpsc::channel::<PlayerEvent>();
78
79    let sample_buffer: SampleBuffer = Arc::new(Mutex::new(VecDeque::with_capacity(4096)));
80    let thread_buffer = sample_buffer.clone();
81
82    let handle = std::thread::Builder::new()
83        .name("playterm-player".into())
84        .spawn(move || player_thread(cmd_rx, evt_tx, thread_buffer))
85        .expect("failed to spawn player thread");
86
87    (cmd_tx, evt_rx, handle, sample_buffer)
88}
89
90// ── Player thread ─────────────────────────────────────────────────────────────
91
92fn player_thread(cmd_rx: mpsc::Receiver<PlayerCommand>, evt_tx: mpsc::Sender<PlayerEvent>, sample_buffer: SampleBuffer) {
93    // MixerDeviceSink must live for the duration of playback.
94    let mut device = match DeviceSinkBuilder::open_default_sink() {
95        Ok(d) => d,
96        Err(e) => {
97            let _ = evt_tx.send(PlayerEvent::Error(format!("audio device error: {e}")));
98            return;
99        }
100    };
101    // Suppress the default stderr message on drop — we control shutdown explicitly.
102    device.log_on_drop(false);
103
104    let player = Player::connect_new(&device.mixer());
105
106    // State for the current track.
107    let mut current_total: Option<Duration> = None;
108    // Tracks whether the previous tick saw a non-empty player (to detect natural end).
109    let mut was_playing = false;
110    // Gapless state.
111    let mut next_total: Option<Duration> = None;
112    let mut next_queued = false;
113    let mut about_to_finish_sent = false;
114    let mut prev_elapsed = Duration::ZERO;
115    // Skip-generation counter: updated every time a PlayUrl is received.
116    // Used to discard stale downloads when the user skips rapidly.
117    let mut skip_gen: u64 = 0;
118
119    'outer: loop {
120        // ── Drain all pending commands (non-blocking) ─────────────────────────
121        loop {
122            use mpsc::TryRecvError;
123            match cmd_rx.try_recv() {
124                Ok(PlayerCommand::Quit) => break 'outer,
125                Ok(PlayerCommand::PlayUrl { url, duration, gen }) => {
126                    // Before downloading, drain any further PlayUrl commands that
127                    // are already queued.  This turns N rapid skips into one fetch.
128                    play_url(
129                        url, duration, gen,
130                        &cmd_rx, &mut skip_gen,
131                        &player, &evt_tx,
132                        &mut current_total, &mut was_playing,
133                        &mut next_total, &mut next_queued,
134                        &mut about_to_finish_sent, &mut prev_elapsed,
135                        &sample_buffer,
136                    );
137                }
138                Ok(cmd) => handle_command(
139                    cmd,
140                    &player,
141                    &evt_tx,
142                    &mut current_total,
143                    &mut was_playing,
144                    &mut next_total,
145                    &mut next_queued,
146                    &mut about_to_finish_sent,
147                    &mut prev_elapsed,
148                    &sample_buffer,
149                ),
150                Err(TryRecvError::Empty) => break,
151                Err(TryRecvError::Disconnected) => break 'outer,
152            }
153        }
154
155        // ── Progress tick ─────────────────────────────────────────────────────
156        if !player.is_paused() && !player.empty() {
157            let elapsed = player.get_pos();
158
159            // Detect gapless track transition: elapsed resets to near zero
160            // while we know a next track was appended.  Use a 2 s window rather
161            // than 500 ms to tolerate rodio's first-tick imprecision.
162            if next_queued
163                && prev_elapsed > Duration::from_secs(2)
164                && elapsed < Duration::from_secs(2)
165            {
166                current_total = next_total.take();
167                next_queued = false;
168                about_to_finish_sent = false;
169                let _ = evt_tx.send(PlayerEvent::TrackAdvanced);
170            }
171            prev_elapsed = elapsed;
172
173            let _ = evt_tx.send(PlayerEvent::Progress {
174                elapsed,
175                total: current_total,
176            });
177
178            // Send AboutToFinish ~10 s before the end so the TUI can enqueue next.
179            // 10 s gives enough headroom for: player-thread sleep (≤500 ms) +
180            // TUI dispatch latency + full-track download + decode.
181            if !about_to_finish_sent && !next_queued {
182                if let Some(total) = current_total {
183                    let remaining = total.saturating_sub(elapsed);
184                    if remaining <= Duration::from_secs(10) && remaining > Duration::ZERO {
185                        about_to_finish_sent = true;
186                        let _ = evt_tx.send(PlayerEvent::AboutToFinish);
187                    }
188                }
189            }
190
191            was_playing = true;
192        }
193
194        // ── Natural track end detection (no next track was enqueued) ──────────
195        if was_playing && player.empty() {
196            was_playing = false;
197            current_total = None;
198            next_total = None;
199            next_queued = false;
200            about_to_finish_sent = false;
201            prev_elapsed = Duration::ZERO;
202            let _ = evt_tx.send(PlayerEvent::TrackEnded);
203        }
204
205        std::thread::sleep(Duration::from_millis(500));
206    }
207
208    // Stop playback before releasing the audio device.
209    player.stop();
210    drop(player);
211    drop(device);
212}
213
214/// Handle a `PlayUrl` command with skip-generation cancellation.
215///
216/// Before downloading, drains any further `PlayUrl` commands already queued
217/// in the channel — turning N rapid skips into a single fetch.  After the
218/// (blocking) download, checks the channel once more: if an even newer
219/// `PlayUrl` arrived while we were fetching, the download is discarded and
220/// we recurse for the new one.
221#[allow(clippy::too_many_arguments)]
222fn play_url(
223    url: String,
224    duration: Option<Duration>,
225    gen: u64,
226    cmd_rx: &mpsc::Receiver<PlayerCommand>,
227    skip_gen: &mut u64,
228    player: &Player,
229    evt_tx: &mpsc::Sender<PlayerEvent>,
230    current_total: &mut Option<Duration>,
231    was_playing: &mut bool,
232    next_total: &mut Option<Duration>,
233    next_queued: &mut bool,
234    about_to_finish_sent: &mut bool,
235    prev_elapsed: &mut Duration,
236    sample_buffer: &SampleBuffer,
237) {
238    // Update skip_gen for this command.
239    *skip_gen = gen;
240
241    // ── Pre-download drain ────────────────────────────────────────────────────
242    // Consume any later PlayUrl commands already sitting in the channel.
243    // Non-PlayUrl commands are dropped here; in practice the only commands
244    // that arrive between rapid skips are more PlayUrl commands.
245    let mut final_url = url;
246    let mut final_duration = duration;
247    let mut final_gen = gen;
248    loop {
249        match cmd_rx.try_recv() {
250            Ok(PlayerCommand::PlayUrl { url: u, duration: d, gen: g }) => {
251                final_url = u;
252                final_duration = d;
253                final_gen = g;
254                *skip_gen = g;
255            }
256            Ok(_other) => break, // non-play command; stop lookahead
257            Err(_) => break,
258        }
259    }
260
261    // Stop current playback and reset all state before the (slow) fetch.
262    player.stop();
263    *was_playing = false;
264    *next_total = None;
265    *next_queued = false;
266    *about_to_finish_sent = false;
267    *prev_elapsed = Duration::ZERO;
268
269    // ── Network fetch (blocking) ──────────────────────────────────────────────
270    let source = match download_and_decode(&final_url) {
271        Ok(s) => s,
272        Err(e) => {
273            let _ = evt_tx.send(PlayerEvent::Error(format!("playback error: {e}")));
274            return;
275        }
276    };
277
278    // ── Post-download drain ───────────────────────────────────────────────────
279    // If the user skipped again while we were fetching, discard this result
280    // and handle the newer command instead.
281    let mut newer: Option<(String, Option<Duration>, u64)> = None;
282    loop {
283        match cmd_rx.try_recv() {
284            Ok(PlayerCommand::PlayUrl { url: u, duration: d, gen: g }) => {
285                *skip_gen = g;
286                newer = Some((u, d, g));
287            }
288            Ok(_other) => break,
289            Err(_) => break,
290        }
291    }
292
293    if *skip_gen != final_gen {
294        // A newer PlayUrl arrived mid-download — discard this source.
295        drop(source);
296        if let Some((u, d, g)) = newer {
297            play_url(
298                u, d, g, cmd_rx, skip_gen,
299                player, evt_tx,
300                current_total, was_playing,
301                next_total, next_queued,
302                about_to_finish_sent, prev_elapsed,
303                sample_buffer,
304            );
305        }
306        return;
307    }
308
309    // ── Commit ────────────────────────────────────────────────────────────────
310    *current_total = final_duration;
311    let tapped = SampleTap::new(source, sample_buffer.clone());
312    player.append(tapped);
313    player.play();
314    let _ = evt_tx.send(PlayerEvent::TrackStarted);
315}
316
317fn handle_command(
318    cmd: PlayerCommand,
319    player: &Player,
320    evt_tx: &mpsc::Sender<PlayerEvent>,
321    current_total: &mut Option<Duration>,
322    was_playing: &mut bool,
323    next_total: &mut Option<Duration>,
324    next_queued: &mut bool,
325    about_to_finish_sent: &mut bool,
326    prev_elapsed: &mut Duration,
327    sample_buffer: &SampleBuffer,
328) {
329    match cmd {
330        PlayerCommand::PlayUrl { .. } => {
331            // Handled by play_url() in the main loop — should not reach here.
332            unreachable!("PlayUrl must be dispatched via play_url()");
333        }
334        PlayerCommand::EnqueueNext { url, duration } => {
335            match download_and_decode(&url) {
336                Ok(source) => {
337                    *next_total = duration;
338                    *next_queued = true;
339                    let tapped = SampleTap::new(source, sample_buffer.clone());
340                    player.append(tapped);
341                }
342                Err(e) => {
343                    let _ = evt_tx.send(PlayerEvent::Error(format!("enqueue error: {e}")));
344                }
345            }
346        }
347        PlayerCommand::Pause => player.pause(),
348        PlayerCommand::Resume => player.play(),
349        PlayerCommand::Stop => {
350            player.stop();
351            *current_total = None;
352            *next_total = None;
353            *next_queued = false;
354            *about_to_finish_sent = false;
355            *prev_elapsed = Duration::ZERO;
356            *was_playing = false;
357        }
358        PlayerCommand::SetVolume(v) => player.set_volume(v),
359        PlayerCommand::Seek(pos) => {
360            let _ = player.try_seek(pos);
361            // Update prev_elapsed so the gapless-transition heuristic isn't
362            // confused by the sudden position jump.
363            *prev_elapsed = pos;
364        }
365        PlayerCommand::Quit => {
366            // Handled by the 'outer break in player_thread — should not reach here.
367            unreachable!("Quit must be handled in the outer command-drain loop");
368        }
369    }
370}
371
372// ── Stream + decode ───────────────────────────────────────────────────────────
373
374fn download_and_decode(url: &str) -> Result<Decoder<std::io::Cursor<Vec<u8>>>> {
375    // Download the full track into RAM so symphonia gets an unambiguously
376    // seekable Cursor<Vec<u8>>.  StreamingReader over a BufReader was technically
377    // seekable but symphonia's demuxer cached seekability as false during probe,
378    // causing every try_seek to return Err(Unseekable).
379    //
380    // with_byte_len: tells symphonia the exact byte length, which also sets
381    //   is_seekable = true internally.
382    // with_coarse_seek: bypasses the time_base requirement for accurate seeking
383    //   (unavailable on transcoded MP3 streams); seeks to the nearest keyframe.
384    let bytes = reqwest::blocking::get(url)?.bytes()?;
385    let byte_len = bytes.len() as u64;
386    let cursor = std::io::Cursor::new(bytes.to_vec());
387    let decoder = Decoder::builder()
388        .with_data(cursor)
389        .with_byte_len(byte_len)
390        .with_coarse_seek(true)
391        .build()?;
392    Ok(decoder)
393}