librespot_core/
session.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    io,
5    pin::Pin,
6    process::exit,
7    sync::{Arc, OnceLock, RwLock, Weak},
8    task::{Context, Poll},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use crate::dealer::manager::DealerManager;
13use crate::{
14    Error,
15    apresolve::{ApResolver, SocketAddress},
16    audio_key::AudioKeyManager,
17    authentication::Credentials,
18    cache::Cache,
19    channel::ChannelManager,
20    config::SessionConfig,
21    connection::{self, AuthenticationError, Transport},
22    http_client::HttpClient,
23    login5::Login5Manager,
24    mercury::MercuryManager,
25    packet::PacketType,
26    protocol::keyexchange::ErrorCode,
27    spclient::SpClient,
28    token::TokenProvider,
29};
30use byteorder::{BigEndian, ByteOrder};
31use bytes::Bytes;
32use futures_core::TryStream;
33use futures_util::StreamExt;
34use librespot_protocol::authentication::AuthenticationType;
35use num_traits::FromPrimitive;
36use pin_project_lite::pin_project;
37use quick_xml::events::Event;
38use thiserror::Error;
39use tokio::{
40    sync::mpsc,
41    time::{Duration as TokioDuration, Instant as TokioInstant, Sleep, sleep},
42};
43use tokio_stream::wrappers::UnboundedReceiverStream;
44use uuid::Uuid;
45
46const SESSION_DATA_POISON_MSG: &str = "session data rwlock should not be poisoned";
47
48#[derive(Debug, Error)]
49pub enum SessionError {
50    #[error(transparent)]
51    AuthenticationError(#[from] AuthenticationError),
52    #[error("Cannot create session: {0}")]
53    IoError(#[from] io::Error),
54    #[error("Session is not connected")]
55    NotConnected,
56    #[error("packet {0} unknown")]
57    Packet(u8),
58}
59
60impl From<SessionError> for Error {
61    fn from(err: SessionError) -> Self {
62        match err {
63            SessionError::AuthenticationError(_) => Error::unauthenticated(err),
64            SessionError::IoError(_) => Error::unavailable(err),
65            SessionError::NotConnected => Error::unavailable(err),
66            SessionError::Packet(_) => Error::unimplemented(err),
67        }
68    }
69}
70
71impl From<quick_xml::encoding::EncodingError> for Error {
72    fn from(err: quick_xml::encoding::EncodingError) -> Self {
73        Error::invalid_argument(err)
74    }
75}
76
77pub type UserAttributes = HashMap<String, String>;
78
79#[derive(Debug, Clone, Default)]
80pub struct UserData {
81    pub country: String,
82    pub canonical_username: String,
83    pub attributes: UserAttributes,
84}
85
86#[derive(Debug, Clone, Default)]
87struct SessionData {
88    session_id: String,
89    client_id: String,
90    client_name: String,
91    client_brand_name: String,
92    client_model_name: String,
93    connection_id: String,
94    auth_data: Vec<u8>,
95    time_delta: i64,
96    invalid: bool,
97    user_data: UserData,
98}
99
100struct SessionInternal {
101    config: SessionConfig,
102    data: RwLock<SessionData>,
103
104    http_client: HttpClient,
105    tx_connection: OnceLock<mpsc::UnboundedSender<(u8, Vec<u8>)>>,
106
107    apresolver: OnceLock<ApResolver>,
108    audio_key: OnceLock<AudioKeyManager>,
109    channel: OnceLock<ChannelManager>,
110    mercury: OnceLock<MercuryManager>,
111    dealer: OnceLock<DealerManager>,
112    spclient: OnceLock<SpClient>,
113    token_provider: OnceLock<TokenProvider>,
114    login5: OnceLock<Login5Manager>,
115    cache: Option<Arc<Cache>>,
116
117    handle: tokio::runtime::Handle,
118}
119
120/// A shared reference to a Spotify session.
121///
122/// After instantiating, you need to login via [Session::connect].
123/// You can either implement the whole playback logic yourself by using
124/// this structs interface directly or hand it to a
125/// `Player`.
126///
127/// *Note*: [Session] instances cannot yet be reused once invalidated. After
128/// an unexpectedly closed connection, you'll need to create a new [Session].
129#[derive(Clone)]
130pub struct Session(Arc<SessionInternal>);
131
132impl Session {
133    pub fn new(config: SessionConfig, cache: Option<Cache>) -> Self {
134        let http_client = HttpClient::new(config.proxy.as_ref());
135
136        debug!("new Session");
137
138        let session_data = SessionData {
139            client_id: config.client_id.clone(),
140            // can be any guid, doesn't need to be simple
141            session_id: Uuid::new_v4().as_simple().to_string(),
142            ..SessionData::default()
143        };
144
145        Self(Arc::new(SessionInternal {
146            config,
147            data: RwLock::new(session_data),
148            http_client,
149            tx_connection: OnceLock::new(),
150            cache: cache.map(Arc::new),
151            apresolver: OnceLock::new(),
152            audio_key: OnceLock::new(),
153            channel: OnceLock::new(),
154            mercury: OnceLock::new(),
155            dealer: OnceLock::new(),
156            spclient: OnceLock::new(),
157            token_provider: OnceLock::new(),
158            login5: OnceLock::new(),
159            handle: tokio::runtime::Handle::current(),
160        }))
161    }
162
163    async fn connect_inner(
164        &self,
165        access_point: &SocketAddress,
166        credentials: Credentials,
167    ) -> Result<(Credentials, Transport), Error> {
168        const MAX_RETRIES: u8 = 1;
169        let mut transport = connection::connect_with_retry(
170            &access_point.0,
171            access_point.1,
172            self.config().proxy.as_ref(),
173            MAX_RETRIES,
174        )
175        .await?;
176        let mut reusable_credentials = connection::authenticate(
177            &mut transport,
178            credentials.clone(),
179            &self.config().device_id,
180        )
181        .await?;
182
183        // Might be able to remove this once keymaster is replaced with login5.
184        if credentials.auth_type == AuthenticationType::AUTHENTICATION_SPOTIFY_TOKEN {
185            trace!(
186                "Reconnect using stored credentials as token authed sessions cannot use keymaster."
187            );
188            transport = connection::connect_with_retry(
189                &access_point.0,
190                access_point.1,
191                self.config().proxy.as_ref(),
192                MAX_RETRIES,
193            )
194            .await?;
195            reusable_credentials = connection::authenticate(
196                &mut transport,
197                reusable_credentials.clone(),
198                &self.config().device_id,
199            )
200            .await?;
201        }
202
203        Ok((reusable_credentials, transport))
204    }
205
206    pub async fn connect(
207        &self,
208        credentials: Credentials,
209        store_credentials: bool,
210    ) -> Result<(), Error> {
211        // There currently happen to be 6 APs but anything will do to avoid an infinite loop.
212        const MAX_AP_TRIES: u8 = 6;
213        let mut num_ap_tries = 0;
214        let (reusable_credentials, transport) = loop {
215            let ap = self.apresolver().resolve("accesspoint").await?;
216            info!("Connecting to AP \"{}:{}\"", ap.0, ap.1);
217            match self.connect_inner(&ap, credentials.clone()).await {
218                Ok(ct) => break ct,
219                Err(e) => {
220                    num_ap_tries += 1;
221                    if MAX_AP_TRIES == num_ap_tries {
222                        error!("Tried too many access points");
223                        return Err(e);
224                    }
225                    if let Some(AuthenticationError::LoginFailed(ErrorCode::TryAnotherAP)) =
226                        e.error.downcast_ref::<AuthenticationError>()
227                    {
228                        warn!("Instructed to try another access point...");
229                        continue;
230                    } else if let Some(AuthenticationError::LoginFailed(..)) =
231                        e.error.downcast_ref::<AuthenticationError>()
232                    {
233                        return Err(e);
234                    } else {
235                        warn!("Try another access point...");
236                        continue;
237                    }
238                }
239            }
240        };
241
242        let username = reusable_credentials
243            .username
244            .as_ref()
245            .map_or("UNKNOWN", |s| s.as_str());
246        info!("Authenticated as '{username}' !");
247        self.set_username(username);
248        self.set_auth_data(&reusable_credentials.auth_data);
249        if let Some(cache) = self.cache() {
250            if store_credentials {
251                let cred_changed = cache
252                    .credentials()
253                    .map(|c| c != reusable_credentials)
254                    .unwrap_or(true);
255                if cred_changed {
256                    cache.save_credentials(&reusable_credentials);
257                }
258            }
259        }
260
261        // This channel serves as a buffer for packets and serializes access to the TcpStream, such
262        // that `self.send_packet` can return immediately and needs no additional synchronization.
263        let (tx_connection, rx_connection) = mpsc::unbounded_channel();
264        self.0
265            .tx_connection
266            .set(tx_connection)
267            .map_err(|_| SessionError::NotConnected)?;
268
269        let (sink, stream) = transport.split();
270        let sender_task = UnboundedReceiverStream::new(rx_connection)
271            .map(Ok)
272            .forward(sink);
273        let session_weak = self.weak();
274        tokio::spawn(async move {
275            if let Err(e) = sender_task.await {
276                error!("{e}");
277                if let Some(session) = session_weak.try_upgrade() {
278                    if !session.is_invalid() {
279                        session.shutdown();
280                    }
281                }
282            }
283        });
284
285        tokio::spawn(DispatchTask::new(self.weak(), stream));
286
287        Ok(())
288    }
289
290    pub fn apresolver(&self) -> &ApResolver {
291        self.0
292            .apresolver
293            .get_or_init(|| ApResolver::new(self.weak()))
294    }
295
296    pub fn audio_key(&self) -> &AudioKeyManager {
297        self.0
298            .audio_key
299            .get_or_init(|| AudioKeyManager::new(self.weak()))
300    }
301
302    pub fn channel(&self) -> &ChannelManager {
303        self.0
304            .channel
305            .get_or_init(|| ChannelManager::new(self.weak()))
306    }
307
308    pub fn http_client(&self) -> &HttpClient {
309        &self.0.http_client
310    }
311
312    pub fn mercury(&self) -> &MercuryManager {
313        self.0
314            .mercury
315            .get_or_init(|| MercuryManager::new(self.weak()))
316    }
317
318    pub fn dealer(&self) -> &DealerManager {
319        self.0
320            .dealer
321            .get_or_init(|| DealerManager::new(self.weak()))
322    }
323
324    pub fn spclient(&self) -> &SpClient {
325        self.0.spclient.get_or_init(|| SpClient::new(self.weak()))
326    }
327
328    pub fn token_provider(&self) -> &TokenProvider {
329        self.0
330            .token_provider
331            .get_or_init(|| TokenProvider::new(self.weak()))
332    }
333
334    pub fn login5(&self) -> &Login5Manager {
335        self.0
336            .login5
337            .get_or_init(|| Login5Manager::new(self.weak()))
338    }
339
340    pub fn time_delta(&self) -> i64 {
341        self.0
342            .data
343            .read()
344            .expect(SESSION_DATA_POISON_MSG)
345            .time_delta
346    }
347
348    pub fn spawn<T>(&self, task: T)
349    where
350        T: Future + Send + 'static,
351        T::Output: Send + 'static,
352    {
353        self.0.handle.spawn(task);
354    }
355
356    fn debug_info(&self) {
357        debug!(
358            "Session strong={} weak={}",
359            Arc::strong_count(&self.0),
360            Arc::weak_count(&self.0)
361        );
362    }
363
364    fn check_catalogue(attributes: &UserAttributes) {
365        if let Some(account_type) = attributes.get("type") {
366            if account_type != "premium" {
367                error!("librespot does not support {account_type:?} accounts.");
368                info!("Please support Spotify and your artists and sign up for a premium account.");
369
370                // TODO: logout instead of exiting
371                exit(1);
372            }
373        }
374    }
375
376    pub fn send_packet(&self, cmd: PacketType, data: Vec<u8>) -> Result<(), Error> {
377        match self.0.tx_connection.get() {
378            Some(tx) => Ok(tx.send((cmd as u8, data))?),
379            None => Err(SessionError::NotConnected.into()),
380        }
381    }
382
383    pub fn cache(&self) -> Option<&Arc<Cache>> {
384        self.0.cache.as_ref()
385    }
386
387    pub fn config(&self) -> &SessionConfig {
388        &self.0.config
389    }
390
391    // This clones a fairly large struct, so use a specific getter or setter unless
392    // you need more fields at once, in which case this can spare multiple `read`
393    // locks.
394    pub fn user_data(&self) -> UserData {
395        self.0
396            .data
397            .read()
398            .expect(SESSION_DATA_POISON_MSG)
399            .user_data
400            .clone()
401    }
402
403    pub fn session_id(&self) -> String {
404        self.0
405            .data
406            .read()
407            .expect(SESSION_DATA_POISON_MSG)
408            .session_id
409            .clone()
410    }
411
412    pub fn set_session_id(&self, session_id: &str) {
413        session_id.clone_into(
414            &mut self
415                .0
416                .data
417                .write()
418                .expect(SESSION_DATA_POISON_MSG)
419                .session_id,
420        );
421    }
422
423    pub fn device_id(&self) -> &str {
424        &self.config().device_id
425    }
426
427    pub fn client_id(&self) -> String {
428        self.0
429            .data
430            .read()
431            .expect(SESSION_DATA_POISON_MSG)
432            .client_id
433            .clone()
434    }
435
436    pub fn set_client_id(&self, client_id: &str) {
437        client_id.clone_into(
438            &mut self
439                .0
440                .data
441                .write()
442                .expect(SESSION_DATA_POISON_MSG)
443                .client_id,
444        );
445    }
446
447    pub fn client_name(&self) -> String {
448        self.0
449            .data
450            .read()
451            .expect(SESSION_DATA_POISON_MSG)
452            .client_name
453            .clone()
454    }
455
456    pub fn set_client_name(&self, client_name: &str) {
457        client_name.clone_into(
458            &mut self
459                .0
460                .data
461                .write()
462                .expect(SESSION_DATA_POISON_MSG)
463                .client_name,
464        );
465    }
466
467    pub fn client_brand_name(&self) -> String {
468        self.0
469            .data
470            .read()
471            .expect(SESSION_DATA_POISON_MSG)
472            .client_brand_name
473            .clone()
474    }
475
476    pub fn set_client_brand_name(&self, client_brand_name: &str) {
477        client_brand_name.clone_into(
478            &mut self
479                .0
480                .data
481                .write()
482                .expect(SESSION_DATA_POISON_MSG)
483                .client_brand_name,
484        );
485    }
486
487    pub fn client_model_name(&self) -> String {
488        self.0
489            .data
490            .read()
491            .expect(SESSION_DATA_POISON_MSG)
492            .client_model_name
493            .clone()
494    }
495
496    pub fn set_client_model_name(&self, client_model_name: &str) {
497        client_model_name.clone_into(
498            &mut self
499                .0
500                .data
501                .write()
502                .expect(SESSION_DATA_POISON_MSG)
503                .client_model_name,
504        );
505    }
506
507    pub fn connection_id(&self) -> String {
508        self.0
509            .data
510            .read()
511            .expect(SESSION_DATA_POISON_MSG)
512            .connection_id
513            .clone()
514    }
515
516    pub fn set_connection_id(&self, connection_id: &str) {
517        connection_id.clone_into(
518            &mut self
519                .0
520                .data
521                .write()
522                .expect(SESSION_DATA_POISON_MSG)
523                .connection_id,
524        );
525    }
526
527    pub fn username(&self) -> String {
528        self.0
529            .data
530            .read()
531            .expect(SESSION_DATA_POISON_MSG)
532            .user_data
533            .canonical_username
534            .clone()
535    }
536
537    pub fn set_username(&self, username: &str) {
538        username.clone_into(
539            &mut self
540                .0
541                .data
542                .write()
543                .expect(SESSION_DATA_POISON_MSG)
544                .user_data
545                .canonical_username,
546        );
547    }
548
549    pub fn auth_data(&self) -> Vec<u8> {
550        self.0
551            .data
552            .read()
553            .expect(SESSION_DATA_POISON_MSG)
554            .auth_data
555            .clone()
556    }
557
558    pub fn set_auth_data(&self, auth_data: &[u8]) {
559        auth_data.clone_into(
560            &mut self
561                .0
562                .data
563                .write()
564                .expect(SESSION_DATA_POISON_MSG)
565                .auth_data,
566        );
567    }
568
569    pub fn country(&self) -> String {
570        self.0
571            .data
572            .read()
573            .expect(SESSION_DATA_POISON_MSG)
574            .user_data
575            .country
576            .clone()
577    }
578
579    pub fn filter_explicit_content(&self) -> bool {
580        match self.get_user_attribute("filter-explicit-content") {
581            Some(value) => matches!(&*value, "1"),
582            None => false,
583        }
584    }
585
586    pub fn autoplay(&self) -> bool {
587        if let Some(overide) = self.config().autoplay {
588            return overide;
589        }
590
591        match self.get_user_attribute("autoplay") {
592            Some(value) => matches!(&*value, "1"),
593            None => false,
594        }
595    }
596
597    pub fn set_user_attribute(&self, key: &str, value: &str) -> Option<String> {
598        let mut dummy_attributes = UserAttributes::new();
599        dummy_attributes.insert(key.to_owned(), value.to_owned());
600        Self::check_catalogue(&dummy_attributes);
601
602        self.0
603            .data
604            .write()
605            .expect(SESSION_DATA_POISON_MSG)
606            .user_data
607            .attributes
608            .insert(key.to_owned(), value.to_owned())
609    }
610
611    pub fn set_user_attributes(&self, attributes: UserAttributes) {
612        Self::check_catalogue(&attributes);
613
614        self.0
615            .data
616            .write()
617            .expect(SESSION_DATA_POISON_MSG)
618            .user_data
619            .attributes
620            .extend(attributes)
621    }
622
623    pub fn get_user_attribute(&self, key: &str) -> Option<String> {
624        self.0
625            .data
626            .read()
627            .expect(SESSION_DATA_POISON_MSG)
628            .user_data
629            .attributes
630            .get(key)
631            .cloned()
632    }
633
634    fn weak(&self) -> SessionWeak {
635        SessionWeak(Arc::downgrade(&self.0))
636    }
637
638    pub fn shutdown(&self) {
639        debug!("Shutdown: Invalidating session");
640        self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true;
641        self.mercury().shutdown();
642        self.channel().shutdown();
643    }
644
645    pub fn is_invalid(&self) -> bool {
646        self.0.data.read().expect(SESSION_DATA_POISON_MSG).invalid
647    }
648}
649
650#[derive(Clone)]
651pub struct SessionWeak(Weak<SessionInternal>);
652
653impl SessionWeak {
654    fn try_upgrade(&self) -> Option<Session> {
655        self.0.upgrade().map(Session)
656    }
657
658    pub(crate) fn upgrade(&self) -> Session {
659        self.try_upgrade()
660            .expect("session was dropped and so should have this component")
661    }
662}
663
664impl Drop for SessionInternal {
665    fn drop(&mut self) {
666        debug!("drop Session");
667    }
668}
669
670#[derive(Clone, Copy, Default, Debug, PartialEq)]
671enum KeepAliveState {
672    #[default]
673    // Expecting a Ping from the server, either after startup or after a PongAck.
674    ExpectingPing,
675
676    // We need to send a Pong at the given time.
677    PendingPong,
678
679    // We just sent a Pong and wait for it be ACK'd.
680    ExpectingPongAck,
681}
682
683const INITIAL_PING_TIMEOUT: TokioDuration = TokioDuration::from_secs(20);
684const PING_TIMEOUT: TokioDuration = TokioDuration::from_secs(80); // 60s expected + 20s buffer
685const PONG_DELAY: TokioDuration = TokioDuration::from_secs(60);
686const PONG_ACK_TIMEOUT: TokioDuration = TokioDuration::from_secs(20);
687
688impl KeepAliveState {
689    fn debug(&self, sleep: &Sleep) {
690        let delay = sleep
691            .deadline()
692            .checked_duration_since(TokioInstant::now())
693            .map(|t| t.as_secs_f64())
694            .unwrap_or(f64::INFINITY);
695
696        trace!("keep-alive state: {self:?}, timeout in {delay:.1}");
697    }
698}
699
700pin_project! {
701    struct DispatchTask<S>
702    where
703        S: TryStream<Ok = (u8, Bytes)>
704    {
705        session: SessionWeak,
706        keep_alive_state: KeepAliveState,
707        #[pin]
708        stream: S,
709        #[pin]
710        timeout: Sleep,
711    }
712
713    impl<S> PinnedDrop for DispatchTask<S>
714    where
715        S: TryStream<Ok = (u8, Bytes)>
716    {
717        fn drop(_this: Pin<&mut Self>) {
718            debug!("drop Dispatch");
719        }
720    }
721}
722
723impl<S> DispatchTask<S>
724where
725    S: TryStream<Ok = (u8, Bytes)>,
726{
727    fn new(session: SessionWeak, stream: S) -> Self {
728        Self {
729            session,
730            keep_alive_state: KeepAliveState::ExpectingPing,
731            stream,
732            timeout: sleep(INITIAL_PING_TIMEOUT),
733        }
734    }
735
736    fn dispatch(
737        mut self: Pin<&mut Self>,
738        session: &Session,
739        cmd: u8,
740        data: Bytes,
741    ) -> Result<(), Error> {
742        use KeepAliveState::*;
743        use PacketType::*;
744
745        let packet_type = FromPrimitive::from_u8(cmd);
746        let cmd = match packet_type {
747            Some(cmd) => cmd,
748            None => {
749                trace!("Ignoring unknown packet {cmd:x}");
750                return Err(SessionError::Packet(cmd).into());
751            }
752        };
753
754        match packet_type {
755            Some(Ping) => {
756                trace!("Received Ping");
757                if self.keep_alive_state != ExpectingPing {
758                    warn!("Received unexpected Ping from server")
759                }
760                let mut this = self.as_mut().project();
761                *this.keep_alive_state = PendingPong;
762                this.timeout
763                    .as_mut()
764                    .reset(TokioInstant::now() + PONG_DELAY);
765                this.keep_alive_state.debug(&this.timeout);
766
767                let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64;
768                let timestamp = SystemTime::now()
769                    .duration_since(UNIX_EPOCH)
770                    .unwrap_or(Duration::ZERO)
771                    .as_secs() as i64;
772                {
773                    let mut data = session.0.data.write().expect(SESSION_DATA_POISON_MSG);
774                    data.time_delta = server_timestamp.saturating_sub(timestamp);
775                }
776
777                session.debug_info();
778
779                Ok(())
780            }
781            Some(PongAck) => {
782                trace!("Received PongAck");
783                if self.keep_alive_state != ExpectingPongAck {
784                    warn!("Received unexpected PongAck from server")
785                }
786                let mut this = self.as_mut().project();
787                *this.keep_alive_state = ExpectingPing;
788                this.timeout
789                    .as_mut()
790                    .reset(TokioInstant::now() + PING_TIMEOUT);
791                this.keep_alive_state.debug(&this.timeout);
792
793                Ok(())
794            }
795            Some(CountryCode) => {
796                let country = String::from_utf8(data.as_ref().to_owned())?;
797                info!("Country: {country:?}");
798                session
799                    .0
800                    .data
801                    .write()
802                    .expect(SESSION_DATA_POISON_MSG)
803                    .user_data
804                    .country = country;
805                Ok(())
806            }
807            Some(StreamChunkRes) | Some(ChannelError) => session.channel().dispatch(cmd, data),
808            Some(AesKey) | Some(AesKeyError) => session.audio_key().dispatch(cmd, data),
809            Some(MercuryReq) | Some(MercurySub) | Some(MercuryUnsub) | Some(MercuryEvent) => {
810                session.mercury().dispatch(cmd, data)
811            }
812            Some(ProductInfo) => {
813                let data = std::str::from_utf8(&data)?;
814                let mut reader = quick_xml::Reader::from_str(data);
815
816                let mut buf = Vec::new();
817                let mut current_element = String::new();
818                let mut user_attributes: UserAttributes = HashMap::new();
819
820                loop {
821                    match reader.read_event_into(&mut buf) {
822                        Ok(Event::Start(ref element)) => {
823                            std::str::from_utf8(element)?.clone_into(&mut current_element)
824                        }
825                        Ok(Event::End(_)) => {
826                            current_element = String::new();
827                        }
828                        Ok(Event::Text(ref value)) => {
829                            if !current_element.is_empty() {
830                                let _ = user_attributes.insert(
831                                    current_element.clone(),
832                                    value.xml_content()?.to_string(),
833                                );
834                            }
835                        }
836                        Ok(Event::Eof) => break,
837                        Ok(_) => (),
838                        Err(e) => warn!(
839                            "Error parsing XML at position {}: {:?}",
840                            reader.buffer_position(),
841                            e
842                        ),
843                    }
844                }
845
846                trace!("Received product info: {user_attributes:#?}");
847                Session::check_catalogue(&user_attributes);
848
849                session
850                    .0
851                    .data
852                    .write()
853                    .expect(SESSION_DATA_POISON_MSG)
854                    .user_data
855                    .attributes = user_attributes;
856                Ok(())
857            }
858            Some(SecretBlock)
859            | Some(LegacyWelcome)
860            | Some(UnknownDataAllZeros)
861            | Some(LicenseVersion) => Ok(()),
862            _ => {
863                trace!("Ignoring {cmd:?} packet with data {data:#?}");
864                Err(SessionError::Packet(cmd as u8).into())
865            }
866        }
867    }
868}
869
870impl<S> Future for DispatchTask<S>
871where
872    S: TryStream<Ok = (u8, Bytes), Error = std::io::Error>,
873    <S as TryStream>::Ok: std::fmt::Debug,
874{
875    type Output = Result<(), S::Error>;
876
877    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
878        use KeepAliveState::*;
879
880        let session = match self.session.try_upgrade() {
881            Some(session) => session,
882            None => return Poll::Ready(Ok(())),
883        };
884
885        // Process all messages that are immediately ready
886        loop {
887            match self.as_mut().project().stream.try_poll_next(cx) {
888                Poll::Ready(Some(Ok((cmd, data)))) => {
889                    let result = self.as_mut().dispatch(&session, cmd, data);
890                    if let Err(e) = result {
891                        debug!("could not dispatch command: {e}");
892                    }
893                }
894                Poll::Ready(None) => {
895                    warn!("Connection to server closed.");
896                    session.shutdown();
897                    return Poll::Ready(Ok(()));
898                }
899                Poll::Ready(Some(Err(e))) => {
900                    error!("Connection to server closed.");
901                    session.shutdown();
902                    return Poll::Ready(Err(e));
903                }
904                Poll::Pending => break,
905            }
906        }
907
908        // Handle the keep-alive sequence, returning an error when we haven't received a
909        // Ping/PongAck for too long.
910        //
911        // The expected keepalive sequence is
912        // - Server: Ping
913        // - wait 60s
914        // - Client: Pong
915        // - Server: PongAck
916        // - wait 60s
917        // - repeat
918        //
919        // This means that we silently lost connection to Spotify servers if
920        // - we don't receive Ping immediately after connecting,
921        // - we don't receive a Ping 60s after the last PongAck or
922        // - we don't receive a PongAck immediately after our Pong.
923        //
924        // Currently, we add a safety margin of 20s to these expected deadlines.
925        let mut this = self.as_mut().project();
926        if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
927            match this.keep_alive_state {
928                ExpectingPing | ExpectingPongAck => {
929                    if !session.is_invalid() {
930                        session.shutdown();
931                    }
932                    // TODO: Optionally reconnect (with cached/last credentials?)
933                    return Poll::Ready(Err(io::Error::new(
934                        io::ErrorKind::TimedOut,
935                        format!(
936                            "session lost connection to server ({:?})",
937                            this.keep_alive_state
938                        ),
939                    )));
940                }
941                PendingPong => {
942                    trace!("Sending Pong");
943                    // TODO: Ideally, this should flush the `Framed<TcpStream> as Sink`
944                    // before starting the timeout.
945                    let _ = session.send_packet(PacketType::Pong, vec![0, 0, 0, 0]);
946                    *this.keep_alive_state = ExpectingPongAck;
947                    this.timeout
948                        .as_mut()
949                        .reset(TokioInstant::now() + PONG_ACK_TIMEOUT);
950                    this.keep_alive_state.debug(&this.timeout);
951                }
952            }
953        }
954
955        Poll::Pending
956    }
957}