1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicU32, Ordering},
5 mpsc::{Receiver, Sender, TryRecvError, channel},
6 },
7 thread,
8};
9
10use lunar_lib::error;
11use ringbuf::traits::Observer;
12
13use crate::{
14 PlayerEvent, SHUTDOWN,
15 event_handler::EventTx,
16 player::{
17 OpenedDecoder, PlayerError, PlayerRequest, PlayerTx,
18 playback::{CpalHandle, DeviceConfig},
19 },
20 playlist::{AtomicPlaybackStatus, PlaybackStatus, ResolvedTrack},
21 wait,
22};
23
24pub struct DecoderHandle {
25 rx: Receiver<DecoderCommand>,
26 player_tx: Sender<PlayerRequest>,
27 event_tx: Sender<PlayerEvent>,
28
29 cpal_handle: Option<CpalHandle>,
30 device_config: Arc<Mutex<DeviceConfig>>,
31
32 current_decoder: Option<OpenedDecoder>,
33 preload_decoder: Option<OpenedDecoder>,
34
35 playback_state: Arc<AtomicPlaybackStatus>,
36 volume: Arc<AtomicU32>,
37}
38
39impl DecoderHandle {
40 pub(crate) fn open(
41 player_tx: Sender<PlayerRequest>,
42 event_tx: Sender<PlayerEvent>,
43 device_config: Arc<Mutex<DeviceConfig>>,
44 playback_state: Arc<AtomicPlaybackStatus>,
45 volume: Arc<AtomicU32>,
46 ) -> Result<Sender<DecoderCommand>, PlayerError> {
47 let (tx, rx) = channel();
48
49 let handle = Self {
50 rx,
51 player_tx,
52 event_tx,
53
54 cpal_handle: None,
55 device_config,
56
57 current_decoder: None,
58 preload_decoder: None,
59
60 playback_state,
61 volume,
62 };
63
64 thread::spawn(move || {
65 if let Err(err) = handle.run() {
66 error!("DecoderHandle failed with error: {err}");
67 }
68 SHUTDOWN.store(true, Ordering::Relaxed);
69 });
70
71 Ok(tx)
72 }
73
74 fn run(mut self) -> Result<(), PlayerError> {
75 loop {
76 if SHUTDOWN.load(Ordering::Relaxed) {
77 return Ok(());
78 }
79
80 loop {
81 match self.rx.try_recv() {
82 Ok(command) => self.run_command(command)?,
83 Err(TryRecvError::Empty) => break,
84 Err(TryRecvError::Disconnected) => return Ok(()),
85 }
86 }
87
88 if !matches!(
89 self.playback_state.load(Ordering::Relaxed),
90 PlaybackStatus::Playing
91 ) {
92 wait();
93 continue;
94 }
95
96 let volume = f32::from_bits(self.volume.load(Ordering::Relaxed));
97 if let Some(ref mut cpal_handle) = self.cpal_handle
98 && let Some(finished_consuming) = cpal_handle.consume_packet(volume)
99 && !finished_consuming
100 {
101 wait();
102 continue;
103 }
104
105 if self.current_decoder.is_none() {
106 if let Some(ref cpal_handle) = self.cpal_handle
107 && !cpal_handle.audio_buf.is_empty()
108 {
109 wait();
110 continue;
111 }
112
113 self.cpal_handle = None;
114 self.playback_state
115 .store(PlaybackStatus::Stopped, Ordering::SeqCst);
116 self.event_tx.event(PlayerEvent::PlaybackStopped);
117 continue;
118 }
119
120 self.ensure_cpal_handle()?;
122
123 if self.current_decoder.as_ref().unwrap().at_eof {
124 if let Some(take) = self.preload_decoder.take() {
125 self.player_tx
126 .decoder_event(DecoderEvent::PreloadConsumed)?;
127 let currently_playing = take.decoded_from.clone();
128 self.current_decoder = Some(take);
129 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
130 currently_playing: Box::new(currently_playing),
131 });
132 } else {
133 self.current_decoder = None;
134 continue;
135 }
136 }
137
138 let packet = self
139 .current_decoder
140 .as_mut()
141 .unwrap()
142 .decode_next_packet()?;
143 self.cpal_handle.as_mut().unwrap().pending_packet = packet;
144 }
145 }
146
147 fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
148 match command {
149 DecoderCommand::Load { decoder } => {
150 let currently_playing = decoder.decoded_from.clone();
151 self.current_decoder = Some(*decoder);
152 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
153 currently_playing: Box::new(currently_playing),
154 });
155 }
156 DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
157 DecoderCommand::LoadAndPreload { load, preload } => {
158 let currently_playing = load.decoded_from.clone();
159 self.current_decoder = Some(*load);
160 self.preload_decoder = Some(*preload);
161 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
162 currently_playing: Box::new(currently_playing),
163 });
164 }
165 DecoderCommand::SetPlaying(playing) => {
166 if let Some(ref decoder) = self.current_decoder {
167 let new_state = if playing {
168 PlaybackStatus::Playing
169 } else {
170 PlaybackStatus::Paused
171 };
172
173 if self.playback_state.load(Ordering::SeqCst) == new_state {
174 return Ok(());
175 }
176
177 self.playback_state.store(new_state, Ordering::SeqCst);
178
179 self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
180 is_playing: playing,
181 changed_at: decoder.time(),
182 });
183 }
184 }
185 DecoderCommand::TogglePlaying { callback } => {
186 let current_state = self.playback_state.load(Ordering::SeqCst);
187 if let Some(ref decoder) = self.current_decoder {
188 let new_state = match current_state {
189 PlaybackStatus::Playing => PlaybackStatus::Paused,
190 PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
191 };
192
193 self.playback_state.store(new_state, Ordering::SeqCst);
194 callback.send(new_state)?;
195
196 self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
197 is_playing: matches!(new_state, PlaybackStatus::Playing),
198 changed_at: decoder.time(),
199 });
200 } else {
201 callback.send(PlaybackStatus::Stopped)?;
202 }
203 }
204 DecoderCommand::Seek {
205 time,
206 increment,
207 callback,
208 } => {
209 if let Some(ref mut opened_decoder) = self.current_decoder {
210 let time = opened_decoder.seek(time, increment)?;
211 self.event_tx.event(PlayerEvent::SeekOccured { time });
212 callback.send(Some(time))?;
213 } else {
214 callback.send(None)?;
215 }
216 }
217 DecoderCommand::GetTime { callback } => {
218 if let Some(ref opened_decoder) = self.current_decoder {
219 callback.send(Some(opened_decoder.time()))?;
220 } else {
221 callback.send(None)?;
222 }
223 }
224 DecoderCommand::GetPlaying { callback } => {
225 if let Some(ref opened_decoder) = self.current_decoder {
226 callback.send(Some(opened_decoder.decoded_from.clone()))?;
227 } else {
228 callback.send(None)?;
229 }
230 }
231 DecoderCommand::Stop => {
232 self.current_decoder = None;
233 self.preload_decoder = None;
234 self.playback_state
235 .store(PlaybackStatus::Stopped, Ordering::SeqCst);
236 self.event_tx.event(PlayerEvent::PlaybackStopped);
237 }
238 DecoderCommand::Skip => {
239 if let Some(take) = self.preload_decoder.take() {
240 self.player_tx
241 .decoder_event(DecoderEvent::PreloadConsumed)?;
242 let currently_playing = take.decoded_from.clone();
243 self.current_decoder = Some(take);
244 self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
245 currently_playing: Box::new(currently_playing),
246 });
247 } else {
248 self.current_decoder = None;
249 }
250 }
251 }
252 Ok(())
253 }
254}
255
256impl DecoderHandle {
257 pub fn ensure_cpal_handle(&mut self) -> Result<(), PlayerError> {
263 if self.cpal_handle.is_none() {
264 self.cpal_handle = Some(CpalHandle::open(&*self.device_config.lock()?)?);
265 }
266
267 Ok(())
268 }
269}
270
/// Notifications sent from the decoder worker back to the player.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecoderEvent {
    /// The preloaded decoder has been promoted to the current decoder, so the
    /// preload slot is empty again.
    PreloadConsumed,
}
274
275pub enum DecoderCommand {
276 Load {
277 decoder: Box<OpenedDecoder>,
278 },
279 Preload {
280 decoder: Box<OpenedDecoder>,
281 },
282 LoadAndPreload {
283 load: Box<OpenedDecoder>,
284 preload: Box<OpenedDecoder>,
285 },
286 SetPlaying(bool),
287 TogglePlaying {
288 callback: Sender<PlaybackStatus>,
289 },
290 Seek {
291 time: f64,
292 increment: bool,
293 callback: Sender<Option<f64>>,
294 },
295 GetTime {
296 callback: Sender<Option<f64>>,
297 },
298 GetPlaying {
299 callback: Sender<Option<ResolvedTrack>>,
300 },
301 Skip,
302 Stop,
303}
304
305pub trait DecoderTx {
306 fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
307 fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
308 fn load_and_preload(
309 &self,
310 load: OpenedDecoder,
311 preload: OpenedDecoder,
312 ) -> Result<(), PlayerError>;
313 fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
314 fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
315 fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
316 fn get_time(&self) -> Result<Option<f64>, PlayerError>;
317 fn get_playing(&self) -> Result<Option<ResolvedTrack>, PlayerError>;
318 fn stop(&self) -> Result<(), PlayerError>;
319 fn skip(&self) -> Result<(), PlayerError>;
320}
321
322impl DecoderTx for Sender<DecoderCommand> {
323 fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
324 self.send(DecoderCommand::Load {
325 decoder: Box::new(decoder),
326 })?;
327 Ok(())
328 }
329
330 fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
331 self.send(DecoderCommand::Preload {
332 decoder: Box::new(decoder),
333 })?;
334 Ok(())
335 }
336
337 fn load_and_preload(
338 &self,
339 load: OpenedDecoder,
340 preload: OpenedDecoder,
341 ) -> Result<(), PlayerError> {
342 self.send(DecoderCommand::LoadAndPreload {
343 load: Box::new(load),
344 preload: Box::new(preload),
345 })?;
346 Ok(())
347 }
348
349 fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
350 self.send(DecoderCommand::SetPlaying(is_playing))?;
351 Ok(())
352 }
353
354 fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
355 let (tx, rx) = channel();
356 self.send(DecoderCommand::TogglePlaying { callback: tx })?;
357 Ok(rx.recv()?)
358 }
359
360 fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
361 let (tx, rx) = channel();
362 self.send(DecoderCommand::Seek {
363 time,
364 increment,
365 callback: tx,
366 })?;
367 Ok(rx.recv()?)
368 }
369
370 fn get_time(&self) -> Result<Option<f64>, PlayerError> {
371 let (tx, rx) = channel();
372 self.send(DecoderCommand::GetTime { callback: tx })?;
373 Ok(rx.recv()?)
374 }
375
376 fn get_playing(&self) -> Result<Option<ResolvedTrack>, PlayerError> {
377 let (tx, rx) = channel();
378 self.send(DecoderCommand::GetPlaying { callback: tx })?;
379 Ok(rx.recv()?)
380 }
381
382 fn stop(&self) -> Result<(), PlayerError> {
383 self.send(DecoderCommand::Stop)?;
384 Ok(())
385 }
386
387 fn skip(&self) -> Result<(), PlayerError> {
388 self.send(DecoderCommand::Skip)?;
389 Ok(())
390 }
391}