1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicBool, AtomicU32, Ordering},
5 mpsc::{Receiver, Sender, TryRecvError, channel},
6 },
7 thread,
8};
9
10use lunar_lib::error;
11use ringbuf::traits::Observer;
12
13use crate::{
14 SHUTDOWN,
15 event_handler::EventTx,
16 player::{
17 AtomicPlaybackStatus, OpenedDecoder, PlaybackStatus, PlayerError, PlayerEvent,
18 PlayerRequest, PlayerTx,
19 playback::{CpalHandle, DeviceConfig},
20 },
21 playlist::PlayingTrack,
22 wait,
23};
24
25pub struct DecoderHandle {
26 rx: Receiver<DecoderCommand>,
27 player_tx: Sender<PlayerRequest>,
28 event_tx: Sender<PlayerEvent>,
29
30 cpal_handle: Option<CpalHandle>,
31 device_config: Arc<Mutex<DeviceConfig>>,
32
33 current_decoder: Option<OpenedDecoder>,
34 preload_decoder: Option<OpenedDecoder>,
35
36 playback_state: Arc<AtomicPlaybackStatus>,
37 looping: Arc<AtomicBool>,
38 volume: Arc<AtomicU32>,
39}
40
41impl DecoderHandle {
42 pub(crate) fn open(
43 player_tx: Sender<PlayerRequest>,
44 event_tx: Sender<PlayerEvent>,
45 device_config: Arc<Mutex<DeviceConfig>>,
46 playback_state: Arc<AtomicPlaybackStatus>,
47 volume: Arc<AtomicU32>,
48 looping: Arc<AtomicBool>,
49 ) -> Result<Sender<DecoderCommand>, PlayerError> {
50 let (tx, rx) = channel();
51
52 let handle = Self {
53 rx,
54 player_tx,
55 event_tx,
56
57 cpal_handle: None,
58 device_config,
59
60 current_decoder: None,
61 preload_decoder: None,
62
63 playback_state,
64 looping,
65 volume,
66 };
67
68 thread::spawn(move || {
69 if let Err(err) = handle.run() {
70 error!("DecoderHandle failed with error: {err}");
71 }
72 SHUTDOWN.store(true, Ordering::Relaxed);
73 });
74
75 Ok(tx)
76 }
77
78 fn run(mut self) -> Result<(), PlayerError> {
79 loop {
80 if SHUTDOWN.load(Ordering::Relaxed) {
81 return Ok(());
82 }
83
84 loop {
85 match self.rx.try_recv() {
86 Ok(command) => self.run_command(command)?,
87 Err(TryRecvError::Empty) => break,
88 Err(TryRecvError::Disconnected) => return Ok(()),
89 }
90 }
91
92 if !matches!(
93 self.playback_state.load(Ordering::Relaxed),
94 PlaybackStatus::Playing
95 ) {
96 wait();
97 continue;
98 }
99
100 if let Some(cpal_handle) = self.cpal_handle.as_mut()
101 && let Some(finished_consuming) = cpal_handle.consume_packet()
102 && !finished_consuming
103 {
104 wait();
105 continue;
106 }
107
108 let Some(mut decoder) = self.current_decoder.take() else {
109 if let Some(ref cpal_handle) = self.cpal_handle
110 && !cpal_handle.audio_buf.is_empty()
111 {
112 wait();
113 continue;
114 }
115
116 self.cpal_handle = None;
117 self.playback_state
118 .store(PlaybackStatus::Stopped, Ordering::SeqCst);
119 self.event_tx.event(PlayerEvent::PlaybackStopped);
120 continue;
121 };
122
123 let mut cpal_handle = self.get_cpal_handle()?;
125
126 if decoder.at_eof {
127 if self.looping.load(Ordering::Relaxed) {
128 decoder.seek(0.0, false)?;
129 decoder.started_at = None;
130 decoder.sent_scrobble = false;
131 decoder.decoded_frames = 0;
132
133 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
134 currently_playing: decoder.decoded_from.clone(),
135 });
136
137 self.current_decoder = Some(decoder);
138 self.cpal_handle = Some(cpal_handle);
139 continue;
140 }
141
142 if let Some(preload) = self.preload_decoder.take() {
143 self.player_tx
144 .decoder_event(DecoderEvent::PreloadConsumed)?;
145
146 let currently_playing = preload.decoded_from.clone();
147
148 decoder = preload;
149
150 self.event_tx
151 .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
152 } else {
153 self.cpal_handle = Some(cpal_handle);
154 continue;
155 }
156 }
157
158 if decoder.started() {
159 self.player_tx.decoder_event(DecoderEvent::NowPlaying {
160 track: decoder.decoded_from.clone(),
161 })?;
162 }
163
164 let packet = decoder.decode_next_packet()?;
165 cpal_handle.pending_packet = packet;
166
167 let decoded_time = decoder.decoded_time();
168 if (decoded_time >= (decoder.decoded_from.source.container().stream().duration() / 2.0)
169 || decoded_time >= 240.0)
170 && !decoder.sent_scrobble
171 {
172 decoder.sent_scrobble = true;
173 self.player_tx.decoder_event(DecoderEvent::Scrobble {
174 track: decoder.decoded_from.clone(),
175 start_time: decoder.start_time().unwrap(),
176 })?;
177 }
178
179 self.cpal_handle = Some(cpal_handle);
180 self.current_decoder = Some(decoder);
181 }
182 }
183
184 fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
185 match command {
186 DecoderCommand::Load { decoder } => {
187 let currently_playing = decoder.decoded_from.clone();
188 self.current_decoder = Some(*decoder);
189 self.event_tx
190 .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
191 }
192 DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
193 DecoderCommand::LoadAndPreload { load, preload } => {
194 let currently_playing = load.decoded_from.clone();
195 self.current_decoder = Some(*load);
196 self.preload_decoder = Some(*preload);
197 self.event_tx
198 .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
199 }
200 DecoderCommand::Unpreload => {
201 self.preload_decoder = None;
202 }
203 DecoderCommand::SetPlaying(playing) => {
204 if let Some(ref decoder) = self.current_decoder {
205 let new_state = if playing {
206 PlaybackStatus::Playing
207 } else {
208 PlaybackStatus::Paused
209 };
210
211 if self.playback_state.load(Ordering::SeqCst) == new_state {
212 return Ok(());
213 }
214
215 self.playback_state.store(new_state, Ordering::SeqCst);
216
217 self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
218 is_playing: playing,
219 changed_at: decoder.time(),
220 });
221 }
222 }
223 DecoderCommand::TogglePlaying { callback } => {
224 let current_state = self.playback_state.load(Ordering::SeqCst);
225 if let Some(ref decoder) = self.current_decoder {
226 let new_state = match current_state {
227 PlaybackStatus::Playing => PlaybackStatus::Paused,
228 PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
229 };
230
231 self.playback_state.store(new_state, Ordering::SeqCst);
232 callback.send(new_state)?;
233
234 self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
235 is_playing: matches!(new_state, PlaybackStatus::Playing),
236 changed_at: decoder.time(),
237 });
238 } else {
239 callback.send(PlaybackStatus::Stopped)?;
240 }
241 }
242 DecoderCommand::Seek {
243 time,
244 increment,
245 callback,
246 } => {
247 if let Some(ref mut opened_decoder) = self.current_decoder {
248 let time = opened_decoder.seek(time, increment)?;
249 self.event_tx.event(PlayerEvent::SeekOccured { time });
250 callback.send(Some(time))?;
251 } else {
252 callback.send(None)?;
253 }
254 }
255 DecoderCommand::GetTime { callback } => {
256 if let Some(ref opened_decoder) = self.current_decoder {
257 callback.send(Some(opened_decoder.time()))?;
258 } else {
259 callback.send(None)?;
260 }
261 }
262 DecoderCommand::GetPlaying { callback } => {
263 if let Some(ref opened_decoder) = self.current_decoder {
264 callback.send(Some(opened_decoder.decoded_from.clone()))?;
265 } else {
266 callback.send(None)?;
267 }
268 }
269 DecoderCommand::Stop => {
270 self.current_decoder = None;
271 self.preload_decoder = None;
272 self.playback_state
273 .store(PlaybackStatus::Stopped, Ordering::SeqCst);
274 self.event_tx.event(PlayerEvent::PlaybackStopped);
275 }
276 DecoderCommand::Skip => {
277 if let Some(take) = self.preload_decoder.take() {
278 self.player_tx
279 .decoder_event(DecoderEvent::PreloadConsumed)?;
280 let currently_playing = take.decoded_from.clone();
281 self.current_decoder = Some(take);
282 self.event_tx
283 .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
284 } else {
285 self.current_decoder = None;
286 }
287 }
288 }
289 Ok(())
290 }
291}
292
293impl DecoderHandle {
294 pub fn get_cpal_handle(&mut self) -> Result<CpalHandle, PlayerError> {
300 if let Some(current) = self.cpal_handle.take() {
301 Ok(current)
302 } else {
303 CpalHandle::open(&*self.device_config.lock()?, self.volume.clone())
304 }
305 }
306}
307
308pub enum DecoderEvent {
309 PreloadConsumed,
310 Scrobble {
311 track: PlayingTrack,
312 start_time: u64,
313 },
314 NowPlaying {
315 track: PlayingTrack,
316 },
317}
318
319pub enum DecoderCommand {
320 Load {
321 decoder: Box<OpenedDecoder>,
322 },
323 Preload {
324 decoder: Box<OpenedDecoder>,
325 },
326 Unpreload,
327 LoadAndPreload {
328 load: Box<OpenedDecoder>,
329 preload: Box<OpenedDecoder>,
330 },
331 SetPlaying(bool),
332 TogglePlaying {
333 callback: Sender<PlaybackStatus>,
334 },
335 Seek {
336 time: f64,
337 increment: bool,
338 callback: Sender<Option<f64>>,
339 },
340 GetTime {
341 callback: Sender<Option<f64>>,
342 },
343 GetPlaying {
344 callback: Sender<Option<PlayingTrack>>,
345 },
346 Skip,
347 Stop,
348}
349
350pub trait DecoderTx {
351 fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
352 fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
353 fn load_and_preload(
354 &self,
355 load: OpenedDecoder,
356 preload: OpenedDecoder,
357 ) -> Result<(), PlayerError>;
358 fn unpreload(&self) -> Result<(), PlayerError>;
359 fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
360 fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
361 fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
362 fn get_time(&self) -> Result<Option<f64>, PlayerError>;
363 fn get_playing(&self) -> Result<Option<PlayingTrack>, PlayerError>;
364 fn stop(&self) -> Result<(), PlayerError>;
365 fn skip(&self) -> Result<(), PlayerError>;
366}
367
368impl DecoderTx for Sender<DecoderCommand> {
369 fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
370 self.send(DecoderCommand::Load {
371 decoder: Box::new(decoder),
372 })?;
373 Ok(())
374 }
375
376 fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
377 self.send(DecoderCommand::Preload {
378 decoder: Box::new(decoder),
379 })?;
380 Ok(())
381 }
382
383 fn load_and_preload(
384 &self,
385 load: OpenedDecoder,
386 preload: OpenedDecoder,
387 ) -> Result<(), PlayerError> {
388 self.send(DecoderCommand::LoadAndPreload {
389 load: Box::new(load),
390 preload: Box::new(preload),
391 })?;
392 Ok(())
393 }
394
395 fn unpreload(&self) -> Result<(), PlayerError> {
396 self.send(DecoderCommand::Unpreload)?;
397 Ok(())
398 }
399
400 fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
401 self.send(DecoderCommand::SetPlaying(is_playing))?;
402 Ok(())
403 }
404
405 fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
406 let (tx, rx) = channel();
407 self.send(DecoderCommand::TogglePlaying { callback: tx })?;
408 Ok(rx.recv()?)
409 }
410
411 fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
412 let (tx, rx) = channel();
413 self.send(DecoderCommand::Seek {
414 time,
415 increment,
416 callback: tx,
417 })?;
418 Ok(rx.recv()?)
419 }
420
421 fn get_time(&self) -> Result<Option<f64>, PlayerError> {
422 let (tx, rx) = channel();
423 self.send(DecoderCommand::GetTime { callback: tx })?;
424 Ok(rx.recv()?)
425 }
426
427 fn get_playing(&self) -> Result<Option<PlayingTrack>, PlayerError> {
428 let (tx, rx) = channel();
429 self.send(DecoderCommand::GetPlaying { callback: tx })?;
430 Ok(rx.recv()?)
431 }
432
433 fn stop(&self) -> Result<(), PlayerError> {
434 self.send(DecoderCommand::Stop)?;
435 Ok(())
436 }
437
438 fn skip(&self) -> Result<(), PlayerError> {
439 self.send(DecoderCommand::Skip)?;
440 Ok(())
441 }
442}