1use std::{
4 collections::{HashMap, VecDeque},
5 pin::Pin,
6 sync::Arc,
7 time::Duration,
8};
9
10use chrono::{DateTime, Utc};
11use prost::Message;
12use steam_auth::{CredentialsDetails, EAuthSessionGuardType, EAuthTokenPlatformType, LoginSession, LoginSessionOptions};
13use steam_enums::{EMsg, EPersonaState, EResult};
14use steam_protos::{CMsgClientHello, CMsgClientLogOff, CMsgClientLogon, CMsgClientLogonResponse, PROTOCOL_VERSION};
15use steamid::SteamID;
16use tracing::{debug, error, info, info_span, warn, Instrument};
17
18use crate::{
19 connection::{CmServer, CmServerProvider, HttpCmServerProvider, SteamConnection, WebSocketConnection},
20 error::SteamError,
21 options::SteamOptions,
22 protocol::SteamMessage,
23 types::{AccountInfo, EmailInfo, Limitations, LogOnDetails, LogOnResponse, VacStatus, WalletInfo},
24};
25
/// Snapshot of what is known about one Steam user's persona.
///
/// Most fields are optional; the `Default` impl produces an "offline, nothing
/// known" value and [`UserPersona::merge`] layers a newer snapshot on top of an
/// existing one.
#[derive(Debug, Clone)]
pub struct UserPersona {
    /// SteamID of the user this persona describes. Not touched by `merge`.
    pub steam_id: SteamID,
    /// Display name. `merge` keeps the existing name when the incoming one is empty.
    pub player_name: String,
    /// Persona state; defaults to `EPersonaState::Offline` and is always overwritten by `merge`.
    pub persona_state: EPersonaState,
    /// Raw persona state flag bits; `merge` ignores an incoming value of 0.
    pub persona_state_flags: u32,
    /// Avatar hash used by [`UserPersona::avatar_url`] to build the CDN URL.
    pub avatar_hash: Option<String>,
    /// Name of the game being played, when known.
    pub game_name: Option<String>,
    /// Id of the game being played; `merge` ignores an incoming `Some(0)`.
    pub game_id: Option<u64>,
    /// Last logon time, when known.
    pub last_logon: Option<DateTime<Utc>>,
    /// Last logoff time, when known.
    pub last_logoff: Option<DateTime<Utc>>,
    /// Last time the user was seen online, when known.
    pub last_seen_online: Option<DateTime<Utc>>,
    /// Rich-presence key/value pairs; `merge` inserts or overwrites per key and never removes keys.
    pub rich_presence: HashMap<String, String>,
    /// NOTE(review): presumably the `steam_player_group` rich-presence value — confirm against the producer.
    pub steam_player_group: Option<String>,
    /// NOTE(review): presumably the rich-presence `status` string — confirm against the producer.
    pub rich_presence_status: Option<String>,
    /// Current game map, when reported.
    pub game_map: Option<String>,
    /// Current game score, when reported.
    pub game_score: Option<String>,
    /// Player count, when reported.
    pub num_players: Option<u32>,
    /// Unread chat message count; `merge` ignores an incoming value of 0.
    pub unread_count: u32,
    /// Time of the last chat message (presumably Unix seconds — TODO confirm); `merge` ignores 0.
    pub last_message_time: u32,
}
53
54impl Default for UserPersona {
55 fn default() -> Self {
56 Self {
57 steam_id: SteamID::default(),
58 player_name: String::new(),
59 persona_state: EPersonaState::Offline,
60 persona_state_flags: 0,
61 avatar_hash: None,
62 game_name: None,
63 game_id: None,
64 last_logon: None,
65 last_logoff: None,
66 last_seen_online: None,
67 rich_presence: HashMap::new(),
68 steam_player_group: None,
69 rich_presence_status: None,
70 game_map: None,
71 game_score: None,
72 num_players: None,
73 unread_count: 0,
74 last_message_time: 0,
75 }
76 }
77}
78
79impl UserPersona {
80 pub fn avatar_url(&self, size: &str) -> Option<String> {
84 let hash = self.avatar_hash.as_ref()?;
85 let size_suffix = match size {
86 "medium" => "_medium.jpg",
87 "full" => "_full.jpg",
88 _ => ".jpg",
89 };
90 Some(format!("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{}/{}{}", &hash[0..2], hash, size_suffix))
91 }
92
93 pub fn merge(&mut self, other: &UserPersona) {
96 if !other.player_name.is_empty() {
97 self.player_name = other.player_name.clone();
98 }
99 self.persona_state = other.persona_state;
100 if other.persona_state_flags != 0 {
101 self.persona_state_flags = other.persona_state_flags;
102 }
103 if other.avatar_hash.is_some() {
104 self.avatar_hash = other.avatar_hash.clone();
105 }
106 if other.game_name.is_some() {
107 self.game_name = other.game_name.clone();
108 }
109 if other.game_id.is_some() && other.game_id != Some(0) {
110 self.game_id = other.game_id;
111 }
112 if other.last_logon.is_some() {
113 self.last_logon = other.last_logon;
114 }
115 if other.last_logoff.is_some() {
116 self.last_logoff = other.last_logoff;
117 }
118 if other.last_seen_online.is_some() {
119 self.last_seen_online = other.last_seen_online;
120 }
121 for (k, v) in &other.rich_presence {
122 self.rich_presence.insert(k.clone(), v.clone());
123 }
124 if other.steam_player_group.is_some() {
125 self.steam_player_group = other.steam_player_group.clone();
126 }
127 if other.rich_presence_status.is_some() {
128 self.rich_presence_status = other.rich_presence_status.clone();
129 }
130 if other.game_map.is_some() {
131 self.game_map = other.game_map.clone();
132 }
133 if other.game_score.is_some() {
134 self.game_score = other.game_score.clone();
135 }
136 if other.unread_count > 0 {
139 self.unread_count = other.unread_count;
140 }
141 if other.last_message_time > 0 {
142 self.last_message_time = other.last_message_time;
143 }
144 }
145}
146
147use std::time::Instant;
148
149use bytes::Bytes;
150use tokio::sync::{mpsc, oneshot};
151
/// Kind of work associated with a background job id
/// (stored as the value type of `SteamClient::background_job_tasks`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BackgroundTask {
    /// NOTE(review): presumably a friends-list fetch — confirm against the code that enqueues it.
    FriendsList,
    /// NOTE(review): presumably an offline-message fetch for the given user — confirm against the producer.
    OfflineMessages(SteamID),
}
158
/// A chat message waiting to be sent, together with the channel on which the
/// outcome is reported back to the caller.
#[derive(Debug)]
pub(crate) struct QueuedMessage {
    /// Recipient of the message.
    pub friend: SteamID,
    /// Message text as supplied by the caller (before any BBCode escaping).
    pub message: String,
    /// Chat entry type forwarded in the send request.
    pub entry_type: steam_enums::EChatEntryType,
    /// Forwarded as the request's `contains_bbcode` flag; when set, `[` is escaped before sending.
    pub contains_bbcode: bool,
    /// One-shot channel that receives the send result or the error.
    pub respond_to: oneshot::Sender<Result<crate::services::chat::SendMessageResult, SteamError>>,
}
168
/// Maximum number of receive iterations `wait_for_logon_response` performs
/// before giving up with `SteamError::Timeout`.
const LOGON_TIMEOUT_ITERATIONS: usize = 150;
/// Pause, in milliseconds, after each receive iteration while waiting for the logon response.
const LOGON_TIMEOUT_DELAY_MS: u64 = 100;
/// Heartbeat interval, in seconds, the heartbeat manager is created with;
/// a positive `heartbeat_seconds` in the logon response overrides it.
const HEARTBEAT_INTERVAL_SECONDS: u64 = 30;
175
/// Stateful Steam client: owns the CM connection, the login session and all
/// per-account state learned after logging on.
pub struct SteamClient {
    /// Options the client was constructed with.
    pub options: SteamOptions,

    /// SteamID of the logged-on account; `Some` only between a successful logon and `log_off`.
    pub steam_id: Option<SteamID>,

    /// Public IPv4 address reported in the logon response, formatted as a dotted quad.
    pub public_ip: Option<String>,

    /// Cell id from the logon response; also passed into the next logon message.
    pub cell_id: Option<u32>,

    /// Vanity URL from the logon response.
    pub vanity_url: Option<String>,

    /// Account information; starts as `None` and is cleared by `log_off`.
    pub account_info: Option<AccountInfo>,

    /// Email information; starts as `None` and is cleared by `log_off`.
    pub email_info: Option<EmailInfo>,

    /// Account limitations; starts as `None` and is cleared by `log_off`.
    pub limitations: Option<Limitations>,

    /// VAC status; starts as `None` and is cleared by `log_off`.
    pub vac: Option<VacStatus>,

    /// Wallet information; starts as `None` and is cleared by `log_off`.
    pub wallet: Option<WalletInfo>,

    /// Friends keyed by SteamID (value presumably the relationship code — TODO confirm); cleared by `log_off`.
    pub my_friends: HashMap<SteamID, u32>,

    /// Known personas keyed by SteamID.
    pub users: HashMap<SteamID, UserPersona>,

    /// App info keyed by app id.
    pub apps: HashMap<u32, super::events::AppInfoData>,

    /// License entries for the account.
    pub licenses: Vec<super::events::LicenseEntry>,

    /// NOTE(review): presumably app ids with an outstanding info request — confirm.
    pub pending_apps: rustc_hash::FxHashSet<u32>,

    /// Per-friend `u32` value (presumably a last-viewed timestamp — TODO confirm).
    pub chat_last_view: HashMap<SteamID, u32>,

    /// Playing-blocked flag; starts as `false`.
    pub playing_blocked: bool,
    /// App ids currently reported as being played; starts empty.
    pub playing_app_ids: Vec<u32>,

    /// Connect time; starts at 0.
    pub connect_time: u64,
    /// Connection counter; starts at 0.
    pub connection_count: u32,

    /// App-auth sequence counter (local side, judging by the name); starts at 0.
    pub auth_seq_me: u32,
    /// App-auth sequence counter (remote side, judging by the name); starts at 0.
    pub auth_seq_them: u32,
    /// Pipe handle, initialised to a random value in `1..=1_000_000`.
    pub h_steam_pipe: u32,
    /// Raw GC tokens; starts empty.
    pub gc_tokens: Vec<Vec<u8>>,
    /// Auth session tickets currently held; starts empty.
    pub active_tickets: Vec<crate::services::appauth::AuthSessionTicket>,

    /// Time of the last party registration, if any.
    pub last_time_party_register: Option<i64>,

    /// Sending half of the unbounded outgoing chat queue.
    pub(crate) chat_queue_tx: mpsc::UnboundedSender<QueuedMessage>,
    /// Receiving half of the unbounded outgoing chat queue.
    pub(crate) chat_queue_rx: mpsc::UnboundedReceiver<QueuedMessage>,
    /// NOTE(review): presumably the instant until which chat sending is paused — confirm.
    pub(crate) rate_limit_until: Option<Instant>,
    /// NOTE(review): presumably a message held back for a retry — confirm.
    pub(crate) pending_retry_message: Option<QueuedMessage>,
    /// In-flight chat sends; each task yields the original message and its result.
    pub(crate) chat_tasks: tokio::task::JoinSet<(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>)>,

    /// Active CM connection; set by the logon flow and taken by `log_off`.
    pub(crate) connection: Option<Box<dyn SteamConnection>>,
    /// Login session kept for Steam Guard submission and web-cookie retrieval.
    pub(crate) login_session: Option<LoginSession>,
    /// Copy of the details of the most recent logon attempt; cleared by `log_off`.
    pub(crate) logon_details: Option<LogOnDetails>,
    /// Session id placed in outgoing headers; randomised per logon, 0 after `log_off`.
    pub(crate) session_id: i32,
    /// `true` while a `log_on` call is in progress.
    pub(crate) connecting: bool,
    /// `true` while `log_off` is running.
    pub(crate) logging_off: bool,
    /// Provisional SteamID used in message headers built by `create_proto_message`.
    pub(crate) temp_steam_id: Option<SteamID>,
    /// Events waiting to be handed out by `poll_event`.
    pub(crate) event_queue: VecDeque<super::events::SteamEvent>,

    /// Reconnect bookkeeping, configured from `options.reconnect`.
    pub(crate) reconnect_manager: crate::internal::reconnect::ReconnectManager,
    /// Heartbeat timer; its interval may be overridden by the logon response.
    pub(crate) heartbeat_manager: crate::internal::heartbeat::HeartbeatManager,
    /// Relogging flag; starts as `false`.
    pub(crate) relogging: bool,
    /// Last CM server remembered by the client, if any.
    pub(crate) last_cm_server: Option<CmServer>,

    /// Tracks request/response jobs created by `send_message_with_job`.
    pub(crate) job_manager: crate::internal::jobs::JobManager,

    /// Job tracking for game-coordinator requests.
    pub(crate) gc_jobs: crate::services::gc::GCJobManager,

    /// Background task kind per job id.
    pub(crate) background_job_tasks: HashMap<u64, BackgroundTask>,

    /// Sending half of the bounded (capacity 100) background-job result channel.
    pub(crate) background_job_results_tx: mpsc::Sender<(u64, Result<Bytes, SteamError>)>,
    /// Receiving half of the background-job result channel.
    pub(crate) background_job_results_rx: mpsc::Receiver<(u64, Result<Bytes, SteamError>)>,

    /// Persona cache.
    pub persona_cache: crate::cache::PersonaCache,

    /// Session recovery service state.
    pub session_recovery: crate::services::SessionRecovery,

    /// Injected HTTP client (also handed to the CM server provider).
    pub(crate) http_client: Arc<dyn crate::utils::http::HttpClient>,
    /// Injected clock used for sleeps.
    pub(crate) clock: Arc<dyn crate::utils::clock::Clock>,
    /// Injected random number source.
    pub(crate) rng: Arc<dyn crate::utils::rng::Rng>,
}
292
293impl SteamClient {
294 pub fn builder() -> super::builder::SteamClientBuilder {
316 super::builder::SteamClientBuilder::new()
317 }
318
319 pub fn new(options: SteamOptions) -> Self {
321 Self::builder().with_options(options).build()
322 }
323
324 pub(crate) fn with_all_providers(options: SteamOptions, http_client: Arc<dyn crate::utils::http::HttpClient>, clock: Arc<dyn crate::utils::clock::Clock>, rng: Arc<dyn crate::utils::rng::Rng>) -> Self {
329 let reconnect_config = options.reconnect.clone();
330 let (chat_queue_tx, chat_queue_rx) = mpsc::unbounded_channel();
331 let (bg_tx, bg_rx) = mpsc::channel(100);
332
333 Self {
334 options,
335 steam_id: None,
336 public_ip: None,
337 cell_id: None,
338 vanity_url: None,
339 account_info: None,
340 email_info: None,
341 limitations: None,
342 vac: None,
343 wallet: None,
344 my_friends: HashMap::new(),
345 users: HashMap::new(),
346 apps: HashMap::new(),
347 licenses: Vec::new(),
348 pending_apps: rustc_hash::FxHashSet::default(),
349 chat_last_view: HashMap::new(),
350 playing_blocked: false,
351 playing_app_ids: Vec::new(),
352 connect_time: 0,
353 connection_count: 0,
354 auth_seq_me: 0,
355 auth_seq_them: 0,
356 h_steam_pipe: (rng.gen_u32() % 1000000) + 1,
357 gc_tokens: Vec::new(),
358 active_tickets: Vec::new(),
359 last_time_party_register: None,
360 chat_queue_tx,
361 chat_queue_rx,
362 rate_limit_until: None,
363 pending_retry_message: None,
364 chat_tasks: tokio::task::JoinSet::new(),
365 connection: None,
366 login_session: None,
367 logon_details: None,
368 session_id: 0,
369 connecting: false,
370 logging_off: false,
371 temp_steam_id: None,
372 event_queue: VecDeque::new(),
373 reconnect_manager: crate::internal::reconnect::ReconnectManager::new(reconnect_config),
374 heartbeat_manager: crate::internal::heartbeat::HeartbeatManager::new(HEARTBEAT_INTERVAL_SECONDS),
375 relogging: false,
376 last_cm_server: None,
377 job_manager: crate::internal::jobs::JobManager::new(),
378 gc_jobs: crate::services::gc::GCJobManager::new(),
379 background_job_tasks: HashMap::new(),
380 background_job_results_tx: bg_tx,
381 background_job_results_rx: bg_rx,
382 persona_cache: crate::cache::PersonaCache::default(),
383 session_recovery: crate::services::SessionRecovery::new(),
384 http_client,
385 clock,
386 rng,
387 }
388 }
389
390 pub async fn log_on(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
392 if self.steam_id.is_some() {
393 return Err(SteamError::AlreadyLoggedOn);
394 }
395
396 if self.connecting {
397 return Err(SteamError::AlreadyConnecting);
398 }
399
400 self.connecting = true;
401
402 match self.execute_logon(details).await {
403 Ok(response) => Ok(response),
404 Err(e) => {
405 match &e {
406 SteamError::SteamResult(steam_enums::EResult::AccountLoginDeniedThrottle) => {
407 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
408 }
409 SteamError::SteamResult(steam_enums::EResult::ServiceUnavailable) => {
410 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "ServiceUnavailable");
411 }
412 SteamError::SteamResult(steam_enums::EResult::TryAnotherCM) => {
413 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "TryAnotherCM");
414 }
415 _ => {}
416 }
417 self.connecting = false;
418 Err(e)
419 }
420 }
421 }
422
423 async fn execute_logon(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
424 self.logging_off = false;
425
426 info!("Starting Steam login...");
427 crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::CMConnection).await;
428
429 self.logon_details = Some(details.clone());
431
432 let is_web_logon = details.web_logon_token.is_some() && details.steam_id.is_some();
434 let is_anonymous = !is_web_logon && details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none() && !is_web_logon);
435
436 let mut temp_sid = SteamID::new();
438 temp_sid.universe = steamid::Universe::Public;
439
440 if is_web_logon {
441 info!("Logging in with web logon token");
443 if let Some(sid) = details.steam_id {
444 temp_sid = sid;
445 }
446 temp_sid.account_type = steamid::AccountType::Individual;
447 } else if is_anonymous {
448 info!("Logging in anonymously");
449 temp_sid.account_type = steamid::AccountType::AnonUser;
450 } else if let Some(ref token) = details.refresh_token {
451 info!("Logging in with refresh token");
452 match LoginSession::from_refresh_token(token.clone()) {
454 Ok(session) => {
455 if let Some(sid) = session.steam_id().cloned() {
456 temp_sid = sid;
457 }
458 self.login_session = Some(session);
459 }
460 Err(e) => {
461 return Err(SteamError::InvalidToken(e.to_string()));
462 }
463 }
464 } else if details.account_name.is_some() && details.password.is_some() {
465 info!("Logging in with account name and password");
466 temp_sid.account_type = steamid::AccountType::Individual;
467 }
468
469 self.temp_steam_id = Some(temp_sid);
470
471 let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
473 let server = provider.get_server().await?;
474 info!("Connecting to CM server: {}", server.endpoint);
475 let start_time = Instant::now();
476
477 let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
479 debug!("WebSocket connection established in {}ms", start_time.elapsed().as_millis());
480
481 self.session_id = self.rng.gen_i32().abs();
483
484 let skip_hello = details.refresh_token.is_some() || details.web_logon_token.is_some();
487 if !skip_hello {
488 let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
489 let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
490 connection.send(hello_msg.encode()).await?;
491 debug!("Sent ClientHello");
492 }
493
494 let logon = self.build_logon_message(&details, is_anonymous);
496 let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
497 connection.send(logon_msg.encode()).await?;
498 debug!("Sent ClientLogon");
499
500 let response = self.wait_for_logon_response(&mut *connection).await?;
502
503 self.connection = Some(connection);
504 self.connecting = false;
505
506 let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
508
509 if eresult != EResult::OK {
510 self.steam_id = None;
511 return Err(SteamError::SteamResult(eresult));
512 }
513
514 let steam_id = if let Some(sid) = response.client_supplied_steamid { SteamID::from(sid) } else { self.temp_steam_id.unwrap_or_default() };
516
517 self.steam_id = Some(steam_id);
518 self.cell_id = response.cell_id;
519 self.vanity_url = response.vanity_url.clone();
520
521 if let Some(ref ip) = response.public_ip {
522 if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
523 self.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
524 }
525 }
526
527 if let Some(secs) = response.heartbeat_seconds {
529 if secs > 0 {
530 self.heartbeat_manager.set_interval(secs as u64);
531 }
532 }
533 self.heartbeat_manager.reset();
534
535 let username = details.account_name.as_deref().unwrap_or("Unknown");
536 info!("Logged on as steam_id: {}, name: {}, username: {}", steam_id.steam_id64(), "Unknown", username);
537
538 Ok(LogOnResponse {
539 eresult,
540 steam_id,
541 public_ip: self.public_ip.clone(),
542 cell_id: self.cell_id.unwrap_or(0),
543 vanity_url: self.vanity_url.clone(),
544 email_domain: response.email_domain.clone(),
545 steam_guard_required: false,
546 heartbeat_seconds: response.heartbeat_seconds,
547 server_time: response.rtime32_server_time,
548 account_flags: response.account_flags,
549 user_country: response.user_country.clone(),
550 ip_country_code: response.ip_country_code.clone(),
551 client_instance_id: response.client_instance_id,
552 token_id: response.token_id,
553 family_group_id: response.family_group_id,
554 eresult_extended: response.eresult_extended,
555 cell_id_ping_threshold: response.cell_id_ping_threshold,
556 force_client_update_check: response.force_client_update_check,
557 agreement_session_url: response.agreement_session_url.clone(),
558 legacy_out_of_game_heartbeat_seconds: response.legacy_out_of_game_heartbeat_seconds,
559 parental_settings: response.parental_settings.clone(),
560 parental_setting_signature: response.parental_setting_signature.clone(),
561 count_loginfailures_to_migrate: response.count_loginfailures_to_migrate,
562 count_disconnects_to_migrate: response.count_disconnects_to_migrate,
563 ogs_data_report_time_window: response.ogs_data_report_time_window,
564 steam2_ticket: response.steam2_ticket.clone(),
565 })
566 }
567
568 fn build_logon_message(&self, details: &LogOnDetails, is_anonymous: bool) -> CMsgClientLogon {
570 use crate::protocol::messages::{build_client_logon, LogonConfig};
571
572 let machine_name = if details.machine_name.is_some() {
574 details.machine_name.clone()
575 } else if !is_anonymous {
576 Some(format!("DESKTOP-{:06}", self.rng.gen_u32() % 1000000))
577 } else {
578 None
579 };
580
581 let identifier = if let Some(ref name) = details.account_name { Some(name.clone()) } else { self.temp_steam_id.as_ref().map(|sid| sid.steam_id64().to_string()) };
582
583 let config = LogonConfig::new().with_cell_id(self.cell_id).with_machine_name(machine_name).with_identifier(identifier);
585
586 build_client_logon(&config, details, is_anonymous)
588 }
589
590 async fn wait_for_logon_response(&self, connection: &mut dyn SteamConnection) -> Result<CMsgClientLogonResponse, SteamError> {
592 use std::io::Read;
593
594 use flate2::read::GzDecoder;
595
596 for _ in 0..LOGON_TIMEOUT_ITERATIONS {
599 match connection.recv().await? {
600 Some(data) => {
601 let msg = SteamMessage::decode_from_bytes(&data)?;
602
603 debug!("Received message: {:?}", msg.msg);
604
605 if msg.msg == EMsg::ClientLogOnResponse {
606 return msg.decode_body::<CMsgClientLogonResponse>();
607 }
608
609 if msg.msg == EMsg::Multi {
611 if let Ok(multi) = msg.decode_body::<steam_protos::CMsgMulti>() {
612 let body = multi.message_body.unwrap_or_default();
613 let payload = if multi.size_unzipped.unwrap_or(0) > 0 {
614 let decompressed_result = tokio::task::spawn_blocking(move || {
617 let mut decoder = GzDecoder::new(&body[..]);
618 let mut decompressed = Vec::new();
619 if decoder.read_to_end(&mut decompressed).is_ok() {
620 Some(decompressed)
621 } else {
622 None
623 }
624 })
625 .await
626 .map_err(|e| SteamError::Other(format!("Task join error: {}", e)))?;
627
628 if let Some(decompressed) = decompressed_result {
629 decompressed
630 } else {
631 tracing::error!("Failed to decompress Multi message");
632 continue;
633 }
634 } else {
635 body
636 };
637
638 let mut offset = 0;
640 while offset + 4 <= payload.len() {
641 let sub_size = u32::from_le_bytes([payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3]]) as usize;
642 offset += 4;
643
644 if offset + sub_size > payload.len() {
645 break;
646 }
647
648 let sub_data = &payload[offset..offset + sub_size];
649 if let Ok(sub_msg) = SteamMessage::decode_from_bytes(sub_data) {
650 debug!("Received sub-message inside Multi: {:?}", sub_msg.msg);
651 if sub_msg.msg == EMsg::ClientLogOnResponse {
652 return sub_msg.decode_body::<CMsgClientLogonResponse>();
653 }
654 }
655 offset += sub_size;
656 }
657 }
658 }
659 }
660 None => {
661 return Err(SteamError::ConnectionError("Connection closed by server during login".into()));
662 }
663 }
664
665 self.clock.sleep(Duration::from_millis(LOGON_TIMEOUT_DELAY_MS)).await;
666 }
667
668 Err(SteamError::Timeout)
669 }
670
671 fn create_proto_message<T: Message>(&self, msg: EMsg, body: &T) -> SteamMessage {
673 use crate::protocol::messages::{build_proto_header, create_steam_message};
674
675 let steam_id = self.temp_steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0);
676 let header = build_proto_header(self.session_id, steam_id, u64::MAX, u64::MAX);
677
678 create_steam_message(msg, header, body)
679 }
680
681 pub async fn log_off(&mut self) -> Result<(), SteamError> {
683 if self.steam_id.is_none() {
684 return Err(SteamError::NotLoggedOn);
685 }
686
687 info!("Logging off from Steam");
688 self.logging_off = true;
689
690 let logoff = CMsgClientLogOff::default();
692 let msg = self.create_proto_message(EMsg::ClientLogOff, &logoff);
693
694 if let Some(ref mut conn) = self.connection {
696 let _ = conn.send(msg.encode()).await;
697 }
698
699 if let Some(conn) = self.connection.take() {
701 conn.close().await?;
702 }
703
704 self.steam_id = None;
706 self.public_ip = None;
707 self.cell_id = None;
708 self.account_info = None;
709 self.email_info = None;
710 self.limitations = None;
711 self.vac = None;
712 self.wallet = None;
713 self.my_friends.clear();
714 self.logon_details = None;
715 self.session_id = 0;
716
717 self.logging_off = false;
718
719 Ok(())
720 }
721
722 pub async fn log_on_with_password(&mut self, account_name: &str, password: &str, steam_guard_code: Option<&str>, machine_auth_token: Option<&str>) -> Result<LogOnResponse, SteamError> {
750 if self.steam_id.is_some() {
751 return Err(SteamError::AlreadyLoggedOn);
752 }
753
754 if self.connecting {
755 return Err(SteamError::AlreadyConnecting);
756 }
757
758 info!("Starting password authentication for {}", account_name);
759 crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
760
761 let mut session = LoginSession::new(EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient, Some(LoginSessionOptions { machine_friendly_name: Some(format!("DESKTOP-{}", self.rng.gen_u32() % 1000000)), ..Default::default() }));
763
764 let credentials = CredentialsDetails {
766 account_name: account_name.to_string(),
767 password: password.to_string(),
768 persistence: None,
769 steam_guard_machine_token: machine_auth_token.map(|s| s.to_string()),
770 steam_guard_code: steam_guard_code.map(|s| s.to_string()),
771 };
772
773 let start_result = match session.start_with_credentials(credentials).await {
774 Ok(res) => res,
775 Err(e) => {
776 if let steam_auth::SessionError::SteamError(_, steam_enums::EResult::AccountLoginDeniedThrottle) = e {
777 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
778 }
779 return Err(SteamError::SessionError(e));
780 }
781 };
782
783 if start_result.action_required {
785 if let Some(ref actions) = start_result.valid_actions {
786 let action = actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeEmailCode)).or_else(|| actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeDeviceCode))).or_else(|| actions.first());
789
790 if let Some(action) = action {
791 debug!("Steam Guard required: {:?}", action.guard_type);
792
793 self.login_session = Some(session);
795
796 return Err(SteamError::SteamGuardRequired { guard_type: action.guard_type, email_domain: action.detail.clone() });
797 }
798 }
799
800 warn!("Action required but no valid actions returned");
802 return Err(SteamError::InvalidCredentials);
803 }
804
805 self.complete_password_auth(session, account_name).await
807 }
808
809 pub async fn submit_steam_guard_code(&mut self, code: &str) -> Result<LogOnResponse, SteamError> {
822 let session = self.login_session.take().ok_or_else(|| SteamError::Other("No pending login session".to_string()))?;
823
824 let account_name = session.account_name().map(|s| s.to_string()).unwrap_or_default();
825
826 let mut session = session;
827 session.submit_steam_guard_code(code).await?;
828
829 self.complete_password_auth(session, &account_name).await
830 }
831
832 async fn complete_password_auth(&mut self, mut session: LoginSession, account_name: &str) -> Result<LogOnResponse, SteamError> {
834 let poll_result = loop {
836 match session.poll().await? {
837 Some(result) => break result,
838 None => {
839 let interval = session.poll_interval();
841 self.clock.sleep(Duration::from_secs_f32(interval)).await;
842 }
843 }
844 };
845
846 info!("Password authentication successful, got refresh token");
847 debug!("Refresh token account: {}", poll_result.account_name);
848
849 if let Some(ref _guard_data) = poll_result.new_guard_data {
851 debug!("Received new Steam Guard machine token");
852 self.login_session = Some(session);
854 }
855
856 self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::RefreshToken { token: poll_result.refresh_token.clone(), account_name: poll_result.account_name.clone() }));
858
859 self.log_on(LogOnDetails {
861 refresh_token: Some(poll_result.refresh_token),
862 account_name: Some(account_name.to_string()),
863 ..Default::default()
864 })
865 .await
866 }
867
/// Returns `true` while a SteamID is set, i.e. between a successful logon
/// and the next `log_off`.
pub fn is_logged_in(&self) -> bool {
    self.steam_id.is_some()
}
872
/// Clears the `connecting` flag so that `log_on` no longer rejects calls
/// with `SteamError::AlreadyConnecting`.
pub fn reset_connecting_state(&mut self) {
    self.connecting = false;
}
879
880 pub async fn get_web_session(&mut self) -> Result<(String, Vec<String>), SteamError> {
905 if self.steam_id.is_none() {
906 return Err(SteamError::NotLoggedOn);
907 }
908
909 let session = self.login_session.as_mut().ok_or_else(|| SteamError::Other("No login session available. Web session requires refresh token login.".to_string()))?;
910
911 let cookies = session.get_web_cookies().await.map_err(|e| SteamError::Other(format!("Failed to get web cookies: {}", e)))?;
912
913 let session_id = cookies.iter().find(|c| c.starts_with("sessionid=")).and_then(|c| c.strip_prefix("sessionid=")).and_then(|c| c.split(';').next()).map(|s| s.to_string()).unwrap_or_else(|| {
915 format!("{:x}", self.rng.gen_u64())
917 });
918
919 self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::WebSession { session_id: session_id.clone(), cookies: cookies.clone() }));
921
922 Ok((session_id, cookies))
923 }
924
925 pub(crate) async fn send_binary_message(&mut self, msg_type: EMsg, body: &[u8]) -> Result<(), SteamError> {
927 use crate::protocol::{ExtendedMessageHeader, MessageHeader, SteamMessage};
928
929 let header = ExtendedMessageHeader {
930 header_size: 36,
931 header_version: 2,
932 target_job_id: u64::MAX,
933 source_job_id: u64::MAX,
934 header_canary: 239,
935 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
936 session_id: self.session_id,
937 };
938
939 let msg = SteamMessage {
940 msg: msg_type,
941 is_proto: false,
942 header: MessageHeader::Extended(header),
943 body: bytes::Bytes::copy_from_slice(body),
944 };
945
946 if let Some(ref mut conn) = self.connection {
947 conn.send(msg.encode()).await?;
948 }
949
950 Ok(())
951 }
952
953 pub(crate) async fn send_message<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<(), SteamError> {
955 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
956
957 let header = ProtobufMessageHeader {
958 header_length: 0,
959 session_id: self.session_id,
960 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
961 job_id_source: u64::MAX,
962 job_id_target: u64::MAX,
963 target_job_name: None,
964 routing_appid: None,
965 };
966
967 let msg = SteamMessage::new_proto(msg_type, header, body);
968
969 if let Some(ref mut conn) = self.connection {
970 conn.send(msg.encode()).await?;
971 }
972
973 Ok(())
974 }
975
976 pub(crate) async fn send_message_with_routing<T: Message>(&mut self, msg_type: EMsg, routing_appid: u32, body: &T) -> Result<(), SteamError> {
978 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
979
980 let header = ProtobufMessageHeader {
981 header_length: 0,
982 session_id: self.session_id,
983 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
984 job_id_source: u64::MAX,
985 job_id_target: u64::MAX,
986 target_job_name: None,
987 routing_appid: Some(routing_appid),
988 };
989
990 let msg = SteamMessage::new_proto(msg_type, header, body);
991
992 if let Some(ref mut conn) = self.connection {
993 conn.send(msg.encode()).await?;
994 }
995
996 Ok(())
997 }
998
999 pub(crate) async fn send_message_with_job<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<tokio::sync::oneshot::Receiver<crate::internal::jobs::JobResponse>, SteamError> {
1005 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1006
1007 let (job_id, response_rx) = self.job_manager.create_job().await;
1009 info!("[SteamClient] send_message_with_job: Created JobID={} for EMsg::{:?}", job_id, msg_type);
1010
1011 let header = ProtobufMessageHeader {
1012 header_length: 0,
1013 session_id: self.session_id,
1014 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1015 job_id_source: job_id,
1016 job_id_target: u64::MAX,
1017 target_job_name: None,
1018 routing_appid: None,
1019 };
1020
1021 let msg = SteamMessage::new_proto(msg_type, header, body);
1022
1023 if let Some(ref mut conn) = self.connection {
1024 conn.send(msg.encode()).await?;
1025 }
1026
1027 Ok(response_rx)
1028 }
1029
1030 async fn process_queued_message(&mut self, msg: QueuedMessage) {
1032 info!("[SteamClient] process_queued_message: Processing message to {}", msg.friend);
1033
1034 let processed_message = if msg.contains_bbcode { msg.message.replace('[', "\\[") } else { msg.message.clone() };
1036
1037 debug!("[SteamClient] process_queued_message: Original len={}, Processed len={}", msg.message.len(), processed_message.len());
1038
1039 let request = steam_protos::CFriendMessagesSendMessageRequest {
1040 steamid: Some(msg.friend.steam_id64()),
1041 chat_entry_type: Some(msg.entry_type as i32),
1042 message: Some(processed_message),
1043 contains_bbcode: Some(msg.contains_bbcode),
1044 ..Default::default()
1045 };
1046
1047 info!("[SteamClient] process_queued_message: Sending FriendMessages.SendMessage#1 to Steam");
1048
1049 match self.send_service_method_with_job("FriendMessages.SendMessage#1", &request).await {
1051 Ok(rx) => {
1052 info!("[SteamClient] process_queued_message: Request sent, waiting for response...");
1053 let friend_id = msg.friend;
1054 let original_message = msg.message.clone();
1055
1056 self.chat_tasks.spawn(
1058 async move {
1059 const CHAT_TIMEOUT_SECS: u64 = 30;
1060
1061 debug!("[SteamClient] chat_task: Waiting for response (timeout: {}s)", CHAT_TIMEOUT_SECS);
1062
1063 let res = match tokio::time::timeout(std::time::Duration::from_secs(CHAT_TIMEOUT_SECS), rx).await {
1064 Ok(Ok(crate::internal::jobs::JobResponse::Success(body))) => {
1065 info!("[SteamClient] chat_task: Received Success response, body len={}", body.len());
1066 steam_protos::CFriendMessagesSendMessageResponse::decode(&body[..])
1067 .map(|response| {
1068 info!("[SteamClient] chat_task: Decoded response - server_ts={}, ordinal={}", response.server_timestamp.unwrap_or(0), response.ordinal.unwrap_or(0));
1069 crate::services::chat::SendMessageResult {
1070 modified_message: response.modified_message.unwrap_or(original_message),
1071 server_timestamp: response.server_timestamp.unwrap_or(0),
1072 ordinal: response.ordinal.unwrap_or(0),
1073 }
1074 })
1075 .map_err(|e| {
1076 error!("[SteamClient] chat_task: Failed to decode response: {:?}", e);
1077 SteamError::DeserializationFailed
1078 })
1079 }
1080 Ok(Ok(crate::internal::jobs::JobResponse::Timeout)) => {
1081 error!("[SteamClient] chat_task: Job response timeout (Steam didn't respond)");
1082 Err(SteamError::ResponseTimeout)
1083 }
1084 Ok(Ok(crate::internal::jobs::JobResponse::Error(e))) => {
1085 error!("[SteamClient] chat_task: Job response error: {}", e);
1086 Err(SteamError::ProtocolError(e))
1087 }
1088 Ok(Err(e)) => {
1089 error!("[SteamClient] chat_task: Job channel closed: {:?}", e);
1090 Err(SteamError::Other("Job response channel closed".into()))
1091 }
1092 Err(_) => {
1093 error!("[SteamClient] chat_task: Tokio timeout elapsed after {}s", CHAT_TIMEOUT_SECS);
1095 Err(SteamError::ResponseTimeout)
1096 }
1097 };
1098
1099 match &res {
1100 Ok(r) => info!("[SteamClient] chat_task: Final result SUCCESS - ts={}", r.server_timestamp),
1101 Err(e) => error!("[SteamClient] chat_task: Final result ERROR - {:?}", e),
1102 }
1103
1104 (msg, res)
1105 }
1106 .instrument(info_span!("chat_send", friend = %friend_id)),
1107 );
1108 }
1109 Err(e) => {
1110 error!("[SteamClient] process_queued_message: Failed to send request to Steam: {:?}", e);
1112 let _ = msg.respond_to.send(Err(e));
1113 }
1114 }
1115 }
1116
1117 pub async fn send_unified_request_and_wait<T, R>(&mut self, method: &str, request: &T) -> Result<R, SteamError>
1122 where
1123 T: prost::Message,
1124 R: prost::Message + Default,
1125 {
1126 use crate::internal::jobs::JobResponse;
1127
1128 let response_rx = self.send_service_method_with_job(method, request).await?;
1130
1131 match response_rx.await {
1133 Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1134 Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1135 Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1136 Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1137 }
1138 }
1139
1140 pub async fn send_request_and_wait<T, R>(&mut self, emsg: EMsg, request: &T) -> Result<R, SteamError>
1145 where
1146 T: prost::Message,
1147 R: prost::Message + Default,
1148 {
1149 use crate::internal::jobs::JobResponse;
1150
1151 let response_rx = self.send_message_with_job(emsg, request).await?;
1153
1154 match response_rx.await {
1156 Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1157 Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1158 Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1159 Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1160 }
1161 }
1162
1163 pub async fn poll_event(&mut self) -> Result<Option<super::events::SteamEvent>, SteamError> {
1194 use super::events::{ConnectionEvent, SteamEvent};
1195
1196 if let Some(mut event) = self.event_queue.pop_front() {
1198 self.handle_event(&mut event);
1199 self.post_process_event(&event).await;
1200 return Ok(Some(event));
1201 }
1202
1203 self.job_manager.cleanup_expired().await;
1205
1206 if self.reconnect_manager.is_reconnecting() {
1208 if let Some(attempt) = self.reconnect_manager.check_ready() {
1209 let delay = self.reconnect_manager.current_delay();
1210 let max_attempts = self.reconnect_manager.max_attempts();
1211
1212 self.event_queue.push_back(SteamEvent::Connection(ConnectionEvent::ReconnectAttempt { attempt, max_attempts, delay }));
1214
1215 match self.attempt_reconnect().await {
1217 Ok(()) => {
1218 self.reconnect_manager.record_success();
1220 info!("Reconnection successful");
1221
1222 if let Err(e) = self.restore_session_state().await {
1224 warn!("Failed to restore session state: {}", e);
1225 }
1226 }
1227 Err(e) => {
1228 warn!("Reconnection attempt {} failed: {:?}", attempt, e);
1230
1231 let server = self.last_cm_server.take();
1232 let should_continue = self.reconnect_manager.record_failure(server.as_ref());
1233
1234 if !should_continue {
1235 let reason = self.reconnect_manager.last_disconnect_reason();
1237 let attempts = self.reconnect_manager.attempt();
1238
1239 return Ok(Some(SteamEvent::Connection(ConnectionEvent::ReconnectFailed { reason, attempts })));
1240 }
1241 }
1242 }
1243 } else if let Some(wait_time) = self.reconnect_manager.time_until_next_attempt() {
1244 let sleep_time = wait_time.min(Duration::from_millis(100));
1247 self.clock.sleep(sleep_time).await;
1248 }
1249 }
1250
1251 if let Some(mut event) = self.event_queue.pop_front() {
1253 self.handle_event(&mut event);
1254 self.post_process_event(&event).await;
1255 return Ok(Some(event));
1256 }
1257
1258 loop {
1259 if let Some(msg) = self.pending_retry_message.take() {
1262 let rate_limited = self.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1263
1264 if !rate_limited {
1265 self.process_queued_message(msg).await;
1266 } else {
1268 self.pending_retry_message = Some(msg);
1270 }
1271 }
1272
1273 let rate_limited = self.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1275 let can_process_queue = !rate_limited && self.pending_retry_message.is_none();
1276
1277 if self.connection.is_none() {
1279 return Ok(None);
1280 }
1281
1282 let now = self.clock.now();
1284 let hb_timeout = self.heartbeat_manager.time_until_next_heartbeat(now);
1285
1286 if let Some(timeout) = hb_timeout {
1288 if timeout == Duration::ZERO {
1289 debug!("Sending heartbeat");
1290 let msg = crate::internal::heartbeat::HeartbeatManager::build_heartbeat_message(self.session_id, self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0));
1291
1292 let result = {
1294 if let Some(conn) = self.connection.as_mut() {
1295 conn.send(msg.encode()).await
1296 } else {
1297 Err(SteamError::NotConnected)
1298 }
1299 };
1300
1301 if let Err(e) = result {
1302 warn!("Failed to send heartbeat: {}", e);
1303 let reason = match &e {
1305 SteamError::SteamResult(r) => Some(*r),
1306 _ => Some(EResult::NoConnection),
1307 };
1308 return self.handle_connection_close(reason);
1309 }
1310
1311 self.heartbeat_manager.record_heartbeat(self.clock.now());
1312 continue;
1313 }
1314 }
1315
1316 enum PollResult {
1317 Packet(Option<bytes::Bytes>),
1318 Queue(QueuedMessage),
1319 ChatResponse(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>),
1320 BackgroundResult(u64, Result<Bytes, SteamError>),
1321 Timeout,
1322 }
1323
1324 let poll_res: Result<PollResult, SteamError> = {
1325 let conn = self.connection.as_mut().unwrap();
1326 let queue = &mut self.chat_queue_rx;
1327
1328 let recv_fut = conn.recv();
1329
1330 let rate_limit_timeout = self.rate_limit_until.map(|t| {
1336 let now = self.clock.now();
1337 if t > now {
1338 t - now
1339 } else {
1340 Duration::ZERO
1341 }
1342 });
1343
1344 let sleep_duration = match (hb_timeout, rate_limit_timeout) {
1345 (Some(hb), Some(rl)) => Some(hb.min(rl)),
1346 (Some(hb), None) => Some(hb),
1347 (None, Some(rl)) => Some(rl),
1348 (None, None) => None,
1349 };
1350
1351 tokio::select! {
1352 res = recv_fut => {
1353 Ok(PollResult::Packet(res?))
1354 }
1355 Some(msg) = queue.recv(), if can_process_queue => {
1356 Ok(PollResult::Queue(msg))
1357 }
1358 Some(join_result) = self.chat_tasks.join_next() => {
1359 match join_result {
1360 Ok((msg, res)) => Ok(PollResult::ChatResponse(msg, res)),
1361 Err(e) => {
1362 warn!("Chat task failed: {:?}", e);
1364 Ok(PollResult::Timeout) }
1366 }
1367 }
1368 res = self.background_job_results_rx.recv() => {
1369 match res {
1370 Some((job_id, result)) => Ok(PollResult::BackgroundResult(job_id, result)),
1371 None => Ok(PollResult::Timeout),
1372 }
1373 }
1374 _ = async {
1375 if let Some(d) = sleep_duration {
1376 tokio::time::sleep(d).await;
1377 } else {
1378 std::future::pending::<()>().await;
1379 }
1380 } => {
1381 Ok(PollResult::Timeout)
1382 }
1383 }
1384 };
1385
1386 match poll_res {
1387 Ok(PollResult::Packet(Some(data))) => {
1388 let messages = super::events::MessageHandler::decode_packet(&data);
1390
1391 for decoded in messages {
1392 if let Some(job_id) = decoded.job_id_target {
1394 info!("[SteamClient] poll_event: Completing job {} via job_manager", job_id);
1395 self.job_manager.complete_job_success(job_id, decoded.body).await;
1396 }
1397
1398 for event in &decoded.events {
1400 if let super::events::SteamEvent::Apps(super::events::AppsEvent::GCReceived(gc_msg)) = event {
1401 self.gc_jobs.try_complete(gc_msg.appid, gc_msg.msg_type, gc_msg.payload.clone());
1402 }
1403 }
1404
1405 self.event_queue.extend(decoded.events);
1407 }
1408
1409 if let Some(mut event) = self.event_queue.pop_front() {
1411 self.handle_event(&mut event);
1412 self.post_process_event(&event).await;
1413 return Ok(Some(event));
1414 }
1415 }
1416 Ok(PollResult::Packet(None)) => {
1417 return self.handle_connection_close(None);
1419 }
1420 Ok(PollResult::Queue(msg)) => {
1421 info!("[SteamClient] poll_event: Dequeued message from chat_queue_rx for {}", msg.friend);
1422 self.process_queued_message(msg).await;
1423 }
1425 Ok(PollResult::ChatResponse(msg, result)) => {
1426 info!("[SteamClient] poll_event: Received ChatResponse for {}", msg.friend);
1427 match result {
1428 Ok(res) => {
1429 info!("[SteamClient] poll_event: ChatResponse SUCCESS - server_ts={}", res.server_timestamp);
1430 let _ = msg.respond_to.send(Ok(res));
1431 }
1432 Err(SteamError::SteamResult(steam_enums::EResult::RateLimitExceeded)) => {
1433 warn!("[SteamClient] poll_event: Rate limit exceeded for chat message, backing off for 60 seconds");
1434 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "RateLimitExceeded");
1435 self.rate_limit_until = Some(self.clock.now() + std::time::Duration::from_secs(60));
1436 self.pending_retry_message = Some(msg);
1437 }
1438 Err(e) => {
1439 error!("[SteamClient] poll_event: ChatResponse ERROR - {:?}", e);
1440 let _ = msg.respond_to.send(Err(e));
1441 }
1442 }
1443 }
1444 Ok(PollResult::BackgroundResult(job_id, result)) => {
1445 debug!("[SteamClient] poll_event: Received BackgroundResult for job {}", job_id);
1446 if let Some(task) = self.background_job_tasks.remove(&job_id) {
1447 match task {
1448 BackgroundTask::FriendsList => match result {
1449 Ok(body) => {
1450 debug!("[SteamClient] poll_event: Decoding FriendsList response (len={})", body.len());
1451 match steam_protos::CFriendsListGetFriendsListResponse::decode(&body[..]) {
1452 Ok(response) => {
1453 debug!("[SteamClient] poll_event: Successfully decoded FriendsList, processing...");
1454 self.handle_friends_list_unified_response(response).await;
1455 }
1456 Err(e) => {
1457 error!("[SteamClient] poll_event: Failed to decode FriendsList response: {:?}", e);
1458 }
1459 }
1460 }
1461 Err(e) => {
1462 error!("[SteamClient] poll_event: Background FriendsList job failed: {:?}", e);
1463 }
1464 },
1465 BackgroundTask::OfflineMessages(friend_id) => match result {
1466 Ok(body) => {
1467 debug!("[SteamClient] poll_event: Decoding OfflineMessages response for {} (len={})", friend_id, body.len());
1468 match steam_protos::CFriendMessagesGetRecentMessagesResponse::decode(&body[..]) {
1469 Ok(response) => {
1470 debug!("[SteamClient] poll_event: Successfully decoded OfflineMessages, processing...");
1471 let last_view_timestamp = self.chat_last_view.get(&friend_id).copied().unwrap_or(0);
1472
1473 let messages = response
1474 .messages
1475 .into_iter()
1476 .map(|m| crate::services::chat::HistoryMessage {
1477 sender: SteamID::from_steam_id64(m.accountid.unwrap_or(0) as u64),
1478 timestamp: m.timestamp.unwrap_or(0),
1479 ordinal: m.ordinal.unwrap_or(0),
1480 message: m.message.unwrap_or_default(),
1481 unread: m.timestamp.unwrap_or(0) > last_view_timestamp,
1482 })
1483 .collect();
1484
1485 self.event_queue.push_back(crate::client::events::SteamEvent::Chat(crate::client::events::ChatEvent::OfflineMessagesFetched { friend_id, messages }));
1486 }
1487 Err(e) => {
1488 error!("[SteamClient] poll_event: Failed to decode OfflineMessages response: {:?}", e);
1489 }
1490 }
1491 }
1492 Err(e) => {
1493 error!("[SteamClient] poll_event: Background OfflineMessages job failed for {}: {:?}", friend_id, e);
1494 }
1495 },
1496 }
1497
1498 if let Some(mut event) = self.event_queue.pop_front() {
1501 self.handle_event(&mut event);
1502 self.post_process_event(&event).await;
1503 return Ok(Some(event));
1504 }
1505 } else {
1506 warn!("[SteamClient] poll_event: Received BackgroundResult for unknown job {}", job_id);
1507 }
1508 }
1509 Ok(PollResult::Timeout) => {
1510 }
1512 Err(e) => {
1513 let reason = match &e {
1515 SteamError::SteamResult(r) => Some(*r),
1516 _ => Some(EResult::NoConnection),
1517 };
1518 return self.handle_connection_close(reason);
1519 }
1520 }
1521 }
1522 }
1523
1524 fn handle_connection_close(&mut self, reason: Option<EResult>) -> Result<Option<super::events::SteamEvent>, SteamError> {
1526 use super::events::{ConnectionEvent, SteamEvent};
1527
1528 debug!("Handling connection close, reason: {:?}", reason);
1529
1530 self.connection = None;
1532 self.connecting = false;
1533
1534 let should_reconnect = !self.logging_off && self.options.auto_relogin && reason.map(|r| self.reconnect_manager.should_reconnect(r)).unwrap_or(true);
1536
1537 if should_reconnect && self.logon_details.is_some() {
1538 let eresult = reason.unwrap_or(EResult::NoConnection);
1540 self.reconnect_manager.start_reconnection(eresult);
1541
1542 if let Some(ref server) = self.last_cm_server {
1544 self.reconnect_manager.blacklist_server(&server.endpoint);
1545 }
1546
1547 info!("Connection lost, will attempt reconnection");
1548 } else {
1549 self.steam_id = None;
1551 self.reconnect_manager.reset();
1552 self.relogging = false;
1553 self.logging_off = false;
1554 }
1555
1556 Ok(Some(SteamEvent::Connection(ConnectionEvent::Disconnected { reason, will_reconnect: should_reconnect })))
1557 }
1558
1559 async fn restore_session_state(&mut self) -> Result<(), SteamError> {
1562 info!("Restoring session state...");
1563
1564 let app_ids = self.session_recovery.last_playing_app_ids.clone();
1566 if !app_ids.is_empty() {
1567 debug!("Restoring games played: {:?}", app_ids);
1568 self.games_played_with_extra(app_ids, self.session_recovery.last_custom_game_name.clone()).await?;
1569 }
1570
1571 if let Some(state) = self.session_recovery.last_persona_state {
1573 debug!("Restoring persona state: {:?}", state);
1574 self.set_persona(state, self.session_recovery.last_player_name.clone()).await?;
1575 }
1576
1577 let rp_data = self.session_recovery.last_rich_presence.clone();
1579 for (app_id, data) in rp_data {
1580 debug!("Restoring rich presence for app {}", app_id);
1581 self.upload_rich_presence(app_id, &data).await?;
1582 }
1583
1584 Ok(())
1585 }
1586
1587 async fn attempt_reconnect(&mut self) -> Result<(), SteamError> {
1589 let details = self.logon_details.clone().ok_or_else(|| SteamError::Other("No logon details available for reconnection".to_string()))?;
1591
1592 debug!("Attempting reconnection with stored credentials");
1593
1594 let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
1596 let server = provider.get_server().await?;
1597 self.last_cm_server = Some(server.clone());
1598
1599 info!("Reconnecting to CM server: {}", server.endpoint);
1600
1601 let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
1603 debug!("WebSocket connection established");
1604
1605 self.session_id = self.rng.gen_i32().abs();
1607
1608 if details.refresh_token.is_none() {
1610 let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
1611 let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
1612 connection.send(hello_msg.encode()).await?;
1613 }
1614
1615 let is_anonymous = details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none());
1617
1618 let logon = self.build_logon_message(&details, is_anonymous);
1620 let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
1621 connection.send(logon_msg.encode()).await?;
1622
1623 let response = self.wait_for_logon_response(&mut *connection).await?;
1625
1626 let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
1628
1629 if eresult != EResult::OK {
1630 return Err(SteamError::SteamResult(eresult));
1631 }
1632
1633 self.connection = Some(connection);
1635 self.connecting = false;
1636
1637 if let Some(sid) = response.client_supplied_steamid {
1639 self.steam_id = Some(SteamID::from(sid));
1640 }
1641
1642 self.cell_id = response.cell_id;
1643 self.vanity_url = response.vanity_url.clone();
1644
1645 if let Some(ref ip) = response.public_ip {
1646 if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
1647 self.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
1648 }
1649 }
1650
1651 info!("Successfully reconnected");
1652 Ok(())
1653 }
1654
1655 pub async fn poll_events(&mut self) -> Result<Vec<super::events::SteamEvent>, SteamError> {
1660 let mut all_events = Vec::new();
1661
1662 while let Some(event) = self.poll_event().await? {
1663 all_events.push(event);
1664
1665 if self.event_queue.is_empty() {
1667 break;
1668 }
1669 }
1670
1671 Ok(all_events)
1672 }
1673
1674 fn handle_event(&mut self, event: &mut super::events::SteamEvent) {
1676 use super::events::{AppsEvent, AuthEvent, ConnectionEvent, FriendsEvent, SteamEvent};
1677
1678 match event {
1679 SteamEvent::Auth(AuthEvent::LoggedOn { steam_id }) => {
1680 self.steam_id = Some(*steam_id);
1681 self.connecting = false;
1682 self.reconnect_manager.record_success();
1684 }
1685 SteamEvent::Auth(AuthEvent::LoggedOff { .. }) => {
1686 self.steam_id = None;
1687 }
1688 SteamEvent::Auth(AuthEvent::GameConnectTokens { tokens }) => {
1689 self.gc_tokens.extend(tokens.clone());
1690 }
1691 SteamEvent::Friends(FriendsEvent::FriendsList { friends, .. }) => {
1692 for friend in friends {
1693 if friend.relationship == steam_enums::EFriendRelationship::None {
1694 self.my_friends.remove(&friend.steam_id);
1695 } else {
1696 self.my_friends.insert(friend.steam_id, friend.relationship as u32);
1697 }
1698 }
1699 }
1700 SteamEvent::Friends(FriendsEvent::PersonaState(persona)) => {
1701 let mut persona = *persona.clone();
1702 if let Some(game_id) = persona.game_id {
1703 let app_id = game_id as u32;
1704 if app_id != 0 {
1705 if !self.apps.contains_key(&app_id) {
1707 self.pending_apps.insert(app_id);
1708 } else if persona.game_name.is_none() || persona.game_name.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
1709 if let Some(name) = self.get_app_name(app_id) {
1711 persona.game_name = Some(name);
1712 }
1713 }
1714 }
1715 }
1716
1717 self.users.entry(persona.steam_id).and_modify(|existing| existing.merge(&persona)).or_insert_with(|| persona.clone());
1719
1720 self.persona_cache.insert(persona.steam_id, persona);
1722 }
1723 SteamEvent::Apps(AppsEvent::PlayingState { playing_app, blocked }) => {
1724 self.playing_blocked = *blocked;
1725 if *playing_app != 0 {
1726 if !self.apps.contains_key(playing_app) {
1727 self.pending_apps.insert(*playing_app);
1728 }
1729 self.apps.entry(*playing_app).or_insert_with(|| super::events::AppInfoData { app_id: *playing_app, change_number: 0, missing_token: false, app_info: None });
1730 }
1731 }
1732 SteamEvent::Apps(AppsEvent::ProductInfoResponse { apps, .. }) => {
1733 for (app_id, data) in apps {
1734 self.apps.insert(*app_id, data.clone());
1735 self.pending_apps.remove(app_id);
1736 }
1737 }
1738 SteamEvent::Connection(ConnectionEvent::Disconnected { will_reconnect, .. }) => {
1739 if !*will_reconnect {
1741 self.steam_id = None;
1742 }
1743 self.connection = None;
1744 self.connecting = false;
1745 }
1746 SteamEvent::Connection(ConnectionEvent::ReconnectFailed { .. }) => {
1747 self.steam_id = None;
1749 self.connection = None;
1750 self.connecting = false;
1751 self.reconnect_manager.reset();
1752 }
1753 SteamEvent::Apps(super::events::AppsEvent::LicenseList { licenses }) => {
1754 self.licenses = licenses.clone();
1755 }
1756 _ => {}
1757 }
1758 }
1759
1760 async fn post_process_event(&mut self, event: &super::events::SteamEvent) {
1763 use super::events::{FriendsEvent, SteamEvent};
1764
1765 if let SteamEvent::Friends(FriendsEvent::FriendsList { incremental: false, .. }) = event {
1766 let friends: Vec<SteamID> = self.my_friends.iter().filter(|&(_, &rel)| rel == steam_enums::EFriendRelationship::Friend as u32).map(|(&id, _)| id).collect();
1767
1768 if !friends.is_empty() {
1769 debug!("Auto-fetching personas for {} friends", friends.len());
1770 if let Err(e) = self.get_personas(friends).await {
1771 warn!("Failed to auto-fetch personas: {}", e);
1772 }
1773 }
1774 }
1775 }
1776
1777 pub fn format_rich_presence(&self, steam_id: SteamID, language: &str) -> Option<String> {
1783 let user = self.users.get(&steam_id)?;
1784 let app_id = user.game_id? as u32;
1785 let app_info = self.apps.get(&app_id)?;
1786 let app_vdf = app_info.app_info.as_ref()?;
1787
1788 let localization = app_vdf.get("common").and_then(|c| c.get("rich_presence")).and_then(|rp| rp.get("localization")).and_then(|loc| loc.get(language))?;
1790
1791 let display_token = user.rich_presence.get("steam_display")?;
1793 let mut formatted = localization.get_str(display_token)?.to_string();
1794
1795 for (key, value) in &user.rich_presence {
1797 if key == "steam_display" {
1798 continue;
1799 }
1800 let placeholder = format!("%{}%", key);
1801 let localized_value = localization.get_str(value).unwrap_or(value);
1803 formatted = formatted.replace(&placeholder, localized_value);
1804 }
1805
1806 Some(formatted)
1807 }
1808
1809 pub fn get_app_name(&self, app_id: u32) -> Option<String> {
1811 if let Ok(idx) = crate::client::static_app_list::APP_LIST.binary_search_by_key(&app_id, |&(id, _)| id) {
1813 return Some(crate::client::static_app_list::APP_LIST[idx].1.to_string());
1814 }
1815
1816 let info = self.apps.get(&app_id)?;
1818 if let Some(vdf) = &info.app_info {
1819 vdf.get("common").and_then(|c| c.get_str("name")).or_else(|| vdf.get_str("name")).map(|s| s.to_string())
1820 } else {
1821 None
1822 }
1823 }
1824
1825 pub async fn fetch_pending_app_info(&mut self) -> Result<(), SteamError> {
1830 if self.pending_apps.is_empty() {
1831 return Ok(());
1832 }
1833
1834 let app_ids: Vec<u32> = self.pending_apps.iter().copied().collect();
1835 self.pending_apps.clear();
1837 self.get_product_info(app_ids).await
1838 }
1839
1840 pub async fn update_friend_sessions(&mut self) -> Result<(), SteamError> {
1845 let sessions = self.get_active_friend_sessions().await?;
1846
1847 for session in sessions {
1848 self.users
1849 .entry(session.friend)
1850 .and_modify(|user| {
1851 user.unread_count = session.unread_count;
1852 user.last_message_time = session.time_last_message;
1853 })
1854 .or_insert_with(|| UserPersona {
1855 steam_id: session.friend,
1856 unread_count: session.unread_count,
1857 last_message_time: session.time_last_message,
1858 ..Default::default()
1859 });
1860 }
1861
1862 Ok(())
1863 }
1864
1865 pub async fn relog(&mut self) -> Result<(), SteamError> {
1874 if self.steam_id.is_none() {
1875 return Err(SteamError::NotLoggedOn);
1876 }
1877
1878 if self.logon_details.is_none() {
1879 return Err(SteamError::Other("No stored credentials for relog".to_string()));
1880 }
1881
1882 info!("Initiating manual relog");
1883 self.relogging = true;
1884
1885 if let Some(conn) = self.connection.take() {
1887 let _ = conn.close().await;
1888 }
1889
1890 self.reconnect_manager.start_reconnection(EResult::NoConnection);
1892
1893 Ok(())
1894 }
1895
1896 pub fn cancel_reconnect(&mut self) {
1898 self.reconnect_manager.reset();
1899 self.relogging = false;
1900 }
1901
1902 pub fn is_reconnecting(&self) -> bool {
1904 self.reconnect_manager.is_reconnecting()
1905 }
1906
1907 pub fn reconnect_state(&self) -> crate::internal::reconnect::ReconnectState {
1909 self.reconnect_manager.state()
1910 }
1911
1912 pub fn try_poll_event(&mut self) -> Option<super::events::SteamEvent> {
1933 if let Some(mut event) = self.event_queue.pop_front() {
1934 self.handle_event(&mut event);
1935 Some(event)
1936 } else {
1937 None
1938 }
1939 }
1940
1941 pub async fn poll_event_timeout(&mut self, timeout: std::time::Duration) -> Result<Option<super::events::SteamEvent>, SteamError> {
1961 match tokio::time::timeout(timeout, self.poll_event()).await {
1962 Ok(result) => result,
1963 Err(_) => Ok(None), }
1965 }
1966
1967 pub async fn wait_for<F>(&mut self, predicate: F) -> Result<super::events::SteamEvent, SteamError>
1988 where
1989 F: Fn(&super::events::SteamEvent) -> bool,
1990 {
1991 loop {
1992 if let Some(event) = self.poll_event().await? {
1993 if predicate(&event) {
1994 return Ok(event);
1995 }
1996 }
1997 }
1998 }
1999
2000 pub async fn wait_for_timeout<F>(&mut self, predicate: F, timeout: std::time::Duration) -> Result<Option<super::events::SteamEvent>, SteamError>
2026 where
2027 F: Fn(&super::events::SteamEvent) -> bool,
2028 {
2029 match tokio::time::timeout(timeout, self.wait_for(predicate)).await {
2030 Ok(result) => result.map(Some),
2031 Err(_) => Ok(None), }
2033 }
2034
2035 pub fn drain_queued_events(&mut self) -> Vec<super::events::SteamEvent> {
2049 let mut events: Vec<_> = self.event_queue.drain(..).collect();
2050 for event in &mut events {
2051 self.handle_event(event);
2052 }
2053 events
2054 }
2055
2056 pub fn csgo(&mut self) -> crate::services::CSGOClient<'_> {
2058 crate::services::CSGOClient::new(self)
2059 }
2060
2061 pub(crate) async fn send_service_method_and_wait<Req: prost::Message, Resp: prost::Message + Default>(&mut self, method: &str, body: &Req) -> Result<Resp, SteamError> {
2063 let rx = self.send_service_method_with_job(method, body).await?;
2064
2065 let job_response = rx.await.map_err(|_| SteamError::ResponseTimeout)?;
2067
2068 let body = match job_response {
2069 crate::internal::jobs::JobResponse::Success(bytes) => bytes,
2070 crate::internal::jobs::JobResponse::Timeout => return Err(SteamError::ResponseTimeout),
2071 crate::internal::jobs::JobResponse::Error(msg) => return Err(SteamError::ProtocolError(msg)),
2072 };
2073
2074 Resp::decode(&body[..]).map_err(|_| SteamError::DeserializationFailed)
2075 }
2076}
2077
2078use std::task::{Context, Poll};
2083
2084use futures::Stream;
2085
/// A `Stream` adapter over a mutably borrowed `SteamClient`.
///
/// Created by `SteamClient::into_stream`.
pub struct SteamEventStream<'a> {
    /// The client whose event queue is read by the stream.
    client: &'a mut SteamClient,
}
2120
2121impl SteamClient {
2122 pub(crate) async fn send_service_method_background<T: prost::Message>(&mut self, method: &str, body: &T, task: BackgroundTask) -> Result<(), SteamError> {
2146 let (job_id, rx) = self.job_manager.create_job().await;
2147
2148 self.send_service_method_with_job_id(method, body, job_id).await?;
2149
2150 self.background_job_tasks.insert(job_id, task);
2151
2152 let tx = self.background_job_results_tx.clone();
2153 tokio::spawn(async move {
2154 match rx.await {
2155 Ok(crate::internal::jobs::JobResponse::Success(bytes)) => {
2156 let _ = tx.send((job_id, Ok(bytes))).await;
2157 }
2158 Ok(crate::internal::jobs::JobResponse::Timeout) => {
2159 let _ = tx.send((job_id, Err(SteamError::Timeout))).await;
2160 }
2161 Ok(crate::internal::jobs::JobResponse::Error(e)) => {
2162 let _ = tx.send((job_id, Err(SteamError::Other(e)))).await;
2163 }
2164 Err(_) => {
2165 }
2167 }
2168 });
2169
2170 Ok(())
2171 }
2172
2173 async fn send_service_method_with_job_id<T: prost::Message>(&mut self, method: &str, body: &T, job_id: u64) -> Result<(), SteamError> {
2175 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
2176
2177 let header = ProtobufMessageHeader {
2178 header_length: 0,
2179 session_id: self.session_id,
2180 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
2181 job_id_source: job_id,
2182 job_id_target: u64::MAX,
2183 target_job_name: Some(method.to_string()),
2184 routing_appid: None,
2185 };
2186
2187 let msg = SteamMessage::new_proto(steam_enums::EMsg::ServiceMethodCallFromClient, header, body);
2188
2189 if let Some(ref mut conn) = self.connection {
2190 conn.send(msg.encode()).await?;
2191 } else {
2192 return Err(SteamError::NotConnected);
2193 }
2194
2195 Ok(())
2196 }
2197
2198 pub fn into_stream(&mut self) -> SteamEventStream<'_> {
2199 SteamEventStream { client: self }
2200 }
2201}
2202
2203impl<'a> Stream for SteamEventStream<'a> {
2204 type Item = Result<super::events::SteamEvent, SteamError>;
2205
2206 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2207 if let Some(mut event) = self.client.event_queue.pop_front() {
2210 self.client.handle_event(&mut event);
2211 Poll::Ready(Some(Ok(event)))
2212 } else {
2213 Poll::Ready(None)
2215 }
2216 }
2217}
2218
#[cfg(test)]
mod tests {
    use super::*;

    /// Steam ID shared by all personas in these tests.
    fn test_steam_id() -> SteamID {
        SteamID::from_steam_id64(76561198000000000)
    }

    /// Builds an owned rich presence map from string pairs.
    fn presence(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(key, value)| (key.to_string(), value.to_string())).collect()
    }

    /// Builds a persona that only differs from the default in its chat counters.
    fn chat_persona(unread_count: u32, last_message_time: u32) -> UserPersona {
        UserPersona { steam_id: test_steam_id(), unread_count, last_message_time, ..Default::default() }
    }

    /// Merging overwrites keys present in both maps and keeps keys unique to either side.
    #[test]
    fn test_user_persona_merge_rich_presence() {
        let mut base = UserPersona {
            steam_id: test_steam_id(),
            rich_presence: presence(&[("status", "Online"), ("game", "CS:GO")]),
            ..Default::default()
        };
        let update = UserPersona {
            steam_id: test_steam_id(),
            rich_presence: presence(&[("status", "In-Game"), ("party", "Open")]),
            ..Default::default()
        };

        base.merge(&update);

        let value_of = |key: &str| base.rich_presence.get(key).map(String::as_str);
        assert_eq!(value_of("status"), Some("In-Game"));
        assert_eq!(value_of("game"), Some("CS:GO"));
        assert_eq!(value_of("party"), Some("Open"));
    }

    /// Non-zero chat counters from the update replace the existing ones.
    #[test]
    fn test_user_persona_merge_unread_count() {
        let mut base = chat_persona(5, 100);
        let update = chat_persona(10, 200);

        base.merge(&update);

        assert_eq!(base.unread_count, 10);
        assert_eq!(base.last_message_time, 200);
    }

    /// Zero chat counters in the update leave the existing ones untouched.
    #[test]
    fn test_user_persona_merge_unread_count_zero() {
        let mut base = chat_persona(5, 100);
        let update = chat_persona(0, 0);

        base.merge(&update);

        assert_eq!(base.unread_count, 5);
        assert_eq!(base.last_message_time, 100);
    }
}