1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicU32, Ordering},
5 mpsc::{Receiver, Sender, TryRecvError, channel},
6 },
7 thread,
8};
9
10use lunar_lib::error;
11use ringbuf::traits::Observer;
12use selene_core::library::track::ResolvedTrack;
13
14use crate::{
15 PlayerEvent, SHUTDOWN,
16 event_handler::EventTx,
17 player::{
18 OpenedDecoder, PlayerError, PlayerRequest, PlayerTx,
19 playback::{CpalHandle, DeviceConfig},
20 },
21 playlist::{AtomicPlaybackStatus, PlaybackStatus},
22 wait,
23};
24
25pub struct DecoderHandle {
26 rx: Receiver<DecoderCommand>,
27 player_tx: Sender<PlayerRequest>,
28 event_tx: Sender<PlayerEvent>,
29
30 cpal_handle: Option<CpalHandle>,
31 device_config: Arc<Mutex<DeviceConfig>>,
32
33 current_decoder: Option<OpenedDecoder>,
34 preload_decoder: Option<OpenedDecoder>,
35
36 playback_state: Arc<AtomicPlaybackStatus>,
37 volume: Arc<AtomicU32>,
38}
39
40impl DecoderHandle {
41 pub(crate) fn open(
42 player_tx: Sender<PlayerRequest>,
43 event_tx: Sender<PlayerEvent>,
44 device_config: Arc<Mutex<DeviceConfig>>,
45 playback_state: Arc<AtomicPlaybackStatus>,
46 volume: Arc<AtomicU32>,
47 ) -> Result<Sender<DecoderCommand>, PlayerError> {
48 let (tx, rx) = channel();
49
50 let handle = Self {
51 rx,
52 player_tx,
53 event_tx,
54
55 cpal_handle: None,
56 device_config,
57
58 current_decoder: None,
59 preload_decoder: None,
60
61 playback_state,
62 volume,
63 };
64
65 thread::spawn(move || {
66 if let Err(err) = handle.run() {
67 error!("DecoderHandle failed with error: {err}");
68 }
69 SHUTDOWN.store(true, Ordering::Relaxed);
70 });
71
72 Ok(tx)
73 }
74
75 fn run(mut self) -> Result<(), PlayerError> {
76 loop {
77 if SHUTDOWN.load(Ordering::Relaxed) {
78 return Ok(());
79 }
80
81 loop {
82 match self.rx.try_recv() {
83 Ok(command) => self.run_command(command)?,
84 Err(TryRecvError::Empty) => break,
85 Err(TryRecvError::Disconnected) => return Ok(()),
86 }
87 }
88
89 if !matches!(
90 self.playback_state.load(Ordering::Relaxed),
91 PlaybackStatus::Playing
92 ) {
93 wait();
94 continue;
95 }
96
97 let volume = f32::from_bits(self.volume.load(Ordering::Relaxed));
98 if let Some(ref mut cpal_handle) = self.cpal_handle
99 && let Some(finished_consuming) = cpal_handle.consume_packet(volume)
100 && !finished_consuming
101 {
102 wait();
103 continue;
104 }
105
106 if self.current_decoder.is_none() {
107 if let Some(ref cpal_handle) = self.cpal_handle
108 && !cpal_handle.audio_buf.is_empty()
109 {
110 wait();
111 continue;
112 }
113
114 self.cpal_handle = None;
115 self.playback_state
116 .store(PlaybackStatus::Stopped, Ordering::SeqCst);
117 self.event_tx.event(PlayerEvent::PlaybackStopped);
118 continue;
119 }
120
121 self.ensure_cpal_handle()?;
123
124 if self.current_decoder.as_ref().unwrap().at_eof {
125 if let Some(preload) = self.preload_decoder.take() {
126 self.player_tx
127 .decoder_event(DecoderEvent::PreloadConsumed)?;
128
129 let currently_playing = Box::new((*preload.decoded_from).clone());
130 self.current_decoder = Some(preload);
131
132 self.event_tx
133 .event(PlayerEvent::CurrentlyPlayingChanged { currently_playing });
134 } else {
135 self.current_decoder = None;
136 continue;
137 }
138 }
139
140 let Some(decoder) = self.current_decoder.as_mut() else {
141 panic!("Decoder was already validated")
142 };
143
144 if decoder.started() {
145 self.player_tx.decoder_event(DecoderEvent::NowPlaying {
146 track: decoder.decoded_from.clone(),
147 })?
148 };
149
150 let packet = decoder.decode_next_packet()?;
151 self.cpal_handle.as_mut().unwrap().pending_packet = packet;
152
153 let decoded_time = decoder.decoded_time();
154 if (decoded_time >= (decoder.decoded_from.track.container().stream().duration() / 2.0)
155 || decoded_time >= 240.0)
156 && !decoder.sent_scrobble
157 {
158 decoder.sent_scrobble = true;
159 self.player_tx.decoder_event(DecoderEvent::Scrobble {
160 track: decoder.decoded_from.clone(),
161 start_time: decoder.start_time().unwrap(),
162 })?
163 }
164 }
165 }
166
167 fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
168 match command {
169 DecoderCommand::Load { decoder } => {
170 let currently_playing = decoder.decoded_from.clone();
171 self.current_decoder = Some(*decoder);
172 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
173 currently_playing: Box::new(Arc::unwrap_or_clone(currently_playing)),
174 });
175 }
176 DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
177 DecoderCommand::LoadAndPreload { load, preload } => {
178 let currently_playing = load.decoded_from.clone();
179 self.current_decoder = Some(*load);
180 self.preload_decoder = Some(*preload);
181 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
182 currently_playing: Box::new(Arc::unwrap_or_clone(currently_playing)),
183 });
184 }
185 DecoderCommand::SetPlaying(playing) => {
186 if let Some(ref decoder) = self.current_decoder {
187 let new_state = if playing {
188 PlaybackStatus::Playing
189 } else {
190 PlaybackStatus::Paused
191 };
192
193 if self.playback_state.load(Ordering::SeqCst) == new_state {
194 return Ok(());
195 }
196
197 self.playback_state.store(new_state, Ordering::SeqCst);
198
199 self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
200 is_playing: playing,
201 changed_at: decoder.time(),
202 });
203 }
204 }
205 DecoderCommand::TogglePlaying { callback } => {
206 let current_state = self.playback_state.load(Ordering::SeqCst);
207 if let Some(ref decoder) = self.current_decoder {
208 let new_state = match current_state {
209 PlaybackStatus::Playing => PlaybackStatus::Paused,
210 PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
211 };
212
213 self.playback_state.store(new_state, Ordering::SeqCst);
214 callback.send(new_state)?;
215
216 self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
217 is_playing: matches!(new_state, PlaybackStatus::Playing),
218 changed_at: decoder.time(),
219 });
220 } else {
221 callback.send(PlaybackStatus::Stopped)?;
222 }
223 }
224 DecoderCommand::Seek {
225 time,
226 increment,
227 callback,
228 } => {
229 if let Some(ref mut opened_decoder) = self.current_decoder {
230 let time = opened_decoder.seek(time, increment)?;
231 self.event_tx.event(PlayerEvent::SeekOccured { time });
232 callback.send(Some(time))?;
233 } else {
234 callback.send(None)?;
235 }
236 }
237 DecoderCommand::GetTime { callback } => {
238 if let Some(ref opened_decoder) = self.current_decoder {
239 callback.send(Some(opened_decoder.time()))?;
240 } else {
241 callback.send(None)?;
242 }
243 }
244 DecoderCommand::GetPlaying { callback } => {
245 if let Some(ref opened_decoder) = self.current_decoder {
246 callback.send(Some(opened_decoder.decoded_from.clone()))?;
247 } else {
248 callback.send(None)?;
249 }
250 }
251 DecoderCommand::Stop => {
252 self.current_decoder = None;
253 self.preload_decoder = None;
254 self.playback_state
255 .store(PlaybackStatus::Stopped, Ordering::SeqCst);
256 self.event_tx.event(PlayerEvent::PlaybackStopped);
257 }
258 DecoderCommand::Skip => {
259 if let Some(take) = self.preload_decoder.take() {
260 self.player_tx
261 .decoder_event(DecoderEvent::PreloadConsumed)?;
262 let currently_playing = take.decoded_from.clone();
263 self.current_decoder = Some(take);
264 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
265 currently_playing: Box::new(Arc::unwrap_or_clone(currently_playing)),
266 });
267 } else {
268 self.current_decoder = None;
269 }
270 }
271 }
272 Ok(())
273 }
274}
275
276impl DecoderHandle {
277 pub fn ensure_cpal_handle(&mut self) -> Result<(), PlayerError> {
283 if self.cpal_handle.is_none() {
284 self.cpal_handle = Some(CpalHandle::open(&*self.device_config.lock()?)?);
285 }
286
287 Ok(())
288 }
289}
290
291pub enum DecoderEvent {
292 PreloadConsumed,
293 Scrobble {
294 track: Arc<ResolvedTrack>,
295 start_time: u64,
296 },
297 NowPlaying {
298 track: Arc<ResolvedTrack>,
299 },
300}
301
302pub enum DecoderCommand {
303 Load {
304 decoder: Box<OpenedDecoder>,
305 },
306 Preload {
307 decoder: Box<OpenedDecoder>,
308 },
309 LoadAndPreload {
310 load: Box<OpenedDecoder>,
311 preload: Box<OpenedDecoder>,
312 },
313 SetPlaying(bool),
314 TogglePlaying {
315 callback: Sender<PlaybackStatus>,
316 },
317 Seek {
318 time: f64,
319 increment: bool,
320 callback: Sender<Option<f64>>,
321 },
322 GetTime {
323 callback: Sender<Option<f64>>,
324 },
325 GetPlaying {
326 callback: Sender<Option<Arc<ResolvedTrack>>>,
327 },
328 Skip,
329 Stop,
330}
331
332pub trait DecoderTx {
333 fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
334 fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
335 fn load_and_preload(
336 &self,
337 load: OpenedDecoder,
338 preload: OpenedDecoder,
339 ) -> Result<(), PlayerError>;
340 fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
341 fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
342 fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
343 fn get_time(&self) -> Result<Option<f64>, PlayerError>;
344 fn get_playing(&self) -> Result<Option<Arc<ResolvedTrack>>, PlayerError>;
345 fn stop(&self) -> Result<(), PlayerError>;
346 fn skip(&self) -> Result<(), PlayerError>;
347}
348
349impl DecoderTx for Sender<DecoderCommand> {
350 fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
351 self.send(DecoderCommand::Load {
352 decoder: Box::new(decoder),
353 })?;
354 Ok(())
355 }
356
357 fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
358 self.send(DecoderCommand::Preload {
359 decoder: Box::new(decoder),
360 })?;
361 Ok(())
362 }
363
364 fn load_and_preload(
365 &self,
366 load: OpenedDecoder,
367 preload: OpenedDecoder,
368 ) -> Result<(), PlayerError> {
369 self.send(DecoderCommand::LoadAndPreload {
370 load: Box::new(load),
371 preload: Box::new(preload),
372 })?;
373 Ok(())
374 }
375
376 fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
377 self.send(DecoderCommand::SetPlaying(is_playing))?;
378 Ok(())
379 }
380
381 fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
382 let (tx, rx) = channel();
383 self.send(DecoderCommand::TogglePlaying { callback: tx })?;
384 Ok(rx.recv()?)
385 }
386
387 fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
388 let (tx, rx) = channel();
389 self.send(DecoderCommand::Seek {
390 time,
391 increment,
392 callback: tx,
393 })?;
394 Ok(rx.recv()?)
395 }
396
397 fn get_time(&self) -> Result<Option<f64>, PlayerError> {
398 let (tx, rx) = channel();
399 self.send(DecoderCommand::GetTime { callback: tx })?;
400 Ok(rx.recv()?)
401 }
402
403 fn get_playing(&self) -> Result<Option<Arc<ResolvedTrack>>, PlayerError> {
404 let (tx, rx) = channel();
405 self.send(DecoderCommand::GetPlaying { callback: tx })?;
406 Ok(rx.recv()?)
407 }
408
409 fn stop(&self) -> Result<(), PlayerError> {
410 self.send(DecoderCommand::Stop)?;
411 Ok(())
412 }
413
414 fn skip(&self) -> Result<(), PlayerError> {
415 self.send(DecoderCommand::Skip)?;
416 Ok(())
417 }
418}