librespot_playback/
player.rs

1use std::{
2    collections::HashMap,
3    fmt, fs,
4    fs::File,
5    future::Future,
6    io::{self, Read, Seek, SeekFrom},
7    mem,
8    pin::Pin,
9    process::exit,
10    sync::Mutex,
11    sync::{
12        Arc,
13        atomic::{AtomicUsize, Ordering},
14    },
15    task::{Context, Poll},
16    thread,
17    time::{Duration, Instant},
18};
19
20#[cfg(feature = "passthrough-decoder")]
21use crate::decoder::PassthroughDecoder;
22use crate::{
23    audio::{AudioDecrypt, AudioFetchParams, AudioFile, StreamLoaderController},
24    audio_backend::Sink,
25    config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig},
26    convert::Converter,
27    core::{Error, Session, SpotifyId, SpotifyUri, util::SeqGenerator},
28    decoder::{AudioDecoder, AudioPacket, AudioPacketPosition, SymphoniaDecoder},
29    local_file::{LocalFileLookup, create_local_file_lookup},
30    metadata::audio::{AudioFileFormat, AudioFiles, AudioItem},
31    mixer::VolumeGetter,
32};
33use futures_util::{
34    StreamExt, TryFutureExt, future, future::FusedFuture,
35    stream::futures_unordered::FuturesUnordered,
36};
37use librespot_metadata::{audio::UniqueFields, track::Tracks};
38
39use symphonia::core::io::MediaSource;
40use symphonia::core::probe::Hint;
41use tokio::sync::{mpsc, oneshot};
42
43use crate::SAMPLES_PER_SECOND;
44
45const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
46pub const DB_VOLTAGE_RATIO: f64 = 20.0;
47pub const PCM_AT_0DBFS: f64 = 1.0;
48
49// Spotify inserts a custom Ogg packet at the start with custom metadata values, that you would
50// otherwise expect in Vorbis comments. This packet isn't well-formed and players may balk at it.
51const SPOTIFY_OGG_HEADER_END: u64 = 0xa7;
52
53const LOAD_HANDLES_POISON_MSG: &str = "load handles mutex should not be poisoned";
54
55pub type PlayerResult = Result<(), Error>;
56
57pub struct Player {
58    commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
59    thread_handle: Option<thread::JoinHandle<()>>,
60}
61
62#[derive(PartialEq, Eq, Debug, Clone, Copy)]
63pub enum SinkStatus {
64    Running,
65    Closed,
66    TemporarilyClosed,
67}
68
69pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
70
71struct PlayerInternal {
72    session: Session,
73    config: PlayerConfig,
74    commands: mpsc::UnboundedReceiver<PlayerCommand>,
75    load_handles: Arc<Mutex<HashMap<thread::ThreadId, thread::JoinHandle<()>>>>,
76
77    state: PlayerState,
78    preload: PlayerPreload,
79    sink: Box<dyn Sink>,
80    sink_status: SinkStatus,
81    sink_event_callback: Option<SinkEventCallback>,
82    volume_getter: Box<dyn VolumeGetter + Send>,
83    event_senders: Vec<mpsc::UnboundedSender<PlayerEvent>>,
84    converter: Converter,
85
86    normalisation_integrators: [f64; 2],
87    normalisation_peaks: [f64; 2],
88    normalisation_channel: usize,
89    normalisation_knee_factor: f64,
90
91    auto_normalise_as_album: bool,
92
93    player_id: usize,
94    play_request_id_generator: SeqGenerator<u64>,
95    last_progress_update: Instant,
96
97    local_file_lookup: Arc<LocalFileLookup>,
98}
99
100static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0);
101
102enum PlayerCommand {
103    Load {
104        track_id: SpotifyUri,
105        play: bool,
106        position_ms: u32,
107    },
108    Preload {
109        track_id: SpotifyUri,
110    },
111    Play,
112    Pause,
113    Stop,
114    Seek(u32),
115    SetSession(Session),
116    AddEventSender(mpsc::UnboundedSender<PlayerEvent>),
117    SetSinkEventCallback(Option<SinkEventCallback>),
118    EmitVolumeChangedEvent(u16),
119    SetAutoNormaliseAsAlbum(bool),
120    EmitSessionDisconnectedEvent {
121        connection_id: String,
122        user_name: String,
123    },
124    EmitSessionConnectedEvent {
125        connection_id: String,
126        user_name: String,
127    },
128    EmitSessionClientChangedEvent {
129        client_id: String,
130        client_name: String,
131        client_brand_name: String,
132        client_model_name: String,
133    },
134    EmitFilterExplicitContentChangedEvent(bool),
135    EmitShuffleChangedEvent(bool),
136    EmitRepeatChangedEvent {
137        context: bool,
138        track: bool,
139    },
140    EmitAutoPlayChangedEvent(bool),
141}
142
143#[derive(Debug, Clone)]
144pub enum PlayerEvent {
145    // Play request id changed
146    PlayRequestIdChanged {
147        play_request_id: u64,
148    },
149    // Fired when the player is stopped (e.g. by issuing a "stop" command to the player).
150    Stopped {
151        play_request_id: u64,
152        track_id: SpotifyUri,
153    },
154    // The player is delayed by loading a track.
155    Loading {
156        play_request_id: u64,
157        track_id: SpotifyUri,
158        position_ms: u32,
159    },
160    // The player is preloading a track.
161    Preloading {
162        track_id: SpotifyUri,
163    },
164    // The player is playing a track.
165    // This event is issued at the start of playback of whenever the position must be communicated
166    // because it is out of sync. This includes:
167    // start of a track
168    // un-pausing
169    // after a seek
170    // after a buffer-underrun
171    Playing {
172        play_request_id: u64,
173        track_id: SpotifyUri,
174        position_ms: u32,
175    },
176    // The player entered a paused state.
177    Paused {
178        play_request_id: u64,
179        track_id: SpotifyUri,
180        position_ms: u32,
181    },
182    // The player thinks it's a good idea to issue a preload command for the next track now.
183    // This event is intended for use within spirc.
184    TimeToPreloadNextTrack {
185        play_request_id: u64,
186        track_id: SpotifyUri,
187    },
188    // The player reached the end of a track.
189    // This event is intended for use within spirc. Spirc will respond by issuing another command.
190    EndOfTrack {
191        play_request_id: u64,
192        track_id: SpotifyUri,
193    },
194    // The player was unable to load the requested track.
195    Unavailable {
196        play_request_id: u64,
197        track_id: SpotifyUri,
198    },
199    // The mixer volume was set to a new level.
200    VolumeChanged {
201        volume: u16,
202    },
203    PositionCorrection {
204        play_request_id: u64,
205        track_id: SpotifyUri,
206        position_ms: u32,
207    },
208    /// Requires `PlayerConfig::position_update_interval` to be set to Some.
209    /// Once set this event will be sent periodically while playing the track to inform about the
210    /// current playback position
211    PositionChanged {
212        play_request_id: u64,
213        track_id: SpotifyUri,
214        position_ms: u32,
215    },
216    Seeked {
217        play_request_id: u64,
218        track_id: SpotifyUri,
219        position_ms: u32,
220    },
221    TrackChanged {
222        audio_item: Box<AudioItem>,
223    },
224    SessionConnected {
225        connection_id: String,
226        user_name: String,
227    },
228    SessionDisconnected {
229        connection_id: String,
230        user_name: String,
231    },
232    SessionClientChanged {
233        client_id: String,
234        client_name: String,
235        client_brand_name: String,
236        client_model_name: String,
237    },
238    ShuffleChanged {
239        shuffle: bool,
240    },
241    RepeatChanged {
242        context: bool,
243        track: bool,
244    },
245    AutoPlayChanged {
246        auto_play: bool,
247    },
248    FilterExplicitContentChanged {
249        filter: bool,
250    },
251}
252
253impl PlayerEvent {
254    pub fn get_play_request_id(&self) -> Option<u64> {
255        use PlayerEvent::*;
256        match self {
257            Loading {
258                play_request_id, ..
259            }
260            | Unavailable {
261                play_request_id, ..
262            }
263            | Playing {
264                play_request_id, ..
265            }
266            | TimeToPreloadNextTrack {
267                play_request_id, ..
268            }
269            | EndOfTrack {
270                play_request_id, ..
271            }
272            | Paused {
273                play_request_id, ..
274            }
275            | Stopped {
276                play_request_id, ..
277            }
278            | PositionCorrection {
279                play_request_id, ..
280            }
281            | Seeked {
282                play_request_id, ..
283            } => Some(*play_request_id),
284            _ => None,
285        }
286    }
287}
288
289pub type PlayerEventChannel = mpsc::UnboundedReceiver<PlayerEvent>;
290
291#[inline]
292pub fn db_to_ratio(db: f64) -> f64 {
293    f64::powf(10.0, db / DB_VOLTAGE_RATIO)
294}
295
296#[inline]
297pub fn ratio_to_db(ratio: f64) -> f64 {
298    ratio.log10() * DB_VOLTAGE_RATIO
299}
300
301pub fn duration_to_coefficient(duration: Duration) -> f64 {
302    f64::exp(-1.0 / (duration.as_secs_f64() * SAMPLES_PER_SECOND as f64))
303}
304
305pub fn coefficient_to_duration(coefficient: f64) -> Duration {
306    Duration::from_secs_f64(-1.0 / f64::ln(coefficient) / SAMPLES_PER_SECOND as f64)
307}
308
309#[derive(Clone, Copy, Debug)]
310pub struct NormalisationData {
311    // Spotify provides these as `f32`, but audio metadata can contain up to `f64`.
312    // Also, this negates the need for casting during sample processing.
313    pub track_gain_db: f64,
314    pub track_peak: f64,
315    pub album_gain_db: f64,
316    pub album_peak: f64,
317}
318
319impl Default for NormalisationData {
320    fn default() -> Self {
321        Self {
322            track_gain_db: 0.0,
323            track_peak: 1.0,
324            album_gain_db: 0.0,
325            album_peak: 1.0,
326        }
327    }
328}
329
330impl NormalisationData {
331    fn parse_from_ogg<T: Read + Seek>(mut file: T) -> io::Result<NormalisationData> {
332        const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144;
333        const NORMALISATION_DATA_SIZE: usize = 16;
334
335        let newpos = file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET))?;
336        if newpos != SPOTIFY_NORMALIZATION_HEADER_START_OFFSET {
337            error!(
338                "NormalisationData::parse_from_file seeking to {SPOTIFY_NORMALIZATION_HEADER_START_OFFSET} but position is now {newpos}"
339            );
340
341            error!("Falling back to default (non-track and non-album) normalisation data.");
342
343            return Ok(NormalisationData::default());
344        }
345
346        let mut buf = [0u8; NORMALISATION_DATA_SIZE];
347
348        file.read_exact(&mut buf)?;
349
350        let track_gain_db = f32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as f64;
351        let track_peak = f32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as f64;
352        let album_gain_db = f32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]) as f64;
353        let album_peak = f32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]) as f64;
354
355        Ok(Self {
356            track_gain_db,
357            track_peak,
358            album_gain_db,
359            album_peak,
360        })
361    }
362
363    fn get_factor(config: &PlayerConfig, data: NormalisationData) -> f64 {
364        if !config.normalisation {
365            return 1.0;
366        }
367
368        let (gain_db, gain_peak) = if config.normalisation_type == NormalisationType::Album {
369            (data.album_gain_db, data.album_peak)
370        } else {
371            (data.track_gain_db, data.track_peak)
372        };
373
374        // As per the ReplayGain 1.0 & 2.0 (proposed) spec:
375        // https://wiki.hydrogenaud.io/index.php?title=ReplayGain_1.0_specification#Clipping_prevention
376        // https://wiki.hydrogenaud.io/index.php?title=ReplayGain_2.0_specification#Clipping_prevention
377        let normalisation_factor = if config.normalisation_method == NormalisationMethod::Basic {
378            // For Basic Normalisation, factor = min(ratio of (ReplayGain + PreGain), 1.0 / peak level).
379            // https://wiki.hydrogenaud.io/index.php?title=ReplayGain_1.0_specification#Peak_amplitude
380            // https://wiki.hydrogenaud.io/index.php?title=ReplayGain_2.0_specification#Peak_amplitude
381            // We then limit that to 1.0 as not to exceed dBFS (0.0 dB).
382            let factor = f64::min(
383                db_to_ratio(gain_db + config.normalisation_pregain_db),
384                PCM_AT_0DBFS / gain_peak,
385            );
386
387            if factor > PCM_AT_0DBFS {
388                info!(
389                    "Lowering gain by {:.2} dB for the duration of this track to avoid potentially exceeding dBFS.",
390                    ratio_to_db(factor)
391                );
392
393                PCM_AT_0DBFS
394            } else {
395                factor
396            }
397        } else {
398            // For Dynamic Normalisation it's up to the player to decide,
399            // factor = ratio of (ReplayGain + PreGain).
400            // We then let the dynamic limiter handle gain reduction.
401            let factor = db_to_ratio(gain_db + config.normalisation_pregain_db);
402            let threshold_ratio = db_to_ratio(config.normalisation_threshold_dbfs);
403
404            if factor > PCM_AT_0DBFS {
405                let factor_db = gain_db + config.normalisation_pregain_db;
406                let limiting_db = factor_db + config.normalisation_threshold_dbfs.abs();
407
408                warn!(
409                    "This track may exceed dBFS by {factor_db:.2} dB and be subject to {limiting_db:.2} dB of dynamic limiting at its peak."
410                );
411            } else if factor > threshold_ratio {
412                let limiting_db = gain_db
413                    + config.normalisation_pregain_db
414                    + config.normalisation_threshold_dbfs.abs();
415
416                info!(
417                    "This track may be subject to {limiting_db:.2} dB of dynamic limiting at its peak."
418                );
419            }
420
421            factor
422        };
423
424        debug!("Normalisation Data: {data:?}");
425        debug!(
426            "Calculated Normalisation Factor for {:?}: {:.2}%",
427            config.normalisation_type,
428            normalisation_factor * 100.0
429        );
430
431        normalisation_factor
432    }
433}
434
435impl Player {
436    pub fn new<F>(
437        config: PlayerConfig,
438        session: Session,
439        volume_getter: Box<dyn VolumeGetter + Send>,
440        sink_builder: F,
441    ) -> Arc<Self>
442    where
443        F: FnOnce() -> Box<dyn Sink> + Send + 'static,
444    {
445        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
446
447        if config.normalisation {
448            debug!("Normalisation Type: {:?}", config.normalisation_type);
449            debug!(
450                "Normalisation Pregain: {:.1} dB",
451                config.normalisation_pregain_db
452            );
453            debug!(
454                "Normalisation Threshold: {:.1} dBFS",
455                config.normalisation_threshold_dbfs
456            );
457            debug!("Normalisation Method: {:?}", config.normalisation_method);
458
459            if config.normalisation_method == NormalisationMethod::Dynamic {
460                // as_millis() has rounding errors (truncates)
461                debug!(
462                    "Normalisation Attack: {:.0} ms",
463                    coefficient_to_duration(config.normalisation_attack_cf).as_secs_f64() * 1000.
464                );
465                debug!(
466                    "Normalisation Release: {:.0} ms",
467                    coefficient_to_duration(config.normalisation_release_cf).as_secs_f64() * 1000.
468                );
469                debug!("Normalisation Knee: {} dB", config.normalisation_knee_db);
470            }
471        }
472
473        let handle = thread::spawn(move || {
474            let player_id = PLAYER_COUNTER.fetch_add(1, Ordering::AcqRel);
475            debug!("new Player [{player_id}]");
476
477            let converter = Converter::new(config.ditherer);
478            let normalisation_knee_factor = 1.0 / (8.0 * config.normalisation_knee_db);
479
480            // TODO: it would be neat if we could watch for added or modified files in the
481            // specified directories, and dynamically update the lookup. Currently, a new player
482            // must be created for any new local files to be playable.
483            let local_file_lookup =
484                create_local_file_lookup(config.local_file_directories.as_slice());
485
486            let internal = PlayerInternal {
487                session,
488                config,
489                commands: cmd_rx,
490                load_handles: Arc::new(Mutex::new(HashMap::new())),
491
492                state: PlayerState::Stopped,
493                preload: PlayerPreload::None,
494                sink: sink_builder(),
495                sink_status: SinkStatus::Closed,
496                sink_event_callback: None,
497                volume_getter,
498                event_senders: vec![],
499                converter,
500
501                normalisation_peaks: [0.0; 2],
502                normalisation_integrators: [0.0; 2],
503                normalisation_channel: 0,
504                normalisation_knee_factor,
505
506                auto_normalise_as_album: false,
507
508                player_id,
509                play_request_id_generator: SeqGenerator::new(0),
510                last_progress_update: Instant::now(),
511
512                local_file_lookup: Arc::new(local_file_lookup),
513            };
514
515            // While PlayerInternal is written as a future, it still contains blocking code.
516            // It must be run by using block_on() in a dedicated thread.
517            let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
518            runtime.block_on(internal);
519
520            debug!("PlayerInternal thread finished.");
521        });
522
523        Arc::new(Self {
524            commands: Some(cmd_tx),
525            thread_handle: Some(handle),
526        })
527    }
528
529    pub fn is_invalid(&self) -> bool {
530        if let Some(handle) = self.thread_handle.as_ref() {
531            return handle.is_finished();
532        }
533        true
534    }
535
536    fn command(&self, cmd: PlayerCommand) {
537        if let Some(commands) = self.commands.as_ref() {
538            if let Err(e) = commands.send(cmd) {
539                error!("Player Commands Error: {e}");
540            }
541        }
542    }
543
544    pub fn load(&self, track_id: SpotifyUri, start_playing: bool, position_ms: u32) {
545        self.command(PlayerCommand::Load {
546            track_id,
547            play: start_playing,
548            position_ms,
549        });
550    }
551
552    pub fn preload(&self, track_id: SpotifyUri) {
553        self.command(PlayerCommand::Preload { track_id });
554    }
555
556    pub fn play(&self) {
557        self.command(PlayerCommand::Play)
558    }
559
560    pub fn pause(&self) {
561        self.command(PlayerCommand::Pause)
562    }
563
564    pub fn stop(&self) {
565        self.command(PlayerCommand::Stop)
566    }
567
568    pub fn seek(&self, position_ms: u32) {
569        self.command(PlayerCommand::Seek(position_ms));
570    }
571
572    pub fn set_session(&self, session: Session) {
573        self.command(PlayerCommand::SetSession(session));
574    }
575
576    pub fn get_player_event_channel(&self) -> PlayerEventChannel {
577        let (event_sender, event_receiver) = mpsc::unbounded_channel();
578        self.command(PlayerCommand::AddEventSender(event_sender));
579        event_receiver
580    }
581
582    pub async fn await_end_of_track(&self) {
583        let mut channel = self.get_player_event_channel();
584        while let Some(event) = channel.recv().await {
585            if matches!(
586                event,
587                PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
588            ) {
589                return;
590            }
591        }
592    }
593
594    pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
595        self.command(PlayerCommand::SetSinkEventCallback(callback));
596    }
597
598    pub fn emit_volume_changed_event(&self, volume: u16) {
599        self.command(PlayerCommand::EmitVolumeChangedEvent(volume));
600    }
601
602    pub fn set_auto_normalise_as_album(&self, setting: bool) {
603        self.command(PlayerCommand::SetAutoNormaliseAsAlbum(setting));
604    }
605
606    pub fn emit_filter_explicit_content_changed_event(&self, filter: bool) {
607        self.command(PlayerCommand::EmitFilterExplicitContentChangedEvent(filter));
608    }
609
610    pub fn emit_session_connected_event(&self, connection_id: String, user_name: String) {
611        self.command(PlayerCommand::EmitSessionConnectedEvent {
612            connection_id,
613            user_name,
614        });
615    }
616
617    pub fn emit_session_disconnected_event(&self, connection_id: String, user_name: String) {
618        self.command(PlayerCommand::EmitSessionDisconnectedEvent {
619            connection_id,
620            user_name,
621        });
622    }
623
624    pub fn emit_session_client_changed_event(
625        &self,
626        client_id: String,
627        client_name: String,
628        client_brand_name: String,
629        client_model_name: String,
630    ) {
631        self.command(PlayerCommand::EmitSessionClientChangedEvent {
632            client_id,
633            client_name,
634            client_brand_name,
635            client_model_name,
636        });
637    }
638
639    pub fn emit_shuffle_changed_event(&self, shuffle: bool) {
640        self.command(PlayerCommand::EmitShuffleChangedEvent(shuffle));
641    }
642
643    pub fn emit_repeat_changed_event(&self, context: bool, track: bool) {
644        self.command(PlayerCommand::EmitRepeatChangedEvent { context, track });
645    }
646
647    pub fn emit_auto_play_changed_event(&self, auto_play: bool) {
648        self.command(PlayerCommand::EmitAutoPlayChangedEvent(auto_play));
649    }
650}
651
652impl Drop for Player {
653    fn drop(&mut self) {
654        debug!("Shutting down player thread ...");
655        self.commands = None;
656        if let Some(handle) = self.thread_handle.take() {
657            if let Err(e) = handle.join() {
658                error!("Player thread Error: {e:?}");
659            }
660        }
661    }
662}
663
664struct PlayerLoadedTrackData {
665    decoder: Decoder,
666    normalisation_data: NormalisationData,
667    stream_loader_controller: StreamLoaderController,
668    audio_item: AudioItem,
669    bytes_per_second: usize,
670    duration_ms: u32,
671    stream_position_ms: u32,
672    is_explicit: bool,
673}
674
675enum PlayerPreload {
676    None,
677    Loading {
678        track_id: SpotifyUri,
679        loader: Pin<Box<dyn FusedFuture<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
680    },
681    Ready {
682        track_id: SpotifyUri,
683        loaded_track: Box<PlayerLoadedTrackData>,
684    },
685}
686
687type Decoder = Box<dyn AudioDecoder + Send>;
688
689enum PlayerState {
690    Stopped,
691    Loading {
692        track_id: SpotifyUri,
693        play_request_id: u64,
694        start_playback: bool,
695        loader: Pin<Box<dyn FusedFuture<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
696    },
697    Paused {
698        track_id: SpotifyUri,
699        play_request_id: u64,
700        decoder: Decoder,
701        audio_item: AudioItem,
702        normalisation_data: NormalisationData,
703        normalisation_factor: f64,
704        stream_loader_controller: StreamLoaderController,
705        bytes_per_second: usize,
706        duration_ms: u32,
707        stream_position_ms: u32,
708        suggested_to_preload_next_track: bool,
709        is_explicit: bool,
710    },
711    Playing {
712        track_id: SpotifyUri,
713        play_request_id: u64,
714        decoder: Decoder,
715        normalisation_data: NormalisationData,
716        audio_item: AudioItem,
717        normalisation_factor: f64,
718        stream_loader_controller: StreamLoaderController,
719        bytes_per_second: usize,
720        duration_ms: u32,
721        stream_position_ms: u32,
722        reported_nominal_start_time: Option<Instant>,
723        suggested_to_preload_next_track: bool,
724        is_explicit: bool,
725    },
726    EndOfTrack {
727        track_id: SpotifyUri,
728        play_request_id: u64,
729        loaded_track: PlayerLoadedTrackData,
730    },
731    Invalid,
732}
733
734impl PlayerState {
735    fn is_playing(&self) -> bool {
736        use self::PlayerState::*;
737        match *self {
738            Stopped | EndOfTrack { .. } | Paused { .. } | Loading { .. } => false,
739            Playing { .. } => true,
740            Invalid => {
741                error!("PlayerState::is_playing in invalid state");
742                exit(1);
743            }
744        }
745    }
746
747    #[allow(dead_code)]
748    fn is_stopped(&self) -> bool {
749        use self::PlayerState::*;
750        matches!(self, Stopped)
751    }
752
753    #[allow(dead_code)]
754    fn is_loading(&self) -> bool {
755        use self::PlayerState::*;
756        matches!(self, Loading { .. })
757    }
758
759    fn decoder(&mut self) -> Option<&mut Decoder> {
760        use self::PlayerState::*;
761        match *self {
762            Stopped | EndOfTrack { .. } | Loading { .. } => None,
763            Paused {
764                ref mut decoder, ..
765            }
766            | Playing {
767                ref mut decoder, ..
768            } => Some(decoder),
769            Invalid => {
770                error!("PlayerState::decoder in invalid state");
771                exit(1);
772            }
773        }
774    }
775
776    fn playing_to_end_of_track(&mut self) {
777        use self::PlayerState::*;
778        let new_state = mem::replace(self, Invalid);
779        match new_state {
780            Playing {
781                track_id,
782                play_request_id,
783                decoder,
784                duration_ms,
785                bytes_per_second,
786                normalisation_data,
787                stream_loader_controller,
788                stream_position_ms,
789                is_explicit,
790                audio_item,
791                ..
792            } => {
793                *self = EndOfTrack {
794                    track_id,
795                    play_request_id,
796                    loaded_track: PlayerLoadedTrackData {
797                        decoder,
798                        normalisation_data,
799                        stream_loader_controller,
800                        audio_item,
801                        bytes_per_second,
802                        duration_ms,
803                        stream_position_ms,
804                        is_explicit,
805                    },
806                };
807            }
808            _ => {
809                error!("Called playing_to_end_of_track in non-playing state: {new_state:?}");
810                exit(1);
811            }
812        }
813    }
814
815    fn paused_to_playing(&mut self) {
816        use self::PlayerState::*;
817        let new_state = mem::replace(self, Invalid);
818        match new_state {
819            Paused {
820                track_id,
821                play_request_id,
822                decoder,
823                audio_item,
824                normalisation_data,
825                normalisation_factor,
826                stream_loader_controller,
827                duration_ms,
828                bytes_per_second,
829                stream_position_ms,
830                suggested_to_preload_next_track,
831                is_explicit,
832            } => {
833                *self = Playing {
834                    track_id,
835                    play_request_id,
836                    decoder,
837                    audio_item,
838                    normalisation_data,
839                    normalisation_factor,
840                    stream_loader_controller,
841                    duration_ms,
842                    bytes_per_second,
843                    stream_position_ms,
844                    reported_nominal_start_time: Instant::now()
845                        .checked_sub(Duration::from_millis(stream_position_ms as u64)),
846                    suggested_to_preload_next_track,
847                    is_explicit,
848                };
849            }
850            _ => {
851                error!("PlayerState::paused_to_playing in invalid state: {new_state:?}");
852                exit(1);
853            }
854        }
855    }
856
857    fn playing_to_paused(&mut self) {
858        use self::PlayerState::*;
859        let new_state = mem::replace(self, Invalid);
860        match new_state {
861            Playing {
862                track_id,
863                play_request_id,
864                decoder,
865                audio_item,
866                normalisation_data,
867                normalisation_factor,
868                stream_loader_controller,
869                duration_ms,
870                bytes_per_second,
871                stream_position_ms,
872                suggested_to_preload_next_track,
873                is_explicit,
874                ..
875            } => {
876                *self = Paused {
877                    track_id,
878                    play_request_id,
879                    decoder,
880                    audio_item,
881                    normalisation_data,
882                    normalisation_factor,
883                    stream_loader_controller,
884                    duration_ms,
885                    bytes_per_second,
886                    stream_position_ms,
887                    suggested_to_preload_next_track,
888                    is_explicit,
889                };
890            }
891            _ => {
892                error!("PlayerState::playing_to_paused in invalid state: {new_state:?}");
893                exit(1);
894            }
895        }
896    }
897}
898
899struct PlayerTrackLoader {
900    session: Session,
901    config: PlayerConfig,
902    local_file_lookup: Arc<LocalFileLookup>,
903}
904
905impl PlayerTrackLoader {
906    async fn find_available_alternative(&self, audio_item: AudioItem) -> Option<AudioItem> {
907        if let Err(e) = audio_item.availability {
908            error!("Track is unavailable: {e}");
909            None
910        } else if !audio_item.files.is_empty() {
911            Some(audio_item)
912        } else if let Some(alternatives) = audio_item.alternatives {
913            let Tracks(alternatives_vec) = alternatives; // required to make `into_iter` able to move
914
915            let alternatives: FuturesUnordered<_> = alternatives_vec
916                .into_iter()
917                .map(|alt_id| AudioItem::get_file(&self.session, alt_id))
918                .collect();
919
920            alternatives
921                .filter_map(|x| future::ready(x.ok()))
922                .filter(|x| future::ready(x.availability.is_ok()))
923                .next()
924                .await
925        } else {
926            error!("Track should be available, but no alternatives found.");
927            None
928        }
929    }
930
931    fn stream_data_rate(&self, format: AudioFileFormat) -> Option<usize> {
932        let kbps = match format {
933            AudioFileFormat::OGG_VORBIS_96 => 12.,
934            AudioFileFormat::OGG_VORBIS_160 => 20.,
935            AudioFileFormat::OGG_VORBIS_320 => 40.,
936            AudioFileFormat::MP3_256 => 32.,
937            AudioFileFormat::MP3_320 => 40.,
938            AudioFileFormat::MP3_160 => 20.,
939            AudioFileFormat::MP3_96 => 12.,
940            AudioFileFormat::MP3_160_ENC => 20.,
941            AudioFileFormat::AAC_24 => 3.,
942            AudioFileFormat::AAC_48 => 6.,
943            AudioFileFormat::AAC_160 => 20.,
944            AudioFileFormat::AAC_320 => 40.,
945            AudioFileFormat::MP4_128 => 16.,
946            AudioFileFormat::OTHER5 => 40.,
947            AudioFileFormat::FLAC_FLAC => 112., // assume 900 kbit/s on average
948            AudioFileFormat::XHE_AAC_12 => 1.5,
949            AudioFileFormat::XHE_AAC_16 => 2.,
950            AudioFileFormat::XHE_AAC_24 => 3.,
951            AudioFileFormat::FLAC_FLAC_24BIT => 3.,
952        };
953        let data_rate: f32 = kbps * 1024.;
954        Some(data_rate.ceil() as usize)
955    }
956
957    async fn load_track(
958        &self,
959        track_uri: SpotifyUri,
960        position_ms: u32,
961    ) -> Option<PlayerLoadedTrackData> {
962        match track_uri {
963            SpotifyUri::Track { .. } | SpotifyUri::Episode { .. } => {
964                self.load_remote_track(track_uri, position_ms).await
965            }
966            SpotifyUri::Local { .. } => self.load_local_track(track_uri, position_ms).await,
967            _ => {
968                error!("Cannot handle load of track with URI: <{track_uri}>",);
969                None
970            }
971        }
972    }
973
974    async fn load_remote_track(
975        &self,
976        track_uri: SpotifyUri,
977        position_ms: u32,
978    ) -> Option<PlayerLoadedTrackData> {
979        let track_id: SpotifyId = match (&track_uri).try_into() {
980            Ok(id) => id,
981            Err(_) => {
982                warn!("<{track_uri}> could not be converted to a base62 ID");
983                return None;
984            }
985        };
986
987        let audio_item = match AudioItem::get_file(&self.session, track_uri).await {
988            Ok(audio) => match self.find_available_alternative(audio).await {
989                Some(audio) => audio,
990                None => {
991                    warn!(
992                        "spotify:track:<{}> is not available",
993                        track_id.to_base62().unwrap_or_default()
994                    );
995                    return None;
996                }
997            },
998            Err(e) => {
999                error!("Unable to load audio item: {e:?}");
1000                return None;
1001            }
1002        };
1003
1004        info!(
1005            "Loading <{}> with Spotify URI <{}>",
1006            audio_item.name, audio_item.uri
1007        );
1008
1009        // (Most) podcasts seem to support only 96 kbps Ogg Vorbis, so fall back to it
1010        let formats = match self.config.bitrate {
1011            Bitrate::Bitrate96 => [
1012                AudioFileFormat::OGG_VORBIS_96,
1013                AudioFileFormat::MP3_96,
1014                AudioFileFormat::OGG_VORBIS_160,
1015                AudioFileFormat::MP3_160,
1016                AudioFileFormat::MP3_256,
1017                AudioFileFormat::OGG_VORBIS_320,
1018                AudioFileFormat::MP3_320,
1019            ],
1020            Bitrate::Bitrate160 => [
1021                AudioFileFormat::OGG_VORBIS_160,
1022                AudioFileFormat::MP3_160,
1023                AudioFileFormat::OGG_VORBIS_96,
1024                AudioFileFormat::MP3_96,
1025                AudioFileFormat::MP3_256,
1026                AudioFileFormat::OGG_VORBIS_320,
1027                AudioFileFormat::MP3_320,
1028            ],
1029            Bitrate::Bitrate320 => [
1030                AudioFileFormat::OGG_VORBIS_320,
1031                AudioFileFormat::MP3_320,
1032                AudioFileFormat::MP3_256,
1033                AudioFileFormat::OGG_VORBIS_160,
1034                AudioFileFormat::MP3_160,
1035                AudioFileFormat::OGG_VORBIS_96,
1036                AudioFileFormat::MP3_96,
1037            ],
1038        };
1039
1040        let (format, file_id) =
1041            match formats
1042                .iter()
1043                .find_map(|format| match audio_item.files.get(format) {
1044                    Some(&file_id) => Some((*format, file_id)),
1045                    _ => None,
1046                }) {
1047                Some(t) => t,
1048                None => {
1049                    warn!(
1050                        "<{}> is not available in any supported format",
1051                        audio_item.name
1052                    );
1053                    return None;
1054                }
1055            };
1056
1057        let bytes_per_second = self.stream_data_rate(format)?;
1058
1059        // This is only a loop to be able to reload the file if an error occurred
1060        // while opening a cached file.
1061        loop {
1062            let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second);
1063
1064            let encrypted_file = match encrypted_file.await {
1065                Ok(encrypted_file) => encrypted_file,
1066                Err(e) => {
1067                    error!("Unable to load encrypted file: {e:?}");
1068                    return None;
1069                }
1070            };
1071
1072            let is_cached = encrypted_file.is_cached();
1073
1074            let stream_loader_controller = encrypted_file.get_stream_loader_controller().ok()?;
1075
1076            // Not all audio files are encrypted. If we can't get a key, try loading the track
1077            // without decryption. If the file was encrypted after all, the decoder will fail
1078            // parsing and bail out, so we should be safe from outputting ear-piercing noise.
1079            let key = match self.session.audio_key().request(track_id, file_id).await {
1080                Ok(key) => Some(key),
1081                Err(e) => {
1082                    warn!("Unable to load key, continuing without decryption: {e}");
1083                    None
1084                }
1085            };
1086
1087            let mut decrypted_file = AudioDecrypt::new(key, encrypted_file);
1088
1089            let is_ogg_vorbis = AudioFiles::is_ogg_vorbis(format);
1090            let (offset, mut normalisation_data) = if is_ogg_vorbis {
1091                // Spotify stores normalisation data in a custom Ogg packet instead of Vorbis comments.
1092                let normalisation_data =
1093                    NormalisationData::parse_from_ogg(&mut decrypted_file).ok();
1094                (SPOTIFY_OGG_HEADER_END, normalisation_data)
1095            } else {
1096                (0, None)
1097            };
1098
1099            let audio_file = match Subfile::new(
1100                decrypted_file,
1101                offset,
1102                stream_loader_controller.len() as u64,
1103            ) {
1104                Ok(audio_file) => audio_file,
1105                Err(e) => {
1106                    error!("PlayerTrackLoader::load_track error opening subfile: {e}");
1107                    return None;
1108                }
1109            };
1110
1111            let mut symphonia_decoder = |audio_file, format| {
1112                SymphoniaDecoder::new(audio_file, format).map(|mut decoder| {
1113                    // For formats other that Vorbis, we'll try getting normalisation data from
1114                    // ReplayGain metadata fields, if present.
1115                    if normalisation_data.is_none() {
1116                        normalisation_data = decoder.normalisation_data();
1117                    }
1118                    Box::new(decoder) as Decoder
1119                })
1120            };
1121
1122            let mut hint = Hint::new();
1123            if let Some(mime_type) = AudioFiles::mime_type(format) {
1124                hint.mime_type(mime_type);
1125            }
1126
1127            #[cfg(feature = "passthrough-decoder")]
1128            let decoder_type = if self.config.passthrough {
1129                PassthroughDecoder::new(audio_file, format).map(|x| Box::new(x) as Decoder)
1130            } else {
1131                symphonia_decoder(audio_file, hint)
1132            };
1133
1134            #[cfg(not(feature = "passthrough-decoder"))]
1135            let decoder_type = { symphonia_decoder(audio_file, hint) };
1136
1137            let normalisation_data = normalisation_data.unwrap_or_else(|| {
1138                warn!("Unable to get normalisation data, continuing with defaults.");
1139                NormalisationData::default()
1140            });
1141
1142            let mut decoder = match decoder_type {
1143                Ok(decoder) => decoder,
1144                Err(e) if is_cached => {
1145                    warn!("Unable to read cached audio file: {e}. Trying to download it.");
1146
1147                    match self.session.cache() {
1148                        Some(cache) => {
1149                            if cache.remove_file(file_id).is_err() {
1150                                error!("Error removing file from cache");
1151                                return None;
1152                            }
1153                        }
1154                        None => {
1155                            error!("If the audio file is cached, a cache should exist");
1156                            return None;
1157                        }
1158                    }
1159
1160                    // Just try it again
1161                    continue;
1162                }
1163                Err(e) => {
1164                    error!("Unable to read audio file: {e}");
1165                    return None;
1166                }
1167            };
1168
1169            let duration_ms = audio_item.duration_ms;
1170            // Don't try to seek past the track's duration.
1171            // If the position is invalid just start from
1172            // the beginning of the track.
1173            let position_ms = if position_ms > duration_ms {
1174                warn!(
1175                    "Invalid start position of {position_ms} ms exceeds track's duration of {duration_ms} ms, starting track from the beginning"
1176                );
1177                0
1178            } else {
1179                position_ms
1180            };
1181
1182            // Ensure the starting position. Even when we want to play from the beginning,
1183            // the cursor may have been moved by parsing normalisation data. This may not
1184            // matter for playback (but won't hurt either), but may be useful for the
1185            // passthrough decoder.
1186            let stream_position_ms = match decoder.seek(position_ms) {
1187                Ok(new_position_ms) => new_position_ms,
1188                Err(e) => {
1189                    error!(
1190                        "PlayerTrackLoader::load_track error seeking to starting position {position_ms}: {e}"
1191                    );
1192                    return None;
1193                }
1194            };
1195
1196            // Ensure streaming mode now that we are ready to play from the requested position.
1197            stream_loader_controller.set_stream_mode();
1198
1199            let is_explicit = audio_item.is_explicit;
1200
1201            info!("<{}> ({} ms) loaded", audio_item.name, duration_ms);
1202
1203            return Some(PlayerLoadedTrackData {
1204                decoder,
1205                normalisation_data,
1206                stream_loader_controller,
1207                audio_item,
1208                bytes_per_second,
1209                duration_ms,
1210                stream_position_ms,
1211                is_explicit,
1212            });
1213        }
1214    }
1215
1216    async fn load_local_track(
1217        &self,
1218        track_uri: SpotifyUri,
1219        position_ms: u32,
1220    ) -> Option<PlayerLoadedTrackData> {
1221        info!("Loading local file with Spotify URI <{}>", track_uri);
1222
1223        let SpotifyUri::Local { duration, .. } = track_uri else {
1224            error!("Unable to determine track duration for local file: not a local file URI");
1225            return None;
1226        };
1227
1228        let entry = self.local_file_lookup.get(&track_uri);
1229
1230        let Some(path) = entry else {
1231            error!("Unable to find file path for local file <{track_uri}>");
1232            return None;
1233        };
1234
1235        let src = match File::open(path) {
1236            Ok(src) => src,
1237            Err(e) => {
1238                error!("Failed to open local file: {e}");
1239                return None;
1240            }
1241        };
1242
1243        let mut hint = Hint::new();
1244        if let Some(file_extension) = path.extension().and_then(|e| e.to_str()) {
1245            hint.with_extension(file_extension);
1246        }
1247
1248        let decoder = match SymphoniaDecoder::new(src, hint) {
1249            Ok(decoder) => decoder,
1250            Err(e) => {
1251                error!("Error decoding local file: {e}");
1252                return None;
1253            }
1254        };
1255
1256        let mut decoder = Box::new(decoder);
1257        let normalisation_data = decoder.normalisation_data().unwrap_or_else(|| {
1258            warn!("Unable to get normalisation data, continuing with defaults.");
1259            NormalisationData::default()
1260        });
1261
1262        let local_file_metadata = decoder.local_file_metadata().unwrap_or_default();
1263
1264        let stream_position_ms = match decoder.seek(position_ms) {
1265            Ok(new_position_ms) => new_position_ms,
1266            Err(e) => {
1267                error!(
1268                    "PlayerTrackLoader::load_local_track error seeking to starting position {position_ms}: {e}"
1269                );
1270                return None;
1271            }
1272        };
1273
1274        let file_size = fs::metadata(path).ok()?.len();
1275        let bytes_per_second = (file_size / duration.as_secs()) as usize;
1276
1277        let stream_loader_controller = StreamLoaderController::from_local_file(file_size);
1278
1279        let name = local_file_metadata.name.unwrap_or_default();
1280
1281        info!("Loaded <{name}> from path <{}>", path.display());
1282
1283        Some(PlayerLoadedTrackData {
1284            decoder,
1285            normalisation_data,
1286            stream_loader_controller,
1287            bytes_per_second,
1288            duration_ms: duration.as_millis() as u32,
1289            stream_position_ms,
1290            is_explicit: false,
1291            audio_item: AudioItem {
1292                duration_ms: duration.as_millis() as u32,
1293                uri: track_uri.to_uri().unwrap_or_default(),
1294                track_id: track_uri,
1295                files: Default::default(),
1296                name,
1297                // We can't get a CoverImage.URL for the track image, applications will have to parse the file metadata themselves using unique_fields.path
1298                covers: vec![],
1299                language: local_file_metadata
1300                    .language
1301                    .map(|val| vec![val])
1302                    .unwrap_or_default(),
1303                is_explicit: false,
1304                availability: Ok(()),
1305                alternatives: None,
1306                unique_fields: UniqueFields::Local {
1307                    artists: local_file_metadata.artists,
1308                    album: local_file_metadata.album,
1309                    album_artists: local_file_metadata.album_artists,
1310                    number: local_file_metadata.number,
1311                    disc_number: local_file_metadata.disc_number,
1312                    path: path.to_path_buf(),
1313                },
1314            },
1315        })
1316    }
1317}
1318
1319impl Future for PlayerInternal {
1320    type Output = ();
1321
1322    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1323        // While this is written as a future, it still contains blocking code.
1324        // It must be run on its own thread.
1325        let passthrough = self.config.passthrough;
1326
1327        loop {
1328            let mut all_futures_completed_or_not_ready = true;
1329
1330            // process commands that were sent to us
1331            let cmd = match self.commands.poll_recv(cx) {
1332                Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
1333                Poll::Ready(Some(cmd)) => {
1334                    all_futures_completed_or_not_ready = false;
1335                    Some(cmd)
1336                }
1337                _ => None,
1338            };
1339
1340            if let Some(cmd) = cmd {
1341                if let Err(e) = self.handle_command(cmd) {
1342                    error!("Error handling command: {e}");
1343                }
1344            }
1345
1346            // Handle loading of a new track to play
1347            if let PlayerState::Loading {
1348                ref mut loader,
1349                ref track_id,
1350                start_playback,
1351                play_request_id,
1352            } = self.state
1353            {
1354                // The loader may be terminated if we are trying to load the same track
1355                // as before, and that track failed to open before.
1356                let track_id = track_id.clone();
1357
1358                if !loader.as_mut().is_terminated() {
1359                    match loader.as_mut().poll(cx) {
1360                        Poll::Ready(Ok(loaded_track)) => {
1361                            self.start_playback(
1362                                track_id,
1363                                play_request_id,
1364                                loaded_track,
1365                                start_playback,
1366                            );
1367                            if let PlayerState::Loading { .. } = self.state {
1368                                error!("The state wasn't changed by start_playback()");
1369                                exit(1);
1370                            }
1371                        }
1372                        Poll::Ready(Err(e)) => {
1373                            error!(
1374                                "Skipping to next track, unable to load track <{track_id:?}>: {e:?}"
1375                            );
1376                            self.send_event(PlayerEvent::Unavailable {
1377                                track_id,
1378                                play_request_id,
1379                            })
1380                        }
1381                        Poll::Pending => (),
1382                    }
1383                }
1384            }
1385
1386            // handle pending preload requests.
1387            if let PlayerPreload::Loading {
1388                ref mut loader,
1389                ref track_id,
1390            } = self.preload
1391            {
1392                let track_id = track_id.clone();
1393                match loader.as_mut().poll(cx) {
1394                    Poll::Ready(Ok(loaded_track)) => {
1395                        self.send_event(PlayerEvent::Preloading {
1396                            track_id: track_id.clone(),
1397                        });
1398                        self.preload = PlayerPreload::Ready {
1399                            track_id,
1400                            loaded_track: Box::new(loaded_track),
1401                        };
1402                    }
1403                    Poll::Ready(Err(_)) => {
1404                        debug!("Unable to preload {track_id:?}");
1405                        self.preload = PlayerPreload::None;
1406                        // Let Spirc know that the track was unavailable.
1407                        if let PlayerState::Playing {
1408                            play_request_id, ..
1409                        }
1410                        | PlayerState::Paused {
1411                            play_request_id, ..
1412                        } = self.state
1413                        {
1414                            self.send_event(PlayerEvent::Unavailable {
1415                                track_id,
1416                                play_request_id,
1417                            });
1418                        }
1419                    }
1420                    Poll::Pending => (),
1421                }
1422            }
1423
1424            if self.state.is_playing() {
1425                self.ensure_sink_running();
1426
1427                if let PlayerState::Playing {
1428                    ref track_id,
1429                    play_request_id,
1430                    ref mut decoder,
1431                    normalisation_factor,
1432                    ref mut stream_position_ms,
1433                    ref mut reported_nominal_start_time,
1434                    ..
1435                } = self.state
1436                {
1437                    let track_id = track_id.clone();
1438                    match decoder.next_packet() {
1439                        Ok(result) => {
1440                            if let Some((ref packet_position, ref packet)) = result {
1441                                let new_stream_position_ms = packet_position.position_ms;
1442                                let expected_position_ms = std::mem::replace(
1443                                    &mut *stream_position_ms,
1444                                    new_stream_position_ms,
1445                                );
1446
1447                                if !passthrough {
1448                                    match packet.samples() {
1449                                        Ok(_) => {
1450                                            let new_stream_position = Duration::from_millis(
1451                                                new_stream_position_ms as u64,
1452                                            );
1453
1454                                            let now = Instant::now();
1455
1456                                            // Only notify if we're skipped some packets *or* we are behind.
1457                                            // If we're ahead it's probably due to a buffer of the backend
1458                                            // and we're actually in time.
1459                                            let notify_about_position =
1460                                                match *reported_nominal_start_time {
1461                                                    None => true,
1462                                                    Some(reported_nominal_start_time) => {
1463                                                        let mut notify = false;
1464
1465                                                        if packet_position.skipped {
1466                                                            if let Some(ahead) = new_stream_position
1467                                                                .checked_sub(Duration::from_millis(
1468                                                                    expected_position_ms as u64,
1469                                                                ))
1470                                                            {
1471                                                                notify |=
1472                                                                    ahead >= Duration::from_secs(1)
1473                                                            }
1474                                                        }
1475
1476                                                        if let Some(lag) = now
1477                                                            .checked_duration_since(
1478                                                                reported_nominal_start_time,
1479                                                            )
1480                                                        {
1481                                                            if let Some(lag) =
1482                                                                lag.checked_sub(new_stream_position)
1483                                                            {
1484                                                                notify |=
1485                                                                    lag >= Duration::from_secs(1)
1486                                                            }
1487                                                        }
1488
1489                                                        notify
1490                                                    }
1491                                                };
1492
1493                                            if notify_about_position {
1494                                                *reported_nominal_start_time =
1495                                                    now.checked_sub(new_stream_position);
1496                                                self.send_event(PlayerEvent::PositionCorrection {
1497                                                    play_request_id,
1498                                                    track_id: track_id.clone(),
1499                                                    position_ms: new_stream_position_ms,
1500                                                });
1501                                            }
1502
1503                                            if let Some(interval) =
1504                                                self.config.position_update_interval
1505                                            {
1506                                                let last_progress_update_since_ms =
1507                                                    now.duration_since(self.last_progress_update);
1508
1509                                                if last_progress_update_since_ms > interval {
1510                                                    self.last_progress_update = now;
1511                                                    self.send_event(PlayerEvent::PositionChanged {
1512                                                        play_request_id,
1513                                                        track_id,
1514                                                        position_ms: new_stream_position_ms,
1515                                                    });
1516                                                }
1517                                            }
1518                                        }
1519                                        Err(e) => {
1520                                            error!(
1521                                                "Skipping to next track, unable to decode samples for track <{track_id:?}>: {e:?}"
1522                                            );
1523                                            self.send_event(PlayerEvent::EndOfTrack {
1524                                                track_id,
1525                                                play_request_id,
1526                                            })
1527                                        }
1528                                    }
1529                                }
1530                            }
1531
1532                            self.handle_packet(result, normalisation_factor);
1533                        }
1534                        Err(e) => {
1535                            error!(
1536                                "Skipping to next track, unable to get next packet for track <{track_id:?}>: {e:?}"
1537                            );
1538                            self.send_event(PlayerEvent::EndOfTrack {
1539                                track_id,
1540                                play_request_id,
1541                            })
1542                        }
1543                    }
1544                } else {
1545                    error!("PlayerInternal poll: Invalid PlayerState");
1546                    exit(1);
1547                };
1548            }
1549
1550            if let PlayerState::Playing {
1551                ref track_id,
1552                play_request_id,
1553                duration_ms,
1554                stream_position_ms,
1555                ref mut stream_loader_controller,
1556                ref mut suggested_to_preload_next_track,
1557                ..
1558            }
1559            | PlayerState::Paused {
1560                ref track_id,
1561                play_request_id,
1562                duration_ms,
1563                stream_position_ms,
1564                ref mut stream_loader_controller,
1565                ref mut suggested_to_preload_next_track,
1566                ..
1567            } = self.state
1568            {
1569                let track_id = track_id.clone();
1570
1571                if (!*suggested_to_preload_next_track)
1572                    && ((duration_ms as i64 - stream_position_ms as i64)
1573                        < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64)
1574                    && stream_loader_controller.range_to_end_available()
1575                {
1576                    *suggested_to_preload_next_track = true;
1577                    self.send_event(PlayerEvent::TimeToPreloadNextTrack {
1578                        track_id,
1579                        play_request_id,
1580                    });
1581                }
1582            }
1583
1584            if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
1585                return Poll::Pending;
1586            }
1587        }
1588    }
1589}
1590
1591impl PlayerInternal {
1592    fn ensure_sink_running(&mut self) {
1593        if self.sink_status != SinkStatus::Running {
1594            trace!("== Starting sink ==");
1595            if let Some(callback) = &mut self.sink_event_callback {
1596                callback(SinkStatus::Running);
1597            }
1598            match self.sink.start() {
1599                Ok(()) => self.sink_status = SinkStatus::Running,
1600                Err(e) => {
1601                    error!("{e}");
1602                    self.handle_pause();
1603                }
1604            }
1605        }
1606    }
1607
1608    fn ensure_sink_stopped(&mut self, temporarily: bool) {
1609        match self.sink_status {
1610            SinkStatus::Running => {
1611                trace!("== Stopping sink ==");
1612                match self.sink.stop() {
1613                    Ok(()) => {
1614                        self.sink_status = if temporarily {
1615                            SinkStatus::TemporarilyClosed
1616                        } else {
1617                            SinkStatus::Closed
1618                        };
1619                        if let Some(callback) = &mut self.sink_event_callback {
1620                            callback(self.sink_status);
1621                        }
1622                    }
1623                    Err(e) => {
1624                        error!("{e}");
1625                        exit(1);
1626                    }
1627                }
1628            }
1629            SinkStatus::TemporarilyClosed => {
1630                if !temporarily {
1631                    self.sink_status = SinkStatus::Closed;
1632                    if let Some(callback) = &mut self.sink_event_callback {
1633                        callback(SinkStatus::Closed);
1634                    }
1635                }
1636            }
1637            SinkStatus::Closed => (),
1638        }
1639    }
1640
1641    fn handle_player_stop(&mut self) {
1642        match self.state {
1643            PlayerState::Playing {
1644                ref track_id,
1645                play_request_id,
1646                ..
1647            }
1648            | PlayerState::Paused {
1649                ref track_id,
1650                play_request_id,
1651                ..
1652            }
1653            | PlayerState::EndOfTrack {
1654                ref track_id,
1655                play_request_id,
1656                ..
1657            }
1658            | PlayerState::Loading {
1659                ref track_id,
1660                play_request_id,
1661                ..
1662            } => {
1663                let track_id = track_id.clone();
1664
1665                self.ensure_sink_stopped(false);
1666                self.send_event(PlayerEvent::Stopped {
1667                    track_id,
1668                    play_request_id,
1669                });
1670                self.state = PlayerState::Stopped;
1671            }
1672            PlayerState::Stopped => (),
1673            PlayerState::Invalid => {
1674                error!("PlayerInternal::handle_player_stop in invalid state");
1675                exit(1);
1676            }
1677        }
1678    }
1679
1680    fn handle_play(&mut self) {
1681        match self.state {
1682            PlayerState::Paused {
1683                ref track_id,
1684                play_request_id,
1685                stream_position_ms,
1686                ..
1687            } => {
1688                let track_id = track_id.clone();
1689
1690                self.state.paused_to_playing();
1691                self.send_event(PlayerEvent::Playing {
1692                    track_id,
1693                    play_request_id,
1694                    position_ms: stream_position_ms,
1695                });
1696                self.ensure_sink_running();
1697            }
1698            PlayerState::Loading {
1699                ref mut start_playback,
1700                ..
1701            } => {
1702                *start_playback = true;
1703            }
1704            _ => error!("Player::play called from invalid state: {:?}", self.state),
1705        }
1706    }
1707
1708    fn handle_pause(&mut self) {
1709        match self.state {
1710            PlayerState::Paused { .. } => self.ensure_sink_stopped(false),
1711            PlayerState::Playing {
1712                ref track_id,
1713                play_request_id,
1714                stream_position_ms,
1715                ..
1716            } => {
1717                let track_id = track_id.clone();
1718
1719                self.state.playing_to_paused();
1720
1721                self.ensure_sink_stopped(false);
1722                self.send_event(PlayerEvent::Paused {
1723                    track_id,
1724                    play_request_id,
1725                    position_ms: stream_position_ms,
1726                });
1727            }
1728            PlayerState::Loading {
1729                ref mut start_playback,
1730                ..
1731            } => {
1732                *start_playback = false;
1733            }
1734            _ => error!("Player::pause called from invalid state: {:?}", self.state),
1735        }
1736    }
1737
1738    fn handle_packet(
1739        &mut self,
1740        packet: Option<(AudioPacketPosition, AudioPacket)>,
1741        normalisation_factor: f64,
1742    ) {
1743        match packet {
1744            Some((_, mut packet)) => {
1745                if !packet.is_empty() {
1746                    if let AudioPacket::Samples(ref mut data) = packet {
1747                        // Get the volume for the packet. In the case of hardware volume control
1748                        // this will always be 1.0 (no change).
1749                        let volume = self.volume_getter.attenuation_factor();
1750
1751                        // For the basic normalisation method, a normalisation factor of 1.0
1752                        // indicates that there is nothing to normalise (all samples should pass
1753                        // unaltered). For the dynamic method, there may still be peaks that we
1754                        // want to shave off.
1755                        //
1756                        // No matter the case we apply volume attenuation last if there is any.
1757                        match (self.config.normalisation, self.config.normalisation_method) {
1758                            (false, _) => {
1759                                if volume < 1.0 {
1760                                    for sample in data.iter_mut() {
1761                                        *sample *= volume;
1762                                    }
1763                                }
1764                            }
1765                            (true, NormalisationMethod::Dynamic) => {
1766                                // zero-cost shorthands
1767                                let threshold_db = self.config.normalisation_threshold_dbfs;
1768                                let knee_db = self.config.normalisation_knee_db;
1769                                let attack_cf = self.config.normalisation_attack_cf;
1770                                let release_cf = self.config.normalisation_release_cf;
1771
1772                                for sample in data.iter_mut() {
1773                                    // Feedforward limiter in the log domain
1774                                    // After: Giannoulis, D., Massberg, M., & Reiss, J.D. (2012).
1775                                    // Digital Dynamic Range Compressor Design—A Tutorial and
1776                                    // Analysis. Journal of The Audio Engineering Society, 60,
1777                                    // 399-408.
1778
1779                                    // This implementation assumes audio is stereo.
1780
1781                                    // step 0: apply gain stage
1782                                    *sample *= normalisation_factor;
1783
1784                                    // step 1-4: half-wave rectification and conversion into dB, and
1785                                    // gain computer with soft knee and subtractor
1786                                    let limiter_db = {
1787                                        // Add slight DC offset. Some samples are silence, which is
1788                                        // -inf dB and gets the limiter stuck. Adding a small
1789                                        // positive offset prevents this.
1790                                        *sample += f64::MIN_POSITIVE;
1791
1792                                        let bias_db = ratio_to_db(sample.abs()) - threshold_db;
1793                                        let knee_boundary_db = bias_db * 2.0;
1794                                        if knee_boundary_db < -knee_db {
1795                                            0.0
1796                                        } else if knee_boundary_db.abs() <= knee_db {
1797                                            let term = knee_boundary_db + knee_db;
1798                                            term * term * self.normalisation_knee_factor
1799                                        } else {
1800                                            bias_db
1801                                        }
1802                                    };
1803
1804                                    // track left/right channel
1805                                    let channel = self.normalisation_channel;
1806                                    self.normalisation_channel ^= 1;
1807
1808                                    // step 5: smooth, decoupled peak detector for each channel
1809                                    // Use direct references to reduce repeated array indexing
1810                                    let integrator = &mut self.normalisation_integrators[channel];
1811                                    let peak = &mut self.normalisation_peaks[channel];
1812
1813                                    *integrator = f64::max(
1814                                        limiter_db,
1815                                        release_cf * *integrator + (1.0 - release_cf) * limiter_db,
1816                                    );
1817                                    *peak = attack_cf * *peak + (1.0 - attack_cf) * *integrator;
1818
1819                                    // steps 6-8: conversion into level and multiplication into gain
1820                                    // stage. Find maximum peak across both channels to couple the
1821                                    // gain and maintain stereo imaging.
1822                                    let max_peak = f64::max(
1823                                        self.normalisation_peaks[0],
1824                                        self.normalisation_peaks[1],
1825                                    );
1826                                    *sample *= db_to_ratio(-max_peak) * volume;
1827                                }
1828                            }
1829                            (true, NormalisationMethod::Basic) => {
1830                                if normalisation_factor < 1.0 || volume < 1.0 {
1831                                    for sample in data.iter_mut() {
1832                                        *sample *= normalisation_factor * volume;
1833                                    }
1834                                }
1835                            }
1836                        }
1837                    }
1838
1839                    if let Err(e) = self.sink.write(packet, &mut self.converter) {
1840                        error!("{e}");
1841                        self.handle_pause();
1842                    }
1843                }
1844            }
1845
1846            None => {
1847                self.state.playing_to_end_of_track();
1848                if let PlayerState::EndOfTrack {
1849                    ref track_id,
1850                    play_request_id,
1851                    ..
1852                } = self.state
1853                {
1854                    self.send_event(PlayerEvent::EndOfTrack {
1855                        track_id: track_id.clone(),
1856                        play_request_id,
1857                    })
1858                } else {
1859                    error!("PlayerInternal handle_packet: Invalid PlayerState");
1860                    exit(1);
1861                }
1862            }
1863        }
1864    }
1865
1866    fn start_playback(
1867        &mut self,
1868        track_id: SpotifyUri,
1869        play_request_id: u64,
1870        loaded_track: PlayerLoadedTrackData,
1871        start_playback: bool,
1872    ) {
1873        let audio_item = Box::new(loaded_track.audio_item.clone());
1874
1875        self.send_event(PlayerEvent::TrackChanged { audio_item });
1876
1877        let position_ms = loaded_track.stream_position_ms;
1878
1879        let mut config = self.config.clone();
1880        if config.normalisation_type == NormalisationType::Auto {
1881            if self.auto_normalise_as_album {
1882                config.normalisation_type = NormalisationType::Album;
1883            } else {
1884                config.normalisation_type = NormalisationType::Track;
1885            }
1886        };
1887        let normalisation_factor =
1888            NormalisationData::get_factor(&config, loaded_track.normalisation_data);
1889
1890        if start_playback {
1891            self.ensure_sink_running();
1892            self.send_event(PlayerEvent::Playing {
1893                track_id: track_id.clone(),
1894                play_request_id,
1895                position_ms,
1896            });
1897
1898            self.state = PlayerState::Playing {
1899                track_id,
1900                play_request_id,
1901                decoder: loaded_track.decoder,
1902                audio_item: loaded_track.audio_item,
1903                normalisation_data: loaded_track.normalisation_data,
1904                normalisation_factor,
1905                stream_loader_controller: loaded_track.stream_loader_controller,
1906                duration_ms: loaded_track.duration_ms,
1907                bytes_per_second: loaded_track.bytes_per_second,
1908                stream_position_ms: loaded_track.stream_position_ms,
1909                reported_nominal_start_time: Instant::now()
1910                    .checked_sub(Duration::from_millis(position_ms as u64)),
1911                suggested_to_preload_next_track: false,
1912                is_explicit: loaded_track.is_explicit,
1913            };
1914        } else {
1915            self.ensure_sink_stopped(false);
1916
1917            self.state = PlayerState::Paused {
1918                track_id: track_id.clone(),
1919                play_request_id,
1920                decoder: loaded_track.decoder,
1921                audio_item: loaded_track.audio_item,
1922                normalisation_data: loaded_track.normalisation_data,
1923                normalisation_factor,
1924                stream_loader_controller: loaded_track.stream_loader_controller,
1925                duration_ms: loaded_track.duration_ms,
1926                bytes_per_second: loaded_track.bytes_per_second,
1927                stream_position_ms: loaded_track.stream_position_ms,
1928                suggested_to_preload_next_track: false,
1929                is_explicit: loaded_track.is_explicit,
1930            };
1931
1932            self.send_event(PlayerEvent::Paused {
1933                track_id,
1934                play_request_id,
1935                position_ms,
1936            });
1937        }
1938    }
1939
1940    fn handle_command_load(
1941        &mut self,
1942        track_id: SpotifyUri,
1943        play_request_id_option: Option<u64>,
1944        play: bool,
1945        position_ms: u32,
1946    ) -> PlayerResult {
1947        let play_request_id =
1948            play_request_id_option.unwrap_or(self.play_request_id_generator.get());
1949
1950        self.send_event(PlayerEvent::PlayRequestIdChanged { play_request_id });
1951
1952        if !self.config.gapless {
1953            self.ensure_sink_stopped(play);
1954        }
1955
1956        if matches!(self.state, PlayerState::Invalid) {
1957            return Err(Error::internal(format!(
1958                "Player::handle_command_load called from invalid state: {:?}",
1959                self.state
1960            )));
1961        }
1962
1963        // Now we check at different positions whether we already have a pre-loaded version
1964        // of this track somewhere. If so, use it and return.
1965
1966        // Check if there's a matching loaded track in the EndOfTrack player state.
1967        // This is the case if we're repeating the same track again.
1968        if let PlayerState::EndOfTrack {
1969            track_id: previous_track_id,
1970            ..
1971        } = &self.state
1972        {
1973            if *previous_track_id == track_id {
1974                let mut loaded_track = match mem::replace(&mut self.state, PlayerState::Invalid) {
1975                    PlayerState::EndOfTrack { loaded_track, .. } => loaded_track,
1976                    _ => {
1977                        return Err(Error::internal(format!(
1978                            "PlayerInternal::handle_command_load repeating the same track: invalid state: {:?}",
1979                            self.state
1980                        )));
1981                    }
1982                };
1983
1984                if position_ms != loaded_track.stream_position_ms {
1985                    // This may be blocking.
1986                    loaded_track.stream_position_ms = loaded_track.decoder.seek(position_ms)?;
1987                }
1988                self.preload = PlayerPreload::None;
1989                self.start_playback(track_id, play_request_id, loaded_track, play);
1990                if let PlayerState::Invalid = self.state {
1991                    return Err(Error::internal(format!(
1992                        "PlayerInternal::handle_command_load repeating the same track: start_playback() did not transition to valid player state: {:?}",
1993                        self.state
1994                    )));
1995                }
1996                return Ok(());
1997            }
1998        }
1999
2000        // Check if we are already playing the track. If so, just do a seek and update our info.
2001        if let PlayerState::Playing {
2002            track_id: ref current_track_id,
2003            ref mut stream_position_ms,
2004            ref mut decoder,
2005            ..
2006        }
2007        | PlayerState::Paused {
2008            track_id: ref current_track_id,
2009            ref mut stream_position_ms,
2010            ref mut decoder,
2011            ..
2012        } = self.state
2013        {
2014            if *current_track_id == track_id {
2015                // we can use the current decoder. Ensure it's at the correct position.
2016                if position_ms != *stream_position_ms {
2017                    // This may be blocking.
2018                    *stream_position_ms = decoder.seek(position_ms)?;
2019                }
2020
2021                // Move the info from the current state into a PlayerLoadedTrackData so we can use
2022                // the usual code path to start playback.
2023                let old_state = mem::replace(&mut self.state, PlayerState::Invalid);
2024
2025                if let PlayerState::Playing {
2026                    stream_position_ms,
2027                    decoder,
2028                    audio_item,
2029                    stream_loader_controller,
2030                    bytes_per_second,
2031                    duration_ms,
2032                    normalisation_data,
2033                    is_explicit,
2034                    ..
2035                }
2036                | PlayerState::Paused {
2037                    stream_position_ms,
2038                    decoder,
2039                    audio_item,
2040                    stream_loader_controller,
2041                    bytes_per_second,
2042                    duration_ms,
2043                    normalisation_data,
2044                    is_explicit,
2045                    ..
2046                } = old_state
2047                {
2048                    let loaded_track = PlayerLoadedTrackData {
2049                        decoder,
2050                        normalisation_data,
2051                        stream_loader_controller,
2052                        audio_item,
2053                        bytes_per_second,
2054                        duration_ms,
2055                        stream_position_ms,
2056                        is_explicit,
2057                    };
2058
2059                    self.preload = PlayerPreload::None;
2060                    self.start_playback(track_id, play_request_id, loaded_track, play);
2061
2062                    if let PlayerState::Invalid = self.state {
2063                        return Err(Error::internal(format!(
2064                            "PlayerInternal::handle_command_load already playing this track: start_playback() did not transition to valid player state: {:?}",
2065                            self.state
2066                        )));
2067                    }
2068
2069                    return Ok(());
2070                } else {
2071                    return Err(Error::internal(format!(
2072                        "PlayerInternal::handle_command_load already playing this track: invalid state: {:?}",
2073                        self.state
2074                    )));
2075                }
2076            }
2077        }
2078
2079        // Check if the requested track has been preloaded already. If so use the preloaded data.
2080        if let PlayerPreload::Ready {
2081            track_id: loaded_track_id,
2082            ..
2083        } = &self.preload
2084        {
2085            if track_id == *loaded_track_id {
2086                let preload = std::mem::replace(&mut self.preload, PlayerPreload::None);
2087                if let PlayerPreload::Ready {
2088                    track_id,
2089                    mut loaded_track,
2090                } = preload
2091                {
2092                    if position_ms != loaded_track.stream_position_ms {
2093                        // This may be blocking
2094                        loaded_track.stream_position_ms = loaded_track.decoder.seek(position_ms)?;
2095                    }
2096                    self.start_playback(track_id, play_request_id, *loaded_track, play);
2097                    return Ok(());
2098                } else {
2099                    return Err(Error::internal(format!(
2100                        "PlayerInternal::handle_command_loading preloaded track: invalid state: {:?}",
2101                        self.state
2102                    )));
2103                }
2104            }
2105        }
2106
2107        self.send_event(PlayerEvent::Loading {
2108            track_id: track_id.clone(),
2109            play_request_id,
2110            position_ms,
2111        });
2112
2113        // Try to extract a pending loader from the preloading mechanism
2114        let loader = if let PlayerPreload::Loading {
2115            track_id: loaded_track_id,
2116            ..
2117        } = &self.preload
2118        {
2119            if (track_id == *loaded_track_id) && (position_ms == 0) {
2120                let mut preload = PlayerPreload::None;
2121                std::mem::swap(&mut preload, &mut self.preload);
2122                if let PlayerPreload::Loading { loader, .. } = preload {
2123                    Some(loader)
2124                } else {
2125                    None
2126                }
2127            } else {
2128                None
2129            }
2130        } else {
2131            None
2132        };
2133
2134        self.preload = PlayerPreload::None;
2135
2136        // If we don't have a loader yet, create one from scratch.
2137        let loader =
2138            loader.unwrap_or_else(|| Box::pin(self.load_track(track_id.clone(), position_ms)));
2139
2140        // Set ourselves to a loading state.
2141        self.state = PlayerState::Loading {
2142            track_id,
2143            play_request_id,
2144            start_playback: play,
2145            loader,
2146        };
2147
2148        Ok(())
2149    }
2150
2151    fn handle_command_preload(&mut self, track_id: SpotifyUri) {
2152        debug!("Preloading track");
2153        let mut preload_track = true;
2154        // check whether the track is already loaded somewhere or being loaded.
2155        if let PlayerPreload::Loading {
2156            track_id: currently_loading,
2157            ..
2158        }
2159        | PlayerPreload::Ready {
2160            track_id: currently_loading,
2161            ..
2162        } = &self.preload
2163        {
2164            if *currently_loading == track_id {
2165                // we're already preloading the requested track.
2166                preload_track = false;
2167            } else {
2168                // we're preloading something else - cancel it.
2169                self.preload = PlayerPreload::None;
2170            }
2171        }
2172
2173        if let PlayerState::Playing {
2174            track_id: current_track_id,
2175            ..
2176        }
2177        | PlayerState::Paused {
2178            track_id: current_track_id,
2179            ..
2180        }
2181        | PlayerState::EndOfTrack {
2182            track_id: current_track_id,
2183            ..
2184        } = &self.state
2185        {
2186            if *current_track_id == track_id {
2187                // we already have the requested track loaded.
2188                preload_track = false;
2189            }
2190        }
2191
2192        // schedule the preload of the current track if desired.
2193        if preload_track {
2194            let loader = self.load_track(track_id.clone(), 0);
2195            self.preload = PlayerPreload::Loading {
2196                track_id,
2197                loader: Box::pin(loader),
2198            }
2199        }
2200    }
2201
2202    fn handle_command_seek(&mut self, position_ms: u32) -> PlayerResult {
2203        // When we are still loading, the user may immediately ask to
2204        // seek to another position yet the decoder won't be ready for
2205        // that. In this case just restart the loading process but
2206        // with the requested position.
2207        if let PlayerState::Loading {
2208            ref track_id,
2209            play_request_id,
2210            start_playback,
2211            ..
2212        } = self.state
2213        {
2214            return self.handle_command_load(
2215                track_id.clone(),
2216                Some(play_request_id),
2217                start_playback,
2218                position_ms,
2219            );
2220        }
2221
2222        if let Some(decoder) = self.state.decoder() {
2223            match decoder.seek(position_ms) {
2224                Ok(new_position_ms) => {
2225                    if let PlayerState::Playing {
2226                        ref mut stream_position_ms,
2227                        ref track_id,
2228                        play_request_id,
2229                        ..
2230                    }
2231                    | PlayerState::Paused {
2232                        ref mut stream_position_ms,
2233                        ref track_id,
2234                        play_request_id,
2235                        ..
2236                    } = self.state
2237                    {
2238                        *stream_position_ms = new_position_ms;
2239
2240                        self.send_event(PlayerEvent::Seeked {
2241                            play_request_id,
2242                            track_id: track_id.clone(),
2243                            position_ms: new_position_ms,
2244                        });
2245                    }
2246                }
2247                Err(e) => error!("PlayerInternal::handle_command_seek error: {e}"),
2248            }
2249        } else {
2250            error!("Player::seek called from invalid state: {:?}", self.state);
2251        }
2252
2253        // ensure we have a bit of a buffer of downloaded data
2254        self.preload_data_before_playback()?;
2255
2256        if let PlayerState::Playing {
2257            ref mut reported_nominal_start_time,
2258            ..
2259        } = self.state
2260        {
2261            *reported_nominal_start_time =
2262                Instant::now().checked_sub(Duration::from_millis(position_ms as u64));
2263        }
2264
2265        Ok(())
2266    }
2267
2268    fn handle_command(&mut self, cmd: PlayerCommand) -> PlayerResult {
2269        debug!("command={cmd:?}");
2270        match cmd {
2271            PlayerCommand::Load {
2272                track_id,
2273                play,
2274                position_ms,
2275            } => self.handle_command_load(track_id, None, play, position_ms)?,
2276
2277            PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id),
2278
2279            PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms)?,
2280
2281            PlayerCommand::Play => self.handle_play(),
2282
2283            PlayerCommand::Pause => self.handle_pause(),
2284
2285            PlayerCommand::Stop => self.handle_player_stop(),
2286
2287            PlayerCommand::SetSession(session) => self.session = session,
2288
2289            PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),
2290
2291            PlayerCommand::SetSinkEventCallback(callback) => self.sink_event_callback = callback,
2292
2293            PlayerCommand::EmitVolumeChangedEvent(volume) => {
2294                self.send_event(PlayerEvent::VolumeChanged { volume })
2295            }
2296
2297            PlayerCommand::EmitRepeatChangedEvent { context, track } => {
2298                self.send_event(PlayerEvent::RepeatChanged { context, track })
2299            }
2300
2301            PlayerCommand::EmitShuffleChangedEvent(shuffle) => {
2302                self.send_event(PlayerEvent::ShuffleChanged { shuffle })
2303            }
2304
2305            PlayerCommand::EmitAutoPlayChangedEvent(auto_play) => {
2306                self.send_event(PlayerEvent::AutoPlayChanged { auto_play })
2307            }
2308
2309            PlayerCommand::EmitSessionClientChangedEvent {
2310                client_id,
2311                client_name,
2312                client_brand_name,
2313                client_model_name,
2314            } => self.send_event(PlayerEvent::SessionClientChanged {
2315                client_id,
2316                client_name,
2317                client_brand_name,
2318                client_model_name,
2319            }),
2320
2321            PlayerCommand::EmitSessionConnectedEvent {
2322                connection_id,
2323                user_name,
2324            } => self.send_event(PlayerEvent::SessionConnected {
2325                connection_id,
2326                user_name,
2327            }),
2328
2329            PlayerCommand::EmitSessionDisconnectedEvent {
2330                connection_id,
2331                user_name,
2332            } => self.send_event(PlayerEvent::SessionDisconnected {
2333                connection_id,
2334                user_name,
2335            }),
2336
2337            PlayerCommand::SetAutoNormaliseAsAlbum(setting) => {
2338                self.auto_normalise_as_album = setting
2339            }
2340
2341            PlayerCommand::EmitFilterExplicitContentChangedEvent(filter) => {
2342                self.send_event(PlayerEvent::FilterExplicitContentChanged { filter });
2343
2344                if filter {
2345                    if let PlayerState::Playing {
2346                        ref track_id,
2347                        play_request_id,
2348                        is_explicit,
2349                        ..
2350                    }
2351                    | PlayerState::Paused {
2352                        ref track_id,
2353                        play_request_id,
2354                        is_explicit,
2355                        ..
2356                    } = self.state
2357                    {
2358                        let track_id = track_id.clone();
2359
2360                        if is_explicit {
2361                            warn!(
2362                                "Currently loaded track is explicit, which client setting forbids -- skipping to next track."
2363                            );
2364                            self.send_event(PlayerEvent::EndOfTrack {
2365                                track_id,
2366                                play_request_id,
2367                            })
2368                        }
2369                    }
2370                }
2371            }
2372        };
2373
2374        Ok(())
2375    }
2376
2377    fn send_event(&mut self, event: PlayerEvent) {
2378        self.event_senders
2379            .retain(|sender| sender.send(event.clone()).is_ok());
2380    }
2381
2382    fn load_track(
2383        &mut self,
2384        spotify_uri: SpotifyUri,
2385        position_ms: u32,
2386    ) -> impl FusedFuture<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
2387        // This method creates a future that returns the loaded stream and associated info.
2388        // Ideally all work should be done using asynchronous code. However, seek() on the
2389        // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future
2390        // easily. Instead we spawn a thread to do the work and return a one-shot channel as the
2391        // future to work with.
2392
2393        let loader = PlayerTrackLoader {
2394            session: self.session.clone(),
2395            config: self.config.clone(),
2396            local_file_lookup: self.local_file_lookup.clone(),
2397        };
2398
2399        let (result_tx, result_rx) = oneshot::channel();
2400
2401        let load_handles_clone = self.load_handles.clone();
2402        let handle = tokio::runtime::Handle::current();
2403
2404        let load_handle = thread::spawn(move || {
2405            let data = handle.block_on(loader.load_track(spotify_uri, position_ms));
2406            if let Some(data) = data {
2407                let _ = result_tx.send(data);
2408            }
2409
2410            let mut load_handles = load_handles_clone.lock().expect(LOAD_HANDLES_POISON_MSG);
2411            load_handles.remove(&thread::current().id());
2412        });
2413
2414        let mut load_handles = self.load_handles.lock().expect(LOAD_HANDLES_POISON_MSG);
2415        load_handles.insert(load_handle.thread().id(), load_handle);
2416
2417        result_rx.map_err(|_| ())
2418    }
2419
2420    fn preload_data_before_playback(&mut self) -> PlayerResult {
2421        if let PlayerState::Playing {
2422            bytes_per_second,
2423            ref mut stream_loader_controller,
2424            ..
2425        } = self.state
2426        {
2427            let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
2428            // Request our read ahead range
2429            let request_data_length =
2430                (read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;
2431
2432            // Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete.
2433            let wait_for_data_length =
2434                (read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;
2435
2436            stream_loader_controller.fetch_next_and_wait(request_data_length, wait_for_data_length)
2437        } else {
2438            Ok(())
2439        }
2440    }
2441}
2442
2443impl Drop for PlayerInternal {
2444    fn drop(&mut self) {
2445        debug!("drop PlayerInternal[{}]", self.player_id);
2446
2447        let handles: Vec<thread::JoinHandle<()>> = {
2448            // waiting for the thread while holding the mutex would result in a deadlock
2449            let mut load_handles = self.load_handles.lock().expect(LOAD_HANDLES_POISON_MSG);
2450
2451            load_handles
2452                .drain()
2453                .map(|(_thread_id, handle)| handle)
2454                .collect()
2455        };
2456
2457        for handle in handles {
2458            let _ = handle.join();
2459        }
2460    }
2461}
2462
2463impl fmt::Debug for PlayerCommand {
2464    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2465        match self {
2466            PlayerCommand::Load {
2467                track_id,
2468                play,
2469                position_ms,
2470                ..
2471            } => f
2472                .debug_tuple("Load")
2473                .field(&track_id)
2474                .field(&play)
2475                .field(&position_ms)
2476                .finish(),
2477            PlayerCommand::Preload { track_id } => {
2478                f.debug_tuple("Preload").field(&track_id).finish()
2479            }
2480            PlayerCommand::Play => f.debug_tuple("Play").finish(),
2481            PlayerCommand::Pause => f.debug_tuple("Pause").finish(),
2482            PlayerCommand::Stop => f.debug_tuple("Stop").finish(),
2483            PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(),
2484            PlayerCommand::SetSession(_) => f.debug_tuple("SetSession").finish(),
2485            PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(),
2486            PlayerCommand::SetSinkEventCallback(_) => {
2487                f.debug_tuple("SetSinkEventCallback").finish()
2488            }
2489            PlayerCommand::EmitVolumeChangedEvent(volume) => f
2490                .debug_tuple("EmitVolumeChangedEvent")
2491                .field(&volume)
2492                .finish(),
2493            PlayerCommand::SetAutoNormaliseAsAlbum(setting) => f
2494                .debug_tuple("SetAutoNormaliseAsAlbum")
2495                .field(&setting)
2496                .finish(),
2497            PlayerCommand::EmitFilterExplicitContentChangedEvent(filter) => f
2498                .debug_tuple("EmitFilterExplicitContentChangedEvent")
2499                .field(&filter)
2500                .finish(),
2501            PlayerCommand::EmitSessionConnectedEvent {
2502                connection_id,
2503                user_name,
2504            } => f
2505                .debug_tuple("EmitSessionConnectedEvent")
2506                .field(&connection_id)
2507                .field(&user_name)
2508                .finish(),
2509            PlayerCommand::EmitSessionDisconnectedEvent {
2510                connection_id,
2511                user_name,
2512            } => f
2513                .debug_tuple("EmitSessionDisconnectedEvent")
2514                .field(&connection_id)
2515                .field(&user_name)
2516                .finish(),
2517            PlayerCommand::EmitSessionClientChangedEvent {
2518                client_id,
2519                client_name,
2520                client_brand_name,
2521                client_model_name,
2522            } => f
2523                .debug_tuple("EmitSessionClientChangedEvent")
2524                .field(&client_id)
2525                .field(&client_name)
2526                .field(&client_brand_name)
2527                .field(&client_model_name)
2528                .finish(),
2529            PlayerCommand::EmitShuffleChangedEvent(shuffle) => f
2530                .debug_tuple("EmitShuffleChangedEvent")
2531                .field(&shuffle)
2532                .finish(),
2533            PlayerCommand::EmitRepeatChangedEvent { context, track } => f
2534                .debug_tuple("EmitRepeatChangedEvent")
2535                .field(&context)
2536                .field(&track)
2537                .finish(),
2538            PlayerCommand::EmitAutoPlayChangedEvent(auto_play) => f
2539                .debug_tuple("EmitAutoPlayChangedEvent")
2540                .field(&auto_play)
2541                .finish(),
2542        }
2543    }
2544}
2545
2546impl fmt::Debug for PlayerState {
2547    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2548        use PlayerState::*;
2549        match self {
2550            Stopped => f.debug_struct("Stopped").finish(),
2551            Loading {
2552                track_id,
2553                play_request_id,
2554                ..
2555            } => f
2556                .debug_struct("Loading")
2557                .field("track_id", &track_id)
2558                .field("play_request_id", &play_request_id)
2559                .finish(),
2560            Paused {
2561                track_id,
2562                play_request_id,
2563                ..
2564            } => f
2565                .debug_struct("Paused")
2566                .field("track_id", &track_id)
2567                .field("play_request_id", &play_request_id)
2568                .finish(),
2569            Playing {
2570                track_id,
2571                play_request_id,
2572                ..
2573            } => f
2574                .debug_struct("Playing")
2575                .field("track_id", &track_id)
2576                .field("play_request_id", &play_request_id)
2577                .finish(),
2578            EndOfTrack {
2579                track_id,
2580                play_request_id,
2581                ..
2582            } => f
2583                .debug_struct("EndOfTrack")
2584                .field("track_id", &track_id)
2585                .field("play_request_id", &play_request_id)
2586                .finish(),
2587            Invalid => f.debug_struct("Invalid").finish(),
2588        }
2589    }
2590}
2591
2592struct Subfile<T: Read + Seek> {
2593    stream: T,
2594    offset: u64,
2595    length: u64,
2596}
2597
2598impl<T: Read + Seek> Subfile<T> {
2599    pub fn new(mut stream: T, offset: u64, length: u64) -> Result<Subfile<T>, io::Error> {
2600        let target = SeekFrom::Start(offset);
2601        stream.seek(target)?;
2602
2603        Ok(Subfile {
2604            stream,
2605            offset,
2606            length,
2607        })
2608    }
2609}
2610
2611impl<T: Read + Seek> Read for Subfile<T> {
2612    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2613        self.stream.read(buf)
2614    }
2615}
2616
2617impl<T: Read + Seek> Seek for Subfile<T> {
2618    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
2619        let pos = match pos {
2620            SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
2621            SeekFrom::End(offset) => {
2622                if (self.length as i64 - offset) < self.offset as i64 {
2623                    return Err(io::Error::new(
2624                        io::ErrorKind::InvalidInput,
2625                        "newpos would be < self.offset",
2626                    ));
2627                }
2628                pos
2629            }
2630            _ => pos,
2631        };
2632
2633        let newpos = self.stream.seek(pos)?;
2634        Ok(newpos - self.offset)
2635    }
2636}
2637
2638impl<R> MediaSource for Subfile<R>
2639where
2640    R: Read + Seek + Send + Sync,
2641{
2642    fn is_seekable(&self) -> bool {
2643        true
2644    }
2645
2646    fn byte_len(&self) -> Option<u64> {
2647        Some(self.length)
2648    }
2649}