playterm_player/engine.rs
1//! Audio playback engine.
2//!
3//! Runs entirely on a dedicated `std::thread` — no tokio inside this module.
4//! The TUI communicates via two `std::sync::mpsc` channels:
5//!
6//! - `PlayerCommand` (TUI → engine): play a URL, pause, resume, stop, set volume.
7//! - `PlayerEvent` (engine → TUI): progress ticks, track-ended, errors.
8
9use std::collections::VecDeque;
10use std::sync::{Arc, Mutex, mpsc};
11use std::time::Duration;
12
13use anyhow::Result;
14use rodio::{Decoder, DeviceSinkBuilder, Player};
15
16use crate::tap::SampleTap;
17
18type SampleBuffer = Arc<Mutex<VecDeque<f32>>>;
19
20// ── Public channel types ──────────────────────────────────────────────────────
21
22/// Commands sent from the TUI to the player thread.
23#[derive(Debug)]
24pub enum PlayerCommand {
25 /// Start playing the track at `url`. `duration` is the expected total
26 /// duration (from Subsonic metadata), used for progress display.
27 /// `gen` is a monotonically increasing counter from the TUI; the engine
28 /// uses it to discard stale downloads when multiple skips arrive quickly.
29 PlayUrl { url: String, duration: Option<Duration>, gen: u64 },
30 /// Append the next track to the player queue for gapless playback.
31 ///
32 /// Must only be sent in response to `PlayerEvent::AboutToFinish`.
33 /// Does NOT stop current playback.
34 EnqueueNext { url: String, duration: Option<Duration> },
35 Pause,
36 Resume,
37 Stop,
38 SetVolume(f32),
39 /// Seek to an absolute position in the current track.
40 Seek(Duration),
41 /// Stop playback and shut down the player thread cleanly.
42 Quit,
43}
44
45/// Events sent from the player thread back to the TUI.
46#[derive(Debug)]
47pub enum PlayerEvent {
48 TrackStarted,
49 /// Fired every ~500 ms. `total` is `None` when unknown.
50 Progress { elapsed: Duration, total: Option<Duration> },
51 /// Fired ~5 s before the current track ends. The TUI should respond with
52 /// `PlayerCommand::EnqueueNext` to enable gapless playback.
53 AboutToFinish,
54 /// Fired when a gaplessly enqueued track begins playing (elapsed resets).
55 TrackAdvanced,
56 TrackEnded,
57 Error(String),
58}
59
60// ── Engine spawn ──────────────────────────────────────────────────────────────
61
62/// Spawn the player thread.
63///
64/// Returns `(cmd_tx, evt_rx, join_handle, sample_buffer)`. The caller should send
65/// `PlayerCommand::Quit` and then join the handle (with a timeout) on
66/// shutdown to ensure the audio device is released cleanly.
67///
68/// `sample_buffer` is a ring buffer of the most recent decoded f32 samples;
69/// the TUI reads it each frame to drive the visualizer FFT.
70pub fn spawn_player() -> (
71 mpsc::Sender<PlayerCommand>,
72 mpsc::Receiver<PlayerEvent>,
73 std::thread::JoinHandle<()>,
74 SampleBuffer,
75) {
76 let (cmd_tx, cmd_rx) = mpsc::channel::<PlayerCommand>();
77 let (evt_tx, evt_rx) = mpsc::channel::<PlayerEvent>();
78
79 let sample_buffer: SampleBuffer = Arc::new(Mutex::new(VecDeque::with_capacity(4096)));
80 let thread_buffer = sample_buffer.clone();
81
82 let handle = std::thread::Builder::new()
83 .name("playterm-player".into())
84 .spawn(move || player_thread(cmd_rx, evt_tx, thread_buffer))
85 .expect("failed to spawn player thread");
86
87 (cmd_tx, evt_rx, handle, sample_buffer)
88}
89
90// ── Player thread ─────────────────────────────────────────────────────────────
91
92fn player_thread(cmd_rx: mpsc::Receiver<PlayerCommand>, evt_tx: mpsc::Sender<PlayerEvent>, sample_buffer: SampleBuffer) {
93 // MixerDeviceSink must live for the duration of playback.
94 let mut device = match DeviceSinkBuilder::open_default_sink() {
95 Ok(d) => d,
96 Err(e) => {
97 let _ = evt_tx.send(PlayerEvent::Error(format!("audio device error: {e}")));
98 return;
99 }
100 };
101 // Suppress the default stderr message on drop — we control shutdown explicitly.
102 device.log_on_drop(false);
103
104 let player = Player::connect_new(&device.mixer());
105
106 // State for the current track.
107 let mut current_total: Option<Duration> = None;
108 // Tracks whether the previous tick saw a non-empty player (to detect natural end).
109 let mut was_playing = false;
110 // Gapless state.
111 let mut next_total: Option<Duration> = None;
112 let mut next_queued = false;
113 let mut about_to_finish_sent = false;
114 let mut prev_elapsed = Duration::ZERO;
115 // Skip-generation counter: updated every time a PlayUrl is received.
116 // Used to discard stale downloads when the user skips rapidly.
117 let mut skip_gen: u64 = 0;
118
119 'outer: loop {
120 // ── Drain all pending commands (non-blocking) ─────────────────────────
121 loop {
122 use mpsc::TryRecvError;
123 match cmd_rx.try_recv() {
124 Ok(PlayerCommand::Quit) => break 'outer,
125 Ok(PlayerCommand::PlayUrl { url, duration, gen }) => {
126 // Before downloading, drain any further PlayUrl commands that
127 // are already queued. This turns N rapid skips into one fetch.
128 play_url(
129 url, duration, gen,
130 &cmd_rx, &mut skip_gen,
131 &player, &evt_tx,
132 &mut current_total, &mut was_playing,
133 &mut next_total, &mut next_queued,
134 &mut about_to_finish_sent, &mut prev_elapsed,
135 &sample_buffer,
136 );
137 }
138 Ok(cmd) => handle_command(
139 cmd,
140 &player,
141 &evt_tx,
142 &mut current_total,
143 &mut was_playing,
144 &mut next_total,
145 &mut next_queued,
146 &mut about_to_finish_sent,
147 &mut prev_elapsed,
148 &sample_buffer,
149 ),
150 Err(TryRecvError::Empty) => break,
151 Err(TryRecvError::Disconnected) => break 'outer,
152 }
153 }
154
155 // ── Progress tick ─────────────────────────────────────────────────────
156 if !player.is_paused() && !player.empty() {
157 let elapsed = player.get_pos();
158
159 // Detect gapless track transition: elapsed resets to near zero
160 // while we know a next track was appended. Use a 2 s window rather
161 // than 500 ms to tolerate rodio's first-tick imprecision.
162 if next_queued
163 && prev_elapsed > Duration::from_secs(2)
164 && elapsed < Duration::from_secs(2)
165 {
166 current_total = next_total.take();
167 next_queued = false;
168 about_to_finish_sent = false;
169 let _ = evt_tx.send(PlayerEvent::TrackAdvanced);
170 }
171 prev_elapsed = elapsed;
172
173 let _ = evt_tx.send(PlayerEvent::Progress {
174 elapsed,
175 total: current_total,
176 });
177
178 // Send AboutToFinish ~10 s before the end so the TUI can enqueue next.
179 // 10 s gives enough headroom for: player-thread sleep (≤500 ms) +
180 // TUI dispatch latency + full-track download + decode.
181 if !about_to_finish_sent && !next_queued {
182 if let Some(total) = current_total {
183 let remaining = total.saturating_sub(elapsed);
184 if remaining <= Duration::from_secs(10) && remaining > Duration::ZERO {
185 about_to_finish_sent = true;
186 let _ = evt_tx.send(PlayerEvent::AboutToFinish);
187 }
188 }
189 }
190
191 was_playing = true;
192 }
193
194 // ── Natural track end detection (no next track was enqueued) ──────────
195 if was_playing && player.empty() {
196 was_playing = false;
197 current_total = None;
198 next_total = None;
199 next_queued = false;
200 about_to_finish_sent = false;
201 prev_elapsed = Duration::ZERO;
202 let _ = evt_tx.send(PlayerEvent::TrackEnded);
203 }
204
205 std::thread::sleep(Duration::from_millis(500));
206 }
207
208 // Stop playback before releasing the audio device.
209 player.stop();
210 drop(player);
211 drop(device);
212}
213
214/// Handle a `PlayUrl` command with skip-generation cancellation.
215///
216/// Before downloading, drains any further `PlayUrl` commands already queued
217/// in the channel — turning N rapid skips into a single fetch. After the
218/// (blocking) download, checks the channel once more: if an even newer
219/// `PlayUrl` arrived while we were fetching, the download is discarded and
220/// we recurse for the new one.
221#[allow(clippy::too_many_arguments)]
222fn play_url(
223 url: String,
224 duration: Option<Duration>,
225 gen: u64,
226 cmd_rx: &mpsc::Receiver<PlayerCommand>,
227 skip_gen: &mut u64,
228 player: &Player,
229 evt_tx: &mpsc::Sender<PlayerEvent>,
230 current_total: &mut Option<Duration>,
231 was_playing: &mut bool,
232 next_total: &mut Option<Duration>,
233 next_queued: &mut bool,
234 about_to_finish_sent: &mut bool,
235 prev_elapsed: &mut Duration,
236 sample_buffer: &SampleBuffer,
237) {
238 // Update skip_gen for this command.
239 *skip_gen = gen;
240
241 // ── Pre-download drain ────────────────────────────────────────────────────
242 // Consume any later PlayUrl commands already sitting in the channel.
243 // Non-PlayUrl commands are dropped here; in practice the only commands
244 // that arrive between rapid skips are more PlayUrl commands.
245 let mut final_url = url;
246 let mut final_duration = duration;
247 let mut final_gen = gen;
248 loop {
249 match cmd_rx.try_recv() {
250 Ok(PlayerCommand::PlayUrl { url: u, duration: d, gen: g }) => {
251 final_url = u;
252 final_duration = d;
253 final_gen = g;
254 *skip_gen = g;
255 }
256 Ok(_other) => break, // non-play command; stop lookahead
257 Err(_) => break,
258 }
259 }
260
261 // Stop current playback and reset all state before the (slow) fetch.
262 player.stop();
263 *was_playing = false;
264 *next_total = None;
265 *next_queued = false;
266 *about_to_finish_sent = false;
267 *prev_elapsed = Duration::ZERO;
268
269 // ── Network fetch (blocking) ──────────────────────────────────────────────
270 let source = match download_and_decode(&final_url) {
271 Ok(s) => s,
272 Err(e) => {
273 let _ = evt_tx.send(PlayerEvent::Error(format!("playback error: {e}")));
274 return;
275 }
276 };
277
278 // ── Post-download drain ───────────────────────────────────────────────────
279 // If the user skipped again while we were fetching, discard this result
280 // and handle the newer command instead.
281 let mut newer: Option<(String, Option<Duration>, u64)> = None;
282 loop {
283 match cmd_rx.try_recv() {
284 Ok(PlayerCommand::PlayUrl { url: u, duration: d, gen: g }) => {
285 *skip_gen = g;
286 newer = Some((u, d, g));
287 }
288 Ok(_other) => break,
289 Err(_) => break,
290 }
291 }
292
293 if *skip_gen != final_gen {
294 // A newer PlayUrl arrived mid-download — discard this source.
295 drop(source);
296 if let Some((u, d, g)) = newer {
297 play_url(
298 u, d, g, cmd_rx, skip_gen,
299 player, evt_tx,
300 current_total, was_playing,
301 next_total, next_queued,
302 about_to_finish_sent, prev_elapsed,
303 sample_buffer,
304 );
305 }
306 return;
307 }
308
309 // ── Commit ────────────────────────────────────────────────────────────────
310 *current_total = final_duration;
311 let tapped = SampleTap::new(source, sample_buffer.clone());
312 player.append(tapped);
313 player.play();
314 let _ = evt_tx.send(PlayerEvent::TrackStarted);
315}
316
317fn handle_command(
318 cmd: PlayerCommand,
319 player: &Player,
320 evt_tx: &mpsc::Sender<PlayerEvent>,
321 current_total: &mut Option<Duration>,
322 was_playing: &mut bool,
323 next_total: &mut Option<Duration>,
324 next_queued: &mut bool,
325 about_to_finish_sent: &mut bool,
326 prev_elapsed: &mut Duration,
327 sample_buffer: &SampleBuffer,
328) {
329 match cmd {
330 PlayerCommand::PlayUrl { .. } => {
331 // Handled by play_url() in the main loop — should not reach here.
332 unreachable!("PlayUrl must be dispatched via play_url()");
333 }
334 PlayerCommand::EnqueueNext { url, duration } => {
335 match download_and_decode(&url) {
336 Ok(source) => {
337 *next_total = duration;
338 *next_queued = true;
339 let tapped = SampleTap::new(source, sample_buffer.clone());
340 player.append(tapped);
341 }
342 Err(e) => {
343 let _ = evt_tx.send(PlayerEvent::Error(format!("enqueue error: {e}")));
344 }
345 }
346 }
347 PlayerCommand::Pause => player.pause(),
348 PlayerCommand::Resume => player.play(),
349 PlayerCommand::Stop => {
350 player.stop();
351 *current_total = None;
352 *next_total = None;
353 *next_queued = false;
354 *about_to_finish_sent = false;
355 *prev_elapsed = Duration::ZERO;
356 *was_playing = false;
357 }
358 PlayerCommand::SetVolume(v) => player.set_volume(v),
359 PlayerCommand::Seek(pos) => {
360 let _ = player.try_seek(pos);
361 // Update prev_elapsed so the gapless-transition heuristic isn't
362 // confused by the sudden position jump.
363 *prev_elapsed = pos;
364 }
365 PlayerCommand::Quit => {
366 // Handled by the 'outer break in player_thread — should not reach here.
367 unreachable!("Quit must be handled in the outer command-drain loop");
368 }
369 }
370}
371
372// ── Stream + decode ───────────────────────────────────────────────────────────
373
374fn download_and_decode(url: &str) -> Result<Decoder<std::io::Cursor<Vec<u8>>>> {
375 // Download the full track into RAM so symphonia gets an unambiguously
376 // seekable Cursor<Vec<u8>>. StreamingReader over a BufReader was technically
377 // seekable but symphonia's demuxer cached seekability as false during probe,
378 // causing every try_seek to return Err(Unseekable).
379 //
380 // with_byte_len: tells symphonia the exact byte length, which also sets
381 // is_seekable = true internally.
382 // with_coarse_seek: bypasses the time_base requirement for accurate seeking
383 // (unavailable on transcoded MP3 streams); seeks to the nearest keyframe.
384 let bytes = reqwest::blocking::get(url)?.bytes()?;
385 let byte_len = bytes.len() as u64;
386 let cursor = std::io::Cursor::new(bytes.to_vec());
387 let decoder = Decoder::builder()
388 .with_data(cursor)
389 .with_byte_len(byte_len)
390 .with_coarse_seek(true)
391 .build()?;
392 Ok(decoder)
393}