1use std::{
2 collections::HashMap,
3 future::Future,
4 io,
5 pin::Pin,
6 process::exit,
7 sync::{Arc, OnceLock, RwLock, Weak},
8 task::{Context, Poll},
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use crate::dealer::manager::DealerManager;
13use crate::{
14 Error,
15 apresolve::{ApResolver, SocketAddress},
16 audio_key::AudioKeyManager,
17 authentication::Credentials,
18 cache::Cache,
19 channel::ChannelManager,
20 config::SessionConfig,
21 connection::{self, AuthenticationError, Transport},
22 http_client::HttpClient,
23 login5::Login5Manager,
24 mercury::MercuryManager,
25 packet::PacketType,
26 protocol::keyexchange::ErrorCode,
27 spclient::SpClient,
28 token::TokenProvider,
29};
30use byteorder::{BigEndian, ByteOrder};
31use bytes::Bytes;
32use futures_core::TryStream;
33use futures_util::StreamExt;
34use librespot_protocol::authentication::AuthenticationType;
35use num_traits::FromPrimitive;
36use pin_project_lite::pin_project;
37use quick_xml::events::Event;
38use thiserror::Error;
39use tokio::{
40 sync::mpsc,
41 time::{Duration as TokioDuration, Instant as TokioInstant, Sleep, sleep},
42};
43use tokio_stream::wrappers::UnboundedReceiverStream;
44use uuid::Uuid;
45
/// Panic message passed to every `expect` on the session data `RwLock` in this file.
///
/// The lock can only be poisoned when a thread panicked while holding it, which is treated
/// as a broken invariant rather than a recoverable error.
const SESSION_DATA_POISON_MSG: &str = "session data rwlock should not be poisoned";
47
48#[derive(Debug, Error)]
49pub enum SessionError {
50 #[error(transparent)]
51 AuthenticationError(#[from] AuthenticationError),
52 #[error("Cannot create session: {0}")]
53 IoError(#[from] io::Error),
54 #[error("Session is not connected")]
55 NotConnected,
56 #[error("packet {0} unknown")]
57 Packet(u8),
58}
59
60impl From<SessionError> for Error {
61 fn from(err: SessionError) -> Self {
62 match err {
63 SessionError::AuthenticationError(_) => Error::unauthenticated(err),
64 SessionError::IoError(_) => Error::unavailable(err),
65 SessionError::NotConnected => Error::unavailable(err),
66 SessionError::Packet(_) => Error::unimplemented(err),
67 }
68 }
69}
70
71impl From<quick_xml::encoding::EncodingError> for Error {
72 fn from(err: quick_xml::encoding::EncodingError) -> Self {
73 Error::invalid_argument(err)
74 }
75}
76
/// Key/value attributes of the logged-in user, as string pairs.
pub type UserAttributes = HashMap<String, String>;
78
79#[derive(Debug, Clone, Default)]
80pub struct UserData {
81 pub country: String,
82 pub canonical_username: String,
83 pub attributes: UserAttributes,
84}
85
86#[derive(Debug, Clone, Default)]
87struct SessionData {
88 session_id: String,
89 client_id: String,
90 client_name: String,
91 client_brand_name: String,
92 client_model_name: String,
93 connection_id: String,
94 auth_data: Vec<u8>,
95 time_delta: i64,
96 invalid: bool,
97 user_data: UserData,
98}
99
100struct SessionInternal {
101 config: SessionConfig,
102 data: RwLock<SessionData>,
103
104 http_client: HttpClient,
105 tx_connection: OnceLock<mpsc::UnboundedSender<(u8, Vec<u8>)>>,
106
107 apresolver: OnceLock<ApResolver>,
108 audio_key: OnceLock<AudioKeyManager>,
109 channel: OnceLock<ChannelManager>,
110 mercury: OnceLock<MercuryManager>,
111 dealer: OnceLock<DealerManager>,
112 spclient: OnceLock<SpClient>,
113 token_provider: OnceLock<TokenProvider>,
114 login5: OnceLock<Login5Manager>,
115 cache: Option<Arc<Cache>>,
116
117 handle: tokio::runtime::Handle,
118}
119
120#[derive(Clone)]
130pub struct Session(Arc<SessionInternal>);
131
132impl Session {
133 pub fn new(config: SessionConfig, cache: Option<Cache>) -> Self {
134 let http_client = HttpClient::new(config.proxy.as_ref());
135
136 debug!("new Session");
137
138 let session_data = SessionData {
139 client_id: config.client_id.clone(),
140 session_id: Uuid::new_v4().as_simple().to_string(),
142 ..SessionData::default()
143 };
144
145 Self(Arc::new(SessionInternal {
146 config,
147 data: RwLock::new(session_data),
148 http_client,
149 tx_connection: OnceLock::new(),
150 cache: cache.map(Arc::new),
151 apresolver: OnceLock::new(),
152 audio_key: OnceLock::new(),
153 channel: OnceLock::new(),
154 mercury: OnceLock::new(),
155 dealer: OnceLock::new(),
156 spclient: OnceLock::new(),
157 token_provider: OnceLock::new(),
158 login5: OnceLock::new(),
159 handle: tokio::runtime::Handle::current(),
160 }))
161 }
162
163 async fn connect_inner(
164 &self,
165 access_point: &SocketAddress,
166 credentials: Credentials,
167 ) -> Result<(Credentials, Transport), Error> {
168 const MAX_RETRIES: u8 = 1;
169 let mut transport = connection::connect_with_retry(
170 &access_point.0,
171 access_point.1,
172 self.config().proxy.as_ref(),
173 MAX_RETRIES,
174 )
175 .await?;
176 let mut reusable_credentials = connection::authenticate(
177 &mut transport,
178 credentials.clone(),
179 &self.config().device_id,
180 )
181 .await?;
182
183 if credentials.auth_type == AuthenticationType::AUTHENTICATION_SPOTIFY_TOKEN {
185 trace!(
186 "Reconnect using stored credentials as token authed sessions cannot use keymaster."
187 );
188 transport = connection::connect_with_retry(
189 &access_point.0,
190 access_point.1,
191 self.config().proxy.as_ref(),
192 MAX_RETRIES,
193 )
194 .await?;
195 reusable_credentials = connection::authenticate(
196 &mut transport,
197 reusable_credentials.clone(),
198 &self.config().device_id,
199 )
200 .await?;
201 }
202
203 Ok((reusable_credentials, transport))
204 }
205
206 pub async fn connect(
207 &self,
208 credentials: Credentials,
209 store_credentials: bool,
210 ) -> Result<(), Error> {
211 const MAX_AP_TRIES: u8 = 6;
213 let mut num_ap_tries = 0;
214 let (reusable_credentials, transport) = loop {
215 let ap = self.apresolver().resolve("accesspoint").await?;
216 info!("Connecting to AP \"{}:{}\"", ap.0, ap.1);
217 match self.connect_inner(&ap, credentials.clone()).await {
218 Ok(ct) => break ct,
219 Err(e) => {
220 num_ap_tries += 1;
221 if MAX_AP_TRIES == num_ap_tries {
222 error!("Tried too many access points");
223 return Err(e);
224 }
225 if let Some(AuthenticationError::LoginFailed(ErrorCode::TryAnotherAP)) =
226 e.error.downcast_ref::<AuthenticationError>()
227 {
228 warn!("Instructed to try another access point...");
229 continue;
230 } else if let Some(AuthenticationError::LoginFailed(..)) =
231 e.error.downcast_ref::<AuthenticationError>()
232 {
233 return Err(e);
234 } else {
235 warn!("Try another access point...");
236 continue;
237 }
238 }
239 }
240 };
241
242 let username = reusable_credentials
243 .username
244 .as_ref()
245 .map_or("UNKNOWN", |s| s.as_str());
246 info!("Authenticated as '{username}' !");
247 self.set_username(username);
248 self.set_auth_data(&reusable_credentials.auth_data);
249 if let Some(cache) = self.cache() {
250 if store_credentials {
251 let cred_changed = cache
252 .credentials()
253 .map(|c| c != reusable_credentials)
254 .unwrap_or(true);
255 if cred_changed {
256 cache.save_credentials(&reusable_credentials);
257 }
258 }
259 }
260
261 let (tx_connection, rx_connection) = mpsc::unbounded_channel();
264 self.0
265 .tx_connection
266 .set(tx_connection)
267 .map_err(|_| SessionError::NotConnected)?;
268
269 let (sink, stream) = transport.split();
270 let sender_task = UnboundedReceiverStream::new(rx_connection)
271 .map(Ok)
272 .forward(sink);
273 let session_weak = self.weak();
274 tokio::spawn(async move {
275 if let Err(e) = sender_task.await {
276 error!("{e}");
277 if let Some(session) = session_weak.try_upgrade() {
278 if !session.is_invalid() {
279 session.shutdown();
280 }
281 }
282 }
283 });
284
285 tokio::spawn(DispatchTask::new(self.weak(), stream));
286
287 Ok(())
288 }
289
290 pub fn apresolver(&self) -> &ApResolver {
291 self.0
292 .apresolver
293 .get_or_init(|| ApResolver::new(self.weak()))
294 }
295
296 pub fn audio_key(&self) -> &AudioKeyManager {
297 self.0
298 .audio_key
299 .get_or_init(|| AudioKeyManager::new(self.weak()))
300 }
301
302 pub fn channel(&self) -> &ChannelManager {
303 self.0
304 .channel
305 .get_or_init(|| ChannelManager::new(self.weak()))
306 }
307
308 pub fn http_client(&self) -> &HttpClient {
309 &self.0.http_client
310 }
311
312 pub fn mercury(&self) -> &MercuryManager {
313 self.0
314 .mercury
315 .get_or_init(|| MercuryManager::new(self.weak()))
316 }
317
318 pub fn dealer(&self) -> &DealerManager {
319 self.0
320 .dealer
321 .get_or_init(|| DealerManager::new(self.weak()))
322 }
323
324 pub fn spclient(&self) -> &SpClient {
325 self.0.spclient.get_or_init(|| SpClient::new(self.weak()))
326 }
327
328 pub fn token_provider(&self) -> &TokenProvider {
329 self.0
330 .token_provider
331 .get_or_init(|| TokenProvider::new(self.weak()))
332 }
333
334 pub fn login5(&self) -> &Login5Manager {
335 self.0
336 .login5
337 .get_or_init(|| Login5Manager::new(self.weak()))
338 }
339
340 pub fn time_delta(&self) -> i64 {
341 self.0
342 .data
343 .read()
344 .expect(SESSION_DATA_POISON_MSG)
345 .time_delta
346 }
347
348 pub fn spawn<T>(&self, task: T)
349 where
350 T: Future + Send + 'static,
351 T::Output: Send + 'static,
352 {
353 self.0.handle.spawn(task);
354 }
355
356 fn debug_info(&self) {
357 debug!(
358 "Session strong={} weak={}",
359 Arc::strong_count(&self.0),
360 Arc::weak_count(&self.0)
361 );
362 }
363
364 fn check_catalogue(attributes: &UserAttributes) {
365 if let Some(account_type) = attributes.get("type") {
366 if account_type != "premium" {
367 error!("librespot does not support {account_type:?} accounts.");
368 info!("Please support Spotify and your artists and sign up for a premium account.");
369
370 exit(1);
372 }
373 }
374 }
375
376 pub fn send_packet(&self, cmd: PacketType, data: Vec<u8>) -> Result<(), Error> {
377 match self.0.tx_connection.get() {
378 Some(tx) => Ok(tx.send((cmd as u8, data))?),
379 None => Err(SessionError::NotConnected.into()),
380 }
381 }
382
383 pub fn cache(&self) -> Option<&Arc<Cache>> {
384 self.0.cache.as_ref()
385 }
386
387 pub fn config(&self) -> &SessionConfig {
388 &self.0.config
389 }
390
391 pub fn user_data(&self) -> UserData {
395 self.0
396 .data
397 .read()
398 .expect(SESSION_DATA_POISON_MSG)
399 .user_data
400 .clone()
401 }
402
403 pub fn session_id(&self) -> String {
404 self.0
405 .data
406 .read()
407 .expect(SESSION_DATA_POISON_MSG)
408 .session_id
409 .clone()
410 }
411
412 pub fn set_session_id(&self, session_id: &str) {
413 session_id.clone_into(
414 &mut self
415 .0
416 .data
417 .write()
418 .expect(SESSION_DATA_POISON_MSG)
419 .session_id,
420 );
421 }
422
423 pub fn device_id(&self) -> &str {
424 &self.config().device_id
425 }
426
427 pub fn client_id(&self) -> String {
428 self.0
429 .data
430 .read()
431 .expect(SESSION_DATA_POISON_MSG)
432 .client_id
433 .clone()
434 }
435
436 pub fn set_client_id(&self, client_id: &str) {
437 client_id.clone_into(
438 &mut self
439 .0
440 .data
441 .write()
442 .expect(SESSION_DATA_POISON_MSG)
443 .client_id,
444 );
445 }
446
447 pub fn client_name(&self) -> String {
448 self.0
449 .data
450 .read()
451 .expect(SESSION_DATA_POISON_MSG)
452 .client_name
453 .clone()
454 }
455
456 pub fn set_client_name(&self, client_name: &str) {
457 client_name.clone_into(
458 &mut self
459 .0
460 .data
461 .write()
462 .expect(SESSION_DATA_POISON_MSG)
463 .client_name,
464 );
465 }
466
467 pub fn client_brand_name(&self) -> String {
468 self.0
469 .data
470 .read()
471 .expect(SESSION_DATA_POISON_MSG)
472 .client_brand_name
473 .clone()
474 }
475
476 pub fn set_client_brand_name(&self, client_brand_name: &str) {
477 client_brand_name.clone_into(
478 &mut self
479 .0
480 .data
481 .write()
482 .expect(SESSION_DATA_POISON_MSG)
483 .client_brand_name,
484 );
485 }
486
487 pub fn client_model_name(&self) -> String {
488 self.0
489 .data
490 .read()
491 .expect(SESSION_DATA_POISON_MSG)
492 .client_model_name
493 .clone()
494 }
495
496 pub fn set_client_model_name(&self, client_model_name: &str) {
497 client_model_name.clone_into(
498 &mut self
499 .0
500 .data
501 .write()
502 .expect(SESSION_DATA_POISON_MSG)
503 .client_model_name,
504 );
505 }
506
507 pub fn connection_id(&self) -> String {
508 self.0
509 .data
510 .read()
511 .expect(SESSION_DATA_POISON_MSG)
512 .connection_id
513 .clone()
514 }
515
516 pub fn set_connection_id(&self, connection_id: &str) {
517 connection_id.clone_into(
518 &mut self
519 .0
520 .data
521 .write()
522 .expect(SESSION_DATA_POISON_MSG)
523 .connection_id,
524 );
525 }
526
527 pub fn username(&self) -> String {
528 self.0
529 .data
530 .read()
531 .expect(SESSION_DATA_POISON_MSG)
532 .user_data
533 .canonical_username
534 .clone()
535 }
536
537 pub fn set_username(&self, username: &str) {
538 username.clone_into(
539 &mut self
540 .0
541 .data
542 .write()
543 .expect(SESSION_DATA_POISON_MSG)
544 .user_data
545 .canonical_username,
546 );
547 }
548
549 pub fn auth_data(&self) -> Vec<u8> {
550 self.0
551 .data
552 .read()
553 .expect(SESSION_DATA_POISON_MSG)
554 .auth_data
555 .clone()
556 }
557
558 pub fn set_auth_data(&self, auth_data: &[u8]) {
559 auth_data.clone_into(
560 &mut self
561 .0
562 .data
563 .write()
564 .expect(SESSION_DATA_POISON_MSG)
565 .auth_data,
566 );
567 }
568
569 pub fn country(&self) -> String {
570 self.0
571 .data
572 .read()
573 .expect(SESSION_DATA_POISON_MSG)
574 .user_data
575 .country
576 .clone()
577 }
578
579 pub fn filter_explicit_content(&self) -> bool {
580 match self.get_user_attribute("filter-explicit-content") {
581 Some(value) => matches!(&*value, "1"),
582 None => false,
583 }
584 }
585
586 pub fn autoplay(&self) -> bool {
587 if let Some(overide) = self.config().autoplay {
588 return overide;
589 }
590
591 match self.get_user_attribute("autoplay") {
592 Some(value) => matches!(&*value, "1"),
593 None => false,
594 }
595 }
596
597 pub fn set_user_attribute(&self, key: &str, value: &str) -> Option<String> {
598 let mut dummy_attributes = UserAttributes::new();
599 dummy_attributes.insert(key.to_owned(), value.to_owned());
600 Self::check_catalogue(&dummy_attributes);
601
602 self.0
603 .data
604 .write()
605 .expect(SESSION_DATA_POISON_MSG)
606 .user_data
607 .attributes
608 .insert(key.to_owned(), value.to_owned())
609 }
610
611 pub fn set_user_attributes(&self, attributes: UserAttributes) {
612 Self::check_catalogue(&attributes);
613
614 self.0
615 .data
616 .write()
617 .expect(SESSION_DATA_POISON_MSG)
618 .user_data
619 .attributes
620 .extend(attributes)
621 }
622
623 pub fn get_user_attribute(&self, key: &str) -> Option<String> {
624 self.0
625 .data
626 .read()
627 .expect(SESSION_DATA_POISON_MSG)
628 .user_data
629 .attributes
630 .get(key)
631 .cloned()
632 }
633
634 fn weak(&self) -> SessionWeak {
635 SessionWeak(Arc::downgrade(&self.0))
636 }
637
638 pub fn shutdown(&self) {
639 debug!("Shutdown: Invalidating session");
640 self.0.data.write().expect(SESSION_DATA_POISON_MSG).invalid = true;
641 self.mercury().shutdown();
642 self.channel().shutdown();
643 }
644
645 pub fn is_invalid(&self) -> bool {
646 self.0.data.read().expect(SESSION_DATA_POISON_MSG).invalid
647 }
648}
649
650#[derive(Clone)]
651pub struct SessionWeak(Weak<SessionInternal>);
652
653impl SessionWeak {
654 fn try_upgrade(&self) -> Option<Session> {
655 self.0.upgrade().map(Session)
656 }
657
658 pub(crate) fn upgrade(&self) -> Session {
659 self.try_upgrade()
660 .expect("session was dropped and so should have this component")
661 }
662}
663
664impl Drop for SessionInternal {
665 fn drop(&mut self) {
666 debug!("drop Session");
667 }
668}
669
/// Position of the dispatcher within the Ping / Pong / PongAck keep-alive exchange.
#[derive(Clone, Copy, Default, Debug, PartialEq)]
enum KeepAliveState {
    /// Waiting for the server to send a `Ping`.
    #[default]
    ExpectingPing,

    /// A `Ping` was received; the `Pong` reply is sent once the timer elapses.
    PendingPong,

    /// The `Pong` was sent; waiting for the server's `PongAck`.
    ExpectingPongAck,
}
682
683const INITIAL_PING_TIMEOUT: TokioDuration = TokioDuration::from_secs(20);
684const PING_TIMEOUT: TokioDuration = TokioDuration::from_secs(80); const PONG_DELAY: TokioDuration = TokioDuration::from_secs(60);
686const PONG_ACK_TIMEOUT: TokioDuration = TokioDuration::from_secs(20);
687
688impl KeepAliveState {
689 fn debug(&self, sleep: &Sleep) {
690 let delay = sleep
691 .deadline()
692 .checked_duration_since(TokioInstant::now())
693 .map(|t| t.as_secs_f64())
694 .unwrap_or(f64::INFINITY);
695
696 trace!("keep-alive state: {self:?}, timeout in {delay:.1}");
697 }
698}
699
700pin_project! {
701 struct DispatchTask<S>
702 where
703 S: TryStream<Ok = (u8, Bytes)>
704 {
705 session: SessionWeak,
706 keep_alive_state: KeepAliveState,
707 #[pin]
708 stream: S,
709 #[pin]
710 timeout: Sleep,
711 }
712
713 impl<S> PinnedDrop for DispatchTask<S>
714 where
715 S: TryStream<Ok = (u8, Bytes)>
716 {
717 fn drop(_this: Pin<&mut Self>) {
718 debug!("drop Dispatch");
719 }
720 }
721}
722
723impl<S> DispatchTask<S>
724where
725 S: TryStream<Ok = (u8, Bytes)>,
726{
727 fn new(session: SessionWeak, stream: S) -> Self {
728 Self {
729 session,
730 keep_alive_state: KeepAliveState::ExpectingPing,
731 stream,
732 timeout: sleep(INITIAL_PING_TIMEOUT),
733 }
734 }
735
736 fn dispatch(
737 mut self: Pin<&mut Self>,
738 session: &Session,
739 cmd: u8,
740 data: Bytes,
741 ) -> Result<(), Error> {
742 use KeepAliveState::*;
743 use PacketType::*;
744
745 let packet_type = FromPrimitive::from_u8(cmd);
746 let cmd = match packet_type {
747 Some(cmd) => cmd,
748 None => {
749 trace!("Ignoring unknown packet {cmd:x}");
750 return Err(SessionError::Packet(cmd).into());
751 }
752 };
753
754 match packet_type {
755 Some(Ping) => {
756 trace!("Received Ping");
757 if self.keep_alive_state != ExpectingPing {
758 warn!("Received unexpected Ping from server")
759 }
760 let mut this = self.as_mut().project();
761 *this.keep_alive_state = PendingPong;
762 this.timeout
763 .as_mut()
764 .reset(TokioInstant::now() + PONG_DELAY);
765 this.keep_alive_state.debug(&this.timeout);
766
767 let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64;
768 let timestamp = SystemTime::now()
769 .duration_since(UNIX_EPOCH)
770 .unwrap_or(Duration::ZERO)
771 .as_secs() as i64;
772 {
773 let mut data = session.0.data.write().expect(SESSION_DATA_POISON_MSG);
774 data.time_delta = server_timestamp.saturating_sub(timestamp);
775 }
776
777 session.debug_info();
778
779 Ok(())
780 }
781 Some(PongAck) => {
782 trace!("Received PongAck");
783 if self.keep_alive_state != ExpectingPongAck {
784 warn!("Received unexpected PongAck from server")
785 }
786 let mut this = self.as_mut().project();
787 *this.keep_alive_state = ExpectingPing;
788 this.timeout
789 .as_mut()
790 .reset(TokioInstant::now() + PING_TIMEOUT);
791 this.keep_alive_state.debug(&this.timeout);
792
793 Ok(())
794 }
795 Some(CountryCode) => {
796 let country = String::from_utf8(data.as_ref().to_owned())?;
797 info!("Country: {country:?}");
798 session
799 .0
800 .data
801 .write()
802 .expect(SESSION_DATA_POISON_MSG)
803 .user_data
804 .country = country;
805 Ok(())
806 }
807 Some(StreamChunkRes) | Some(ChannelError) => session.channel().dispatch(cmd, data),
808 Some(AesKey) | Some(AesKeyError) => session.audio_key().dispatch(cmd, data),
809 Some(MercuryReq) | Some(MercurySub) | Some(MercuryUnsub) | Some(MercuryEvent) => {
810 session.mercury().dispatch(cmd, data)
811 }
812 Some(ProductInfo) => {
813 let data = std::str::from_utf8(&data)?;
814 let mut reader = quick_xml::Reader::from_str(data);
815
816 let mut buf = Vec::new();
817 let mut current_element = String::new();
818 let mut user_attributes: UserAttributes = HashMap::new();
819
820 loop {
821 match reader.read_event_into(&mut buf) {
822 Ok(Event::Start(ref element)) => {
823 std::str::from_utf8(element)?.clone_into(&mut current_element)
824 }
825 Ok(Event::End(_)) => {
826 current_element = String::new();
827 }
828 Ok(Event::Text(ref value)) => {
829 if !current_element.is_empty() {
830 let _ = user_attributes.insert(
831 current_element.clone(),
832 value.xml_content()?.to_string(),
833 );
834 }
835 }
836 Ok(Event::Eof) => break,
837 Ok(_) => (),
838 Err(e) => warn!(
839 "Error parsing XML at position {}: {:?}",
840 reader.buffer_position(),
841 e
842 ),
843 }
844 }
845
846 trace!("Received product info: {user_attributes:#?}");
847 Session::check_catalogue(&user_attributes);
848
849 session
850 .0
851 .data
852 .write()
853 .expect(SESSION_DATA_POISON_MSG)
854 .user_data
855 .attributes = user_attributes;
856 Ok(())
857 }
858 Some(SecretBlock)
859 | Some(LegacyWelcome)
860 | Some(UnknownDataAllZeros)
861 | Some(LicenseVersion) => Ok(()),
862 _ => {
863 trace!("Ignoring {cmd:?} packet with data {data:#?}");
864 Err(SessionError::Packet(cmd as u8).into())
865 }
866 }
867 }
868}
869
870impl<S> Future for DispatchTask<S>
871where
872 S: TryStream<Ok = (u8, Bytes), Error = std::io::Error>,
873 <S as TryStream>::Ok: std::fmt::Debug,
874{
875 type Output = Result<(), S::Error>;
876
877 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
878 use KeepAliveState::*;
879
880 let session = match self.session.try_upgrade() {
881 Some(session) => session,
882 None => return Poll::Ready(Ok(())),
883 };
884
885 loop {
887 match self.as_mut().project().stream.try_poll_next(cx) {
888 Poll::Ready(Some(Ok((cmd, data)))) => {
889 let result = self.as_mut().dispatch(&session, cmd, data);
890 if let Err(e) = result {
891 debug!("could not dispatch command: {e}");
892 }
893 }
894 Poll::Ready(None) => {
895 warn!("Connection to server closed.");
896 session.shutdown();
897 return Poll::Ready(Ok(()));
898 }
899 Poll::Ready(Some(Err(e))) => {
900 error!("Connection to server closed.");
901 session.shutdown();
902 return Poll::Ready(Err(e));
903 }
904 Poll::Pending => break,
905 }
906 }
907
908 let mut this = self.as_mut().project();
926 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
927 match this.keep_alive_state {
928 ExpectingPing | ExpectingPongAck => {
929 if !session.is_invalid() {
930 session.shutdown();
931 }
932 return Poll::Ready(Err(io::Error::new(
934 io::ErrorKind::TimedOut,
935 format!(
936 "session lost connection to server ({:?})",
937 this.keep_alive_state
938 ),
939 )));
940 }
941 PendingPong => {
942 trace!("Sending Pong");
943 let _ = session.send_packet(PacketType::Pong, vec![0, 0, 0, 0]);
946 *this.keep_alive_state = ExpectingPongAck;
947 this.timeout
948 .as_mut()
949 .reset(TokioInstant::now() + PONG_ACK_TIMEOUT);
950 this.keep_alive_state.debug(&this.timeout);
951 }
952 }
953 }
954
955 Poll::Pending
956 }
957}