1mod polling;
4mod reconnect;
5mod state;
6
7use std::{
8 collections::{HashMap, VecDeque},
9 pin::Pin,
10 sync::Arc,
11 time::Duration,
12};
13
14use chrono::{DateTime, Utc};
15use prost::Message;
16use steam_auth::{CredentialsDetails, EAuthSessionGuardType, EAuthTokenPlatformType, LoginSession, LoginSessionOptions};
17use steam_enums::{EMsg, EPersonaState, EResult};
18use steam_protos::{CMsgClientHello, CMsgClientLogOff, CMsgClientLogon, CMsgClientLogonResponse, PROTOCOL_VERSION};
19use steamid::SteamID;
20use tracing::{debug, error, info, info_span, warn, Instrument};
21
22use crate::{
23 connection::{CmServer, CmServerProvider, HttpCmServerProvider, SteamConnection, WebSocketConnection},
24 error::SteamError,
25 options::SteamOptions,
26 protocol::SteamMessage,
27 types::{AccountInfo, EmailInfo, Limitations, LogOnDetails, LogOnResponse, VacStatus, WalletInfo},
28};
29
30#[derive(Debug, Clone)]
32pub struct UserPersona {
33 pub steam_id: SteamID,
34 pub player_name: String,
35 pub persona_state: EPersonaState,
36 pub persona_state_flags: u32,
37 pub avatar_hash: Option<String>,
38 pub game_name: Option<String>,
39 pub game_id: Option<u64>,
40 pub last_logon: Option<DateTime<Utc>>,
41 pub last_logoff: Option<DateTime<Utc>>,
42 pub last_seen_online: Option<DateTime<Utc>>,
43 pub rich_presence: HashMap<String, String>,
44 pub steam_player_group: Option<String>,
46 pub rich_presence_status: Option<String>,
48 pub game_map: Option<String>,
50 pub game_score: Option<String>,
52 pub num_players: Option<u32>,
54 pub unread_count: u32,
55 pub last_message_time: u32,
56}
57
58impl Default for UserPersona {
59 fn default() -> Self {
60 Self {
61 steam_id: SteamID::default(),
62 player_name: String::new(),
63 persona_state: EPersonaState::Offline,
64 persona_state_flags: 0,
65 avatar_hash: None,
66 game_name: None,
67 game_id: None,
68 last_logon: None,
69 last_logoff: None,
70 last_seen_online: None,
71 rich_presence: HashMap::new(),
72 steam_player_group: None,
73 rich_presence_status: None,
74 game_map: None,
75 game_score: None,
76 num_players: None,
77 unread_count: 0,
78 last_message_time: 0,
79 }
80 }
81}
82
83impl UserPersona {
84 pub fn avatar_url(&self, size: &str) -> Option<String> {
88 let hash = self.avatar_hash.as_ref()?;
89 let size_suffix = match size {
90 "medium" => "_medium.jpg",
91 "full" => "_full.jpg",
92 _ => ".jpg",
93 };
94 Some(format!("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{}/{}{}", &hash[0..2], hash, size_suffix))
95 }
96
97 pub fn merge(&mut self, other: &UserPersona) {
100 if !other.player_name.is_empty() {
101 self.player_name = other.player_name.clone();
102 }
103 self.persona_state = other.persona_state;
104 if other.persona_state_flags != 0 {
105 self.persona_state_flags = other.persona_state_flags;
106 }
107 if other.avatar_hash.is_some() {
108 self.avatar_hash = other.avatar_hash.clone();
109 }
110 if other.game_name.is_some() {
111 self.game_name = other.game_name.clone();
112 }
113 if other.game_id.is_some() && other.game_id != Some(0) {
114 self.game_id = other.game_id;
115 }
116 if other.last_logon.is_some() {
117 self.last_logon = other.last_logon;
118 }
119 if other.last_logoff.is_some() {
120 self.last_logoff = other.last_logoff;
121 }
122 if other.last_seen_online.is_some() {
123 self.last_seen_online = other.last_seen_online;
124 }
125 for (k, v) in &other.rich_presence {
126 self.rich_presence.insert(k.clone(), v.clone());
127 }
128 if other.steam_player_group.is_some() {
129 self.steam_player_group = other.steam_player_group.clone();
130 }
131 if other.rich_presence_status.is_some() {
132 self.rich_presence_status = other.rich_presence_status.clone();
133 }
134 if other.game_map.is_some() {
135 self.game_map = other.game_map.clone();
136 }
137 if other.game_score.is_some() {
138 self.game_score = other.game_score.clone();
139 }
140 if other.unread_count > 0 {
143 self.unread_count = other.unread_count;
144 }
145 if other.last_message_time > 0 {
146 self.last_message_time = other.last_message_time;
147 }
148 }
149}
150
151use std::time::Instant;
152
153use bytes::Bytes;
154use tokio::sync::{mpsc, oneshot};
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158pub(crate) enum BackgroundTask {
159 FriendsList,
160 OfflineMessages(SteamID),
161}
162
163#[derive(Debug)]
165pub(crate) struct QueuedMessage {
166 pub friend: SteamID,
167 pub message: String,
168 pub entry_type: steam_enums::EChatEntryType,
169 pub contains_bbcode: bool,
170 pub respond_to: oneshot::Sender<Result<crate::services::chat::SendMessageResult, SteamError>>,
171}
172
173const LOGON_RESPONSE_TIMEOUT: Duration = Duration::from_secs(15);
177const HEARTBEAT_INTERVAL_SECONDS: u64 = 30;
179
180#[derive(Debug, Default, Clone)]
183pub(crate) struct AccountState {
184 pub(crate) public_ip: Option<String>,
185 pub(crate) cell_id: Option<u32>,
186 pub(crate) vanity_url: Option<String>,
187 pub(crate) info: Option<AccountInfo>,
188 pub(crate) email: Option<EmailInfo>,
189 pub(crate) limitations: Option<Limitations>,
190 pub(crate) vac: Option<VacStatus>,
191 pub(crate) wallet: Option<WalletInfo>,
192}
193
194#[derive(Debug, Default)]
196pub(crate) struct AppsState {
197 pub(crate) info: HashMap<u32, super::events::AppInfoData>,
199 pub(crate) licenses: Vec<super::events::LicenseEntry>,
201 pub(crate) pending: rustc_hash::FxHashSet<u32>,
203 pub(crate) playing_blocked: bool,
205 pub(crate) playing_app_ids: Vec<u32>,
207}
208
209#[derive(Debug, Default)]
211pub(crate) struct SocialState {
212 pub(crate) friends: HashMap<SteamID, u32>,
214 pub(crate) users: HashMap<SteamID, UserPersona>,
216 pub(crate) chat_last_view: HashMap<SteamID, u32>,
218 pub(crate) persona_cache: crate::cache::PersonaCache,
220}
221
222pub(crate) struct ChatQueueState {
227 pub(crate) tx: mpsc::UnboundedSender<QueuedMessage>,
228 pub(crate) rx: mpsc::UnboundedReceiver<QueuedMessage>,
229 pub(crate) rate_limit_until: Option<Instant>,
230 pub(crate) pending_retry_message: Option<QueuedMessage>,
231 pub(crate) tasks: tokio::task::JoinSet<(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>)>,
232}
233
234#[derive(Default)]
237pub(crate) struct AuthState {
238 pub(crate) login_session: Option<LoginSession>,
239 pub(crate) logon_details: Option<LogOnDetails>,
240 pub(crate) session_id: i32,
241 pub(crate) connecting: bool,
242 pub(crate) logging_off: bool,
243 pub(crate) relogging: bool,
244 pub(crate) temp_steam_id: Option<SteamID>,
245 pub(crate) auth_seq_me: u32,
246 pub(crate) auth_seq_them: u32,
247 pub(crate) last_cm_server: Option<CmServer>,
248}
249
250pub struct SteamClient {
256 pub options: SteamOptions,
258
259 pub(crate) steam_id: Option<SteamID>,
261
262 pub(crate) account: Arc<parking_lot::RwLock<AccountState>>,
268
269 pub(crate) apps: Arc<parking_lot::RwLock<AppsState>>,
277
278 pub(crate) social: Arc<parking_lot::RwLock<SocialState>>,
286
287 pub(crate) connect_time: u64,
289 pub(crate) connection_count: u32,
290
291 pub(crate) auth: Arc<parking_lot::RwLock<AuthState>>,
298
299 pub(crate) h_steam_pipe: u32,
300 pub(crate) gc_tokens: Vec<Vec<u8>>,
301 pub(crate) active_tickets: Vec<crate::services::appauth::AuthSessionTicket>,
302
303 pub(crate) last_time_party_register: Option<i64>,
306
307 pub(crate) chat: ChatQueueState,
309
310 pub(crate) connection: Option<Box<dyn SteamConnection>>,
312 pub(crate) event_queue: VecDeque<super::events::SteamEvent>,
313
314 pub(crate) reconnect_manager: crate::internal::reconnect::ReconnectManager,
316 pub(crate) heartbeat_manager: crate::internal::heartbeat::HeartbeatManager,
317
318 pub(crate) job_manager: crate::internal::jobs::JobManager,
320
321 pub(crate) gc_jobs: crate::services::gc::GCJobManager,
323
324 pub(crate) background_job_tasks: HashMap<u64, BackgroundTask>,
326
327 pub(crate) background_job_results_tx: mpsc::Sender<(u64, Result<Bytes, SteamError>)>,
329 pub(crate) background_job_results_rx: mpsc::Receiver<(u64, Result<Bytes, SteamError>)>,
330
331 pub(crate) session_recovery: crate::services::SessionRecovery,
333
334 pub(crate) http_client: Arc<dyn crate::utils::http::HttpClient>,
336 pub(crate) clock: Arc<dyn crate::utils::clock::Clock>,
337 pub(crate) rng: Arc<dyn crate::utils::rng::Rng>,
338}
339
340impl SteamClient {
341 pub fn builder() -> super::builder::SteamClientBuilder {
363 super::builder::SteamClientBuilder::new()
364 }
365
366 pub fn new(options: SteamOptions) -> Self {
368 Self::builder().with_options(options).build()
369 }
370
371 pub(crate) fn with_all_providers(options: SteamOptions, http_client: Arc<dyn crate::utils::http::HttpClient>, clock: Arc<dyn crate::utils::clock::Clock>, rng: Arc<dyn crate::utils::rng::Rng>) -> Self {
376 let reconnect_config = options.reconnect.clone();
377 let (chat_queue_tx, chat_queue_rx) = mpsc::unbounded_channel();
378 let (bg_tx, bg_rx) = mpsc::channel(100);
379
380 Self {
381 options,
382 steam_id: None,
383 account: Arc::new(parking_lot::RwLock::new(AccountState::default())),
384 apps: Arc::new(parking_lot::RwLock::new(AppsState::default())),
385 social: Arc::new(parking_lot::RwLock::new(SocialState::default())),
386 auth: Arc::new(parking_lot::RwLock::new(AuthState::default())),
387 connect_time: 0,
388 connection_count: 0,
389 h_steam_pipe: (rng.gen_u32() % 1000000) + 1,
390 gc_tokens: Vec::new(),
391 active_tickets: Vec::new(),
392 last_time_party_register: None,
393 chat: ChatQueueState {
394 tx: chat_queue_tx,
395 rx: chat_queue_rx,
396 rate_limit_until: None,
397 pending_retry_message: None,
398 tasks: tokio::task::JoinSet::new(),
399 },
400 connection: None,
401 event_queue: VecDeque::new(),
402 reconnect_manager: crate::internal::reconnect::ReconnectManager::new(reconnect_config),
403 heartbeat_manager: crate::internal::heartbeat::HeartbeatManager::new(HEARTBEAT_INTERVAL_SECONDS),
404 job_manager: crate::internal::jobs::JobManager::new(),
405 gc_jobs: crate::services::gc::GCJobManager::new(),
406 background_job_tasks: HashMap::new(),
407 background_job_results_tx: bg_tx,
408 background_job_results_rx: bg_rx,
409 session_recovery: crate::services::SessionRecovery::new(),
410 http_client,
411 clock,
412 rng,
413 }
414 }
415
416 pub async fn log_on(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
418 if self.steam_id.is_some() {
419 return Err(SteamError::AlreadyLoggedOn);
420 }
421
422 if self.auth.read().connecting {
423 return Err(SteamError::AlreadyConnecting);
424 }
425
426 self.auth.write().connecting = true;
427
428 match self.execute_logon(details).await {
429 Ok(response) => Ok(response),
430 Err(e) => {
431 match &e {
432 SteamError::SteamResult(steam_enums::EResult::AccountLoginDeniedThrottle) => {
433 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
434 }
435 SteamError::SteamResult(steam_enums::EResult::ServiceUnavailable) => {
436 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "ServiceUnavailable");
437 }
438 SteamError::SteamResult(steam_enums::EResult::TryAnotherCM) => {
439 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "TryAnotherCM");
440 }
441 _ => {}
442 }
443 self.auth.write().connecting = false;
444 Err(e)
445 }
446 }
447 }
448
449 async fn execute_logon(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
450 self.auth.write().logging_off = false;
451
452 info!("Starting Steam login...");
453 crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::CMConnection).await;
454
455 self.auth.write().logon_details = Some(details.clone());
457
458 let is_web_logon = details.web_logon_token.is_some() && details.steam_id.is_some();
460 let is_anonymous = !is_web_logon && details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none() && !is_web_logon);
461
462 let mut temp_sid = SteamID::new();
464 temp_sid.universe = steamid::Universe::Public;
465
466 if is_web_logon {
467 info!("Logging in with web logon token");
469 if let Some(sid) = details.steam_id {
470 temp_sid = sid;
471 }
472 temp_sid.account_type = steamid::AccountType::Individual;
473 } else if is_anonymous {
474 info!("Logging in anonymously");
475 temp_sid.account_type = steamid::AccountType::AnonUser;
476 } else if let Some(ref token) = details.refresh_token {
477 info!("Logging in with refresh token");
478 match LoginSession::from_refresh_token(token.clone()) {
480 Ok(session) => {
481 if let Some(sid) = session.steam_id().cloned() {
482 temp_sid = sid;
483 }
484 self.auth.write().login_session = Some(session);
485 }
486 Err(e) => {
487 return Err(SteamError::InvalidToken(e.to_string()));
488 }
489 }
490 } else if details.account_name.is_some() && details.password.is_some() {
491 info!("Logging in with account name and password");
492 temp_sid.account_type = steamid::AccountType::Individual;
493 }
494
495 self.auth.write().temp_steam_id = Some(temp_sid);
496
497 let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
499 let server = provider.get_server().await?;
500 info!("Connecting to CM server: {}", server.endpoint);
501 let start_time = Instant::now();
502
503 let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
505 debug!("WebSocket connection established in {}ms", start_time.elapsed().as_millis());
506
507 self.auth.write().session_id = (self.rng.gen_u32() & 0x7FFF_FFFF) as i32;
509
510 let skip_hello = details.refresh_token.is_some() || details.web_logon_token.is_some();
513 if !skip_hello {
514 let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
515 let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
516 connection.send(hello_msg.encode()).await?;
517 debug!("Sent ClientHello");
518 }
519
520 let logon = self.build_logon_message(&details, is_anonymous);
522 let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
523 connection.send(logon_msg.encode()).await?;
524 debug!("Sent ClientLogon");
525
526 let response = self.wait_for_logon_response(&mut *connection).await?;
528
529 self.connection = Some(connection);
530 self.auth.write().connecting = false;
531
532 let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
534
535 if eresult != EResult::OK {
536 self.steam_id = None;
537 return Err(SteamError::SteamResult(eresult));
538 }
539
540 let steam_id = if let Some(sid) = response.client_supplied_steamid {
542 SteamID::from(sid)
543 } else if let Some(sid) = self.auth.read().temp_steam_id {
544 sid
545 } else {
546 return Err(SteamError::bad_response("Logon response missing client_supplied_steamid"));
547 };
548
549 self.steam_id = Some(steam_id);
550 {
551 let mut account = self.account.write();
552 account.cell_id = response.cell_id;
553 account.vanity_url = response.vanity_url.clone();
554
555 if let Some(ref ip) = response.public_ip {
556 if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
557 account.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
558 }
559 }
560 }
561
562 if let Some(secs) = response.heartbeat_seconds {
564 if secs > 0 {
565 self.heartbeat_manager.set_interval(secs as u64);
566 }
567 }
568 self.heartbeat_manager.reset();
569
570 let username = details.account_name.as_deref().unwrap_or("Unknown");
571 info!("Logged on as steam_id: {}, username: {}", steam_id.steam_id64(), username);
572
573 Ok(LogOnResponse {
574 eresult,
575 steam_id,
576 public_ip: self.account.read().public_ip.clone(),
577 cell_id: self.account.read().cell_id.unwrap_or(0),
578 vanity_url: self.account.read().vanity_url.clone(),
579 email_domain: response.email_domain.clone(),
580 steam_guard_required: false,
581 heartbeat_seconds: response.heartbeat_seconds,
582 server_time: response.rtime32_server_time,
583 account_flags: response.account_flags,
584 user_country: response.user_country.clone(),
585 ip_country_code: response.ip_country_code.clone(),
586 client_instance_id: response.client_instance_id,
587 token_id: response.token_id,
588 family_group_id: response.family_group_id,
589 eresult_extended: response.eresult_extended,
590 cell_id_ping_threshold: response.cell_id_ping_threshold,
591 force_client_update_check: response.force_client_update_check,
592 agreement_session_url: response.agreement_session_url.clone(),
593 legacy_out_of_game_heartbeat_seconds: response.legacy_out_of_game_heartbeat_seconds,
594 parental_settings: response.parental_settings.clone(),
595 parental_setting_signature: response.parental_setting_signature.clone(),
596 count_loginfailures_to_migrate: response.count_loginfailures_to_migrate,
597 count_disconnects_to_migrate: response.count_disconnects_to_migrate,
598 ogs_data_report_time_window: response.ogs_data_report_time_window,
599 steam2_ticket: response.steam2_ticket.clone(),
600 })
601 }
602
603 fn build_logon_message(&self, details: &LogOnDetails, is_anonymous: bool) -> CMsgClientLogon {
605 use crate::protocol::messages::{build_client_logon, LogonConfig};
606
607 let machine_name = if details.machine_name.is_some() {
609 details.machine_name.clone()
610 } else if !is_anonymous {
611 Some(format!("DESKTOP-{:06}", self.rng.gen_u32() % 1000000))
612 } else {
613 None
614 };
615
616 let identifier = if let Some(ref name) = details.account_name { Some(name.clone()) } else { self.auth.read().temp_steam_id.as_ref().map(|sid| sid.steam_id64().to_string()) };
617
618 let config = LogonConfig::new().with_cell_id(self.account.read().cell_id).with_machine_name(machine_name).with_identifier(identifier);
620
621 build_client_logon(&config, details, is_anonymous)
623 }
624
625 async fn wait_for_logon_response(&self, connection: &mut dyn SteamConnection) -> Result<CMsgClientLogonResponse, SteamError> {
634 use std::io::Read;
635
636 use flate2::read::GzDecoder;
637
638 let work = async {
639 loop {
640 let data = match connection.recv().await? {
641 Some(d) => d,
642 None => return Err(SteamError::ConnectionError("Connection closed by server during login".into())),
643 };
644
645 let msg = SteamMessage::decode_from_bytes(&data)?;
646 debug!("Received message: {:?}", msg.msg);
647
648 if msg.msg == EMsg::ClientLogOnResponse {
649 return msg.decode_body::<CMsgClientLogonResponse>();
650 }
651
652 if msg.msg == EMsg::Multi {
654 let multi = match msg.decode_body::<steam_protos::CMsgMulti>() {
655 Ok(m) => m,
656 Err(e) => {
657 warn!("Failed to decode Multi message: {}", e);
658 continue;
659 }
660 };
661 let body = multi.message_body.unwrap_or_default();
662 let payload = if multi.size_unzipped.unwrap_or(0) > 0 {
663 let decompressed_result = tokio::task::spawn_blocking(move || {
666 let mut decoder = GzDecoder::new(&body[..]);
667 let mut decompressed = Vec::new();
668 if decoder.read_to_end(&mut decompressed).is_ok() {
669 Some(decompressed)
670 } else {
671 None
672 }
673 })
674 .await
675 .map_err(|e| SteamError::Other(format!("Task join error: {}", e)))?;
676
677 match decompressed_result {
678 Some(d) => d,
679 None => {
680 error!("Failed to decompress Multi message");
681 continue;
682 }
683 }
684 } else {
685 body
686 };
687
688 let mut offset = 0;
690 while offset + 4 <= payload.len() {
691 let sub_size = u32::from_le_bytes([payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3]]) as usize;
692 offset += 4;
693
694 if offset + sub_size > payload.len() {
695 break;
696 }
697
698 let sub_data = &payload[offset..offset + sub_size];
699 match SteamMessage::decode_from_bytes(sub_data) {
700 Ok(sub_msg) => {
701 debug!("Received sub-message inside Multi: {:?}", sub_msg.msg);
702 if sub_msg.msg == EMsg::ClientLogOnResponse {
703 return sub_msg.decode_body::<CMsgClientLogonResponse>();
704 }
705 }
706 Err(e) => {
707 warn!("Failed to decode sub-message inside Multi: {}", e);
708 }
709 }
710 offset += sub_size;
711 }
712 }
713 }
714 };
715
716 match tokio::time::timeout(LOGON_RESPONSE_TIMEOUT, work).await {
717 Ok(result) => result,
718 Err(_) => Err(SteamError::Timeout),
719 }
720 }
721
722 fn create_proto_message<T: Message>(&self, msg: EMsg, body: &T) -> SteamMessage {
724 use crate::protocol::messages::{build_proto_header, create_steam_message};
725
726 let steam_id = self.auth.read().temp_steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0);
727 let header = build_proto_header(self.auth.read().session_id, steam_id, u64::MAX, u64::MAX);
728
729 create_steam_message(msg, header, body)
730 }
731
732 pub async fn log_off(&mut self) -> Result<(), SteamError> {
734 if self.steam_id.is_none() {
735 return Err(SteamError::NotLoggedOn);
736 }
737
738 info!("Logging off from Steam");
739 self.auth.write().logging_off = true;
740
741 let logoff = CMsgClientLogOff::default();
743 let msg = self.create_proto_message(EMsg::ClientLogOff, &logoff);
744
745 if let Some(ref mut conn) = self.connection {
747 let _ = conn.send(msg.encode()).await;
748 }
749
750 let close_result = if let Some(conn) = self.connection.take() { conn.close().await } else { Ok(()) };
752
753 self.steam_id = None;
755 *self.account.write() = AccountState::default();
756 self.social.write().friends.clear();
757 self.auth.write().logon_details = None;
758 self.auth.write().session_id = 0;
759
760 self.auth.write().logging_off = false;
761
762 close_result
763 }
764
765 pub async fn log_on_with_password(&mut self, account_name: &str, password: &str, steam_guard_code: Option<&str>, machine_auth_token: Option<&str>) -> Result<LogOnResponse, SteamError> {
793 if self.steam_id.is_some() {
794 return Err(SteamError::AlreadyLoggedOn);
795 }
796
797 if self.auth.read().connecting {
798 return Err(SteamError::AlreadyConnecting);
799 }
800
801 debug!("Starting password authentication for {}", account_name);
802 crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
803
804 let mut session = LoginSession::new(EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient, Some(LoginSessionOptions { machine_friendly_name: Some(format!("DESKTOP-{}", self.rng.gen_u32() % 1000000)), ..Default::default() }));
806
807 let credentials = CredentialsDetails {
809 account_name: account_name.to_string(),
810 password: password.to_string(),
811 persistence: None,
812 steam_guard_machine_token: machine_auth_token.map(|s| s.to_string()),
813 steam_guard_code: steam_guard_code.map(|s| s.to_string()),
814 };
815
816 let start_result = match session.start_with_credentials(credentials).await {
817 Ok(res) => res,
818 Err(e) => {
819 if let steam_auth::SessionError::SteamError(_, steam_enums::EResult::AccountLoginDeniedThrottle) = e {
820 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
821 }
822 return Err(SteamError::SessionError(e));
823 }
824 };
825
826 if start_result.action_required {
828 if let Some(ref actions) = start_result.valid_actions {
829 let action = actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeEmailCode)).or_else(|| actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeDeviceCode))).or_else(|| actions.first());
832
833 if let Some(action) = action {
834 debug!("Steam Guard required: {:?}", action.guard_type);
835
836 self.auth.write().login_session = Some(session);
838
839 return Err(SteamError::SteamGuardRequired { guard_type: action.guard_type, email_domain: action.detail.clone() });
840 }
841 }
842
843 warn!("Action required but no valid actions returned");
845 return Err(SteamError::InvalidCredentials);
846 }
847
848 self.complete_password_auth(session, account_name).await
850 }
851
852 pub async fn submit_steam_guard_code(&mut self, code: &str) -> Result<LogOnResponse, SteamError> {
865 let session = self.auth.write().login_session.take().ok_or_else(|| SteamError::Other("No pending login session".to_string()))?;
866
867 let account_name = session.account_name().map(|s| s.to_string()).unwrap_or_default();
868
869 let mut session = session;
870 session.submit_steam_guard_code(code).await?;
871
872 self.complete_password_auth(session, &account_name).await
873 }
874
875 async fn complete_password_auth(&mut self, mut session: LoginSession, account_name: &str) -> Result<LogOnResponse, SteamError> {
877 let poll_result = loop {
879 match session.poll().await? {
880 Some(result) => break result,
881 None => {
882 let interval = session.poll_interval();
884 self.clock.sleep(Duration::from_secs_f32(interval)).await;
885 }
886 }
887 };
888
889 debug!("Password authentication successful, got refresh token");
890 debug!("Refresh token account: {}", poll_result.account_name);
891
892 if let Some(ref _guard_data) = poll_result.new_guard_data {
894 debug!("Received new Steam Guard machine token");
895 self.auth.write().login_session = Some(session);
897 }
898
899 self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::RefreshToken { token: poll_result.refresh_token.clone(), account_name: poll_result.account_name.clone() }));
901
902 self.log_on(LogOnDetails {
904 refresh_token: Some(poll_result.refresh_token),
905 account_name: Some(account_name.to_string()),
906 ..Default::default()
907 })
908 .await
909 }
910
911 pub async fn log_on_with_cookies(&mut self, cookies: &str) -> Result<LogOnResponse, SteamError> {
937 if self.steam_id.is_some() {
938 return Err(SteamError::AlreadyLoggedOn);
939 }
940 if self.auth.read().connecting {
941 return Err(SteamError::AlreadyConnecting);
942 }
943
944 crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
945
946 let (web_logon_token, account_name, steam_id) = fetch_client_js_token(self.http_client.as_ref(), cookies).await?;
947
948 debug!("Minted web logon token for {} ({})", account_name, steam_id);
949
950 self.log_on(LogOnDetails {
951 web_logon_token: Some(web_logon_token),
952 account_name: Some(account_name),
953 steam_id: Some(steam_id),
954 ..Default::default()
955 })
956 .await
957 }
958
959 pub fn reset_connecting_state(&mut self) {
963 self.auth.write().connecting = false;
964 }
965
966 pub async fn get_web_session(&mut self) -> Result<(String, Vec<String>), SteamError> {
991 if self.steam_id.is_none() {
992 return Err(SteamError::NotLoggedOn);
993 }
994
995 let mut session = self.auth.write().login_session.take().ok_or_else(|| SteamError::Other("No login session available. Web session requires refresh token login.".to_string()))?;
998
999 let cookies_result = session.get_web_cookies().await;
1000 self.auth.write().login_session = Some(session);
1001 let cookies = cookies_result.map_err(|e| SteamError::Other(format!("Failed to get web cookies: {}", e)))?;
1002
1003 let session_id = cookies.iter().find(|c| c.starts_with("sessionid=")).and_then(|c| c.strip_prefix("sessionid=")).and_then(|c| c.split(';').next()).map(|s| s.to_string()).unwrap_or_else(|| {
1005 format!("{:x}", self.rng.gen_u64())
1007 });
1008
1009 self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::WebSession { session_id: session_id.clone(), cookies: cookies.clone() }));
1011
1012 Ok((session_id, cookies))
1013 }
1014
1015 pub(crate) async fn send_binary_message(&mut self, msg_type: EMsg, body: &[u8]) -> Result<(), SteamError> {
1017 use crate::protocol::{ExtendedMessageHeader, MessageHeader, SteamMessage};
1018
1019 let header = ExtendedMessageHeader {
1020 header_size: 36,
1021 header_version: 2,
1022 target_job_id: u64::MAX,
1023 source_job_id: u64::MAX,
1024 header_canary: 239,
1025 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1026 session_id: self.auth.read().session_id,
1027 };
1028
1029 let msg = SteamMessage {
1030 msg: msg_type,
1031 is_proto: false,
1032 header: MessageHeader::Extended(header),
1033 body: bytes::Bytes::copy_from_slice(body),
1034 };
1035
1036 if let Some(ref mut conn) = self.connection {
1037 conn.send(msg.encode()).await?;
1038 }
1039
1040 Ok(())
1041 }
1042
1043 pub(crate) async fn send_message<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<(), SteamError> {
1045 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1046
1047 let header = ProtobufMessageHeader {
1048 header_length: 0,
1049 session_id: self.auth.read().session_id,
1050 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1051 job_id_source: u64::MAX,
1052 job_id_target: u64::MAX,
1053 target_job_name: None,
1054 routing_appid: None,
1055 };
1056
1057 let msg = SteamMessage::new_proto(msg_type, header, body);
1058
1059 if let Some(ref mut conn) = self.connection {
1060 conn.send(msg.encode()).await?;
1061 }
1062
1063 Ok(())
1064 }
1065
1066 pub(crate) async fn send_message_with_routing<T: Message>(&mut self, msg_type: EMsg, routing_appid: u32, body: &T) -> Result<(), SteamError> {
1068 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1069
1070 let header = ProtobufMessageHeader {
1071 header_length: 0,
1072 session_id: self.auth.read().session_id,
1073 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1074 job_id_source: u64::MAX,
1075 job_id_target: u64::MAX,
1076 target_job_name: None,
1077 routing_appid: Some(routing_appid),
1078 };
1079
1080 let msg = SteamMessage::new_proto(msg_type, header, body);
1081
1082 if let Some(ref mut conn) = self.connection {
1083 conn.send(msg.encode()).await?;
1084 }
1085
1086 Ok(())
1087 }
1088
1089 pub(crate) async fn send_message_with_job<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<tokio::sync::oneshot::Receiver<crate::internal::jobs::JobResponse>, SteamError> {
1095 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1096
1097 let (job_id, response_rx) = self.job_manager.create_job().await;
1099 info!("[SteamClient] send_message_with_job: Created JobID={} for EMsg::{:?}", job_id, msg_type);
1100
1101 let header = ProtobufMessageHeader {
1102 header_length: 0,
1103 session_id: self.auth.read().session_id,
1104 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1105 job_id_source: job_id,
1106 job_id_target: u64::MAX,
1107 target_job_name: None,
1108 routing_appid: None,
1109 };
1110
1111 let msg = SteamMessage::new_proto(msg_type, header, body);
1112
1113 if let Some(ref mut conn) = self.connection {
1114 conn.send(msg.encode()).await?;
1115 }
1116
1117 Ok(response_rx)
1118 }
1119
1120 async fn process_queued_message(&mut self, msg: QueuedMessage) {
1122 info!("[SteamClient] process_queued_message: Processing message to {}", msg.friend);
1123
1124 let processed_message = if msg.contains_bbcode { msg.message.replace('[', "\\[") } else { msg.message.clone() };
1126
1127 debug!("[SteamClient] process_queued_message: Original len={}, Processed len={}", msg.message.len(), processed_message.len());
1128
1129 let request = steam_protos::CFriendMessagesSendMessageRequest {
1130 steamid: Some(msg.friend.steam_id64()),
1131 chat_entry_type: Some(msg.entry_type as i32),
1132 message: Some(processed_message),
1133 contains_bbcode: Some(msg.contains_bbcode),
1134 ..Default::default()
1135 };
1136
1137 info!("[SteamClient] process_queued_message: Sending FriendMessages.SendMessage#1 to Steam");
1138
1139 match self.send_service_method_with_job("FriendMessages.SendMessage#1", &request).await {
1141 Ok(rx) => {
1142 info!("[SteamClient] process_queued_message: Request sent, waiting for response...");
1143 let friend_id = msg.friend;
1144 let original_message = msg.message.clone();
1145
1146 self.chat.tasks.spawn(
1148 async move {
1149 const CHAT_TIMEOUT_SECS: u64 = 30;
1150
1151 debug!("[SteamClient] chat_task: Waiting for response (timeout: {}s)", CHAT_TIMEOUT_SECS);
1152
1153 let res = match tokio::time::timeout(std::time::Duration::from_secs(CHAT_TIMEOUT_SECS), rx).await {
1154 Ok(Ok(crate::internal::jobs::JobResponse::Success(body))) => {
1155 info!("[SteamClient] chat_task: Received Success response, body len={}", body.len());
1156 steam_protos::CFriendMessagesSendMessageResponse::decode(&body[..])
1157 .map(|response| {
1158 info!("[SteamClient] chat_task: Decoded response - server_ts={}, ordinal={}", response.server_timestamp.unwrap_or(0), response.ordinal.unwrap_or(0));
1159 crate::services::chat::SendMessageResult {
1160 modified_message: response.modified_message.unwrap_or(original_message),
1161 server_timestamp: response.server_timestamp.unwrap_or(0),
1162 ordinal: response.ordinal.unwrap_or(0),
1163 }
1164 })
1165 .map_err(|e| {
1166 error!("[SteamClient] chat_task: Failed to decode response: {:?}", e);
1167 SteamError::DeserializationFailed
1168 })
1169 }
1170 Ok(Ok(crate::internal::jobs::JobResponse::Timeout)) => {
1171 error!("[SteamClient] chat_task: Job response timeout (Steam didn't respond)");
1172 Err(SteamError::ResponseTimeout)
1173 }
1174 Ok(Ok(crate::internal::jobs::JobResponse::Error(e))) => {
1175 error!("[SteamClient] chat_task: Job response error: {}", e);
1176 Err(SteamError::ProtocolError(e))
1177 }
1178 Ok(Err(e)) => {
1179 error!("[SteamClient] chat_task: Job channel closed: {:?}", e);
1180 Err(SteamError::Other("Job response channel closed".into()))
1181 }
1182 Err(_) => {
1183 error!("[SteamClient] chat_task: Tokio timeout elapsed after {}s", CHAT_TIMEOUT_SECS);
1185 Err(SteamError::ResponseTimeout)
1186 }
1187 };
1188
1189 match &res {
1190 Ok(r) => info!("[SteamClient] chat_task: Final result SUCCESS - ts={}", r.server_timestamp),
1191 Err(e) => error!("[SteamClient] chat_task: Final result ERROR - {:?}", e),
1192 }
1193
1194 (msg, res)
1195 }
1196 .instrument(info_span!("chat_send", friend = %friend_id)),
1197 );
1198 }
1199 Err(e) => {
1200 error!("[SteamClient] process_queued_message: Failed to send request to Steam: {:?}", e);
1202 let _ = msg.respond_to.send(Err(e));
1203 }
1204 }
1205 }
1206
1207 pub async fn send_unified_request_and_wait<T, R>(&mut self, method: &str, request: &T) -> Result<R, SteamError>
1212 where
1213 T: prost::Message,
1214 R: prost::Message + Default,
1215 {
1216 use crate::internal::jobs::JobResponse;
1217
1218 let response_rx = self.send_service_method_with_job(method, request).await?;
1220
1221 match response_rx.await {
1223 Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1224 Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1225 Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1226 Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1227 }
1228 }
1229
1230 pub async fn send_request_and_wait<T, R>(&mut self, emsg: EMsg, request: &T) -> Result<R, SteamError>
1235 where
1236 T: prost::Message,
1237 R: prost::Message + Default,
1238 {
1239 use crate::internal::jobs::JobResponse;
1240
1241 let response_rx = self.send_message_with_job(emsg, request).await?;
1243
1244 match response_rx.await {
1246 Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1247 Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1248 Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1249 Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1250 }
1251 }
1252
1253 pub async fn poll_event(&mut self) -> Result<Option<super::events::SteamEvent>, SteamError> {
1284 use super::events::{ConnectionEvent, SteamEvent};
1285
1286 if let Some(mut event) = self.event_queue.pop_front() {
1288 self.handle_event(&mut event);
1289 self.post_process_event(&event).await;
1290 return Ok(Some(event));
1291 }
1292
1293 self.job_manager.cleanup_expired().await;
1295
1296 if self.reconnect_manager.is_reconnecting() {
1298 if let Some(attempt) = self.reconnect_manager.check_ready() {
1299 let delay = self.reconnect_manager.current_delay();
1300 let max_attempts = self.reconnect_manager.max_attempts();
1301
1302 self.event_queue.push_back(SteamEvent::Connection(ConnectionEvent::ReconnectAttempt { attempt, max_attempts, delay }));
1304
1305 match self.attempt_reconnect().await {
1307 Ok(()) => {
1308 self.reconnect_manager.record_success();
1310 info!("Reconnection successful");
1311
1312 if let Err(e) = self.restore_session_state().await {
1314 warn!("Failed to restore session state: {}", e);
1315 }
1316 }
1317 Err(e) => {
1318 warn!("Reconnection attempt {} failed: {:?}", attempt, e);
1320
1321 let server = self.auth.write().last_cm_server.take();
1322 let should_continue = self.reconnect_manager.record_failure(server.as_ref());
1323
1324 if !should_continue {
1325 let reason = self.reconnect_manager.last_disconnect_reason();
1327 let attempts = self.reconnect_manager.attempt();
1328
1329 return Ok(Some(SteamEvent::Connection(ConnectionEvent::ReconnectFailed { reason, attempts })));
1330 }
1331 }
1332 }
1333 } else if let Some(wait_time) = self.reconnect_manager.time_until_next_attempt() {
1334 let sleep_time = wait_time.min(Duration::from_millis(100));
1337 self.clock.sleep(sleep_time).await;
1338 }
1339 }
1340
1341 if let Some(mut event) = self.event_queue.pop_front() {
1343 self.handle_event(&mut event);
1344 self.post_process_event(&event).await;
1345 return Ok(Some(event));
1346 }
1347
1348 loop {
1349 if let Some(msg) = self.chat.pending_retry_message.take() {
1352 let rate_limited = self.chat.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1353
1354 if !rate_limited {
1355 self.process_queued_message(msg).await;
1356 } else {
1358 self.chat.pending_retry_message = Some(msg);
1360 }
1361 }
1362
1363 let rate_limited = self.chat.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1365 let can_process_queue = !rate_limited && self.chat.pending_retry_message.is_none();
1366
1367 if self.connection.is_none() {
1369 return Ok(None);
1370 }
1371
1372 let now = self.clock.now();
1374 let hb_timeout = self.heartbeat_manager.time_until_next_heartbeat(now);
1375
1376 if let Some(timeout) = hb_timeout {
1378 if timeout == Duration::ZERO {
1379 debug!("Sending heartbeat");
1380 let msg = crate::internal::heartbeat::HeartbeatManager::build_heartbeat_message(self.auth.read().session_id, self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0));
1381
1382 let result = {
1384 if let Some(conn) = self.connection.as_mut() {
1385 conn.send(msg.encode()).await
1386 } else {
1387 Err(SteamError::NotConnected)
1388 }
1389 };
1390
1391 if let Err(e) = result {
1392 warn!("Failed to send heartbeat: {}", e);
1393 let reason = match &e {
1395 SteamError::SteamResult(r) => Some(*r),
1396 _ => Some(EResult::NoConnection),
1397 };
1398 return self.handle_connection_close(reason);
1399 }
1400
1401 self.heartbeat_manager.record_heartbeat(self.clock.now());
1402 continue;
1403 }
1404 }
1405
1406 enum PollResult {
1407 Packet(Option<bytes::Bytes>),
1408 Queue(QueuedMessage),
1409 ChatResponse(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>),
1410 BackgroundResult(u64, Result<Bytes, SteamError>),
1411 Timeout,
1412 }
1413
1414 let poll_res: Result<PollResult, SteamError> = {
1415 let conn = self.connection.as_mut().unwrap();
1416 let queue = &mut self.chat.rx;
1417
1418 let recv_fut = conn.recv();
1419
1420 let rate_limit_timeout = self.chat.rate_limit_until.map(|t| {
1426 let now = self.clock.now();
1427 if t > now {
1428 t - now
1429 } else {
1430 Duration::ZERO
1431 }
1432 });
1433
1434 let sleep_duration = match (hb_timeout, rate_limit_timeout) {
1435 (Some(hb), Some(rl)) => Some(hb.min(rl)),
1436 (Some(hb), None) => Some(hb),
1437 (None, Some(rl)) => Some(rl),
1438 (None, None) => None,
1439 };
1440
1441 tokio::select! {
1442 res = recv_fut => {
1443 Ok(PollResult::Packet(res?))
1444 }
1445 Some(msg) = queue.recv(), if can_process_queue => {
1446 Ok(PollResult::Queue(msg))
1447 }
1448 Some(join_result) = self.chat.tasks.join_next() => {
1449 match join_result {
1450 Ok((msg, res)) => Ok(PollResult::ChatResponse(msg, res)),
1451 Err(e) => {
1452 warn!("Chat task failed: {:?}", e);
1454 Ok(PollResult::Timeout) }
1456 }
1457 }
1458 res = self.background_job_results_rx.recv() => {
1459 match res {
1460 Some((job_id, result)) => Ok(PollResult::BackgroundResult(job_id, result)),
1461 None => Ok(PollResult::Timeout),
1462 }
1463 }
1464 _ = async {
1465 if let Some(d) = sleep_duration {
1466 tokio::time::sleep(d).await;
1467 } else {
1468 std::future::pending::<()>().await;
1469 }
1470 } => {
1471 Ok(PollResult::Timeout)
1472 }
1473 }
1474 };
1475
1476 match poll_res {
1477 Ok(PollResult::Packet(Some(data))) => {
1478 let messages = super::events::MessageHandler::decode_packet(&data);
1480
1481 for decoded in messages {
1482 if let Some(job_id) = decoded.job_id_target {
1484 info!("[SteamClient] poll_event: Completing job {} via job_manager", job_id);
1485 self.job_manager.complete_job_success(job_id, decoded.body).await;
1486 }
1487
1488 for event in &decoded.events {
1490 if let super::events::SteamEvent::Apps(super::events::AppsEvent::GCReceived(gc_msg)) = event {
1491 self.gc_jobs.try_complete(gc_msg.appid, gc_msg.msg_type, gc_msg.payload.clone());
1492 }
1493 }
1494
1495 self.event_queue.extend(decoded.events);
1497 }
1498
1499 if let Some(mut event) = self.event_queue.pop_front() {
1501 self.handle_event(&mut event);
1502 self.post_process_event(&event).await;
1503 return Ok(Some(event));
1504 }
1505 }
1506 Ok(PollResult::Packet(None)) => {
1507 return self.handle_connection_close(None);
1509 }
1510 Ok(PollResult::Queue(msg)) => {
1511 info!("[SteamClient] poll_event: Dequeued message from chat_queue_rx for {}", msg.friend);
1512 self.process_queued_message(msg).await;
1513 }
1515 Ok(PollResult::ChatResponse(msg, result)) => {
1516 info!("[SteamClient] poll_event: Received ChatResponse for {}", msg.friend);
1517 match result {
1518 Ok(res) => {
1519 info!("[SteamClient] poll_event: ChatResponse SUCCESS - server_ts={}", res.server_timestamp);
1520 let _ = msg.respond_to.send(Ok(res));
1521 }
1522 Err(SteamError::SteamResult(steam_enums::EResult::RateLimitExceeded)) => {
1523 warn!("[SteamClient] poll_event: Rate limit exceeded for chat message, backing off for 60 seconds");
1524 crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "RateLimitExceeded");
1525 self.chat.rate_limit_until = Some(self.clock.now() + std::time::Duration::from_secs(60));
1526 self.chat.pending_retry_message = Some(msg);
1527 }
1528 Err(e) => {
1529 error!("[SteamClient] poll_event: ChatResponse ERROR - {:?}", e);
1530 let _ = msg.respond_to.send(Err(e));
1531 }
1532 }
1533 }
1534 Ok(PollResult::BackgroundResult(job_id, result)) => {
1535 debug!("[SteamClient] poll_event: Received BackgroundResult for job {}", job_id);
1536 if let Some(task) = self.background_job_tasks.remove(&job_id) {
1537 match task {
1538 BackgroundTask::FriendsList => match result {
1539 Ok(body) => {
1540 debug!("[SteamClient] poll_event: Decoding FriendsList response (len={})", body.len());
1541 match steam_protos::CFriendsListGetFriendsListResponse::decode(&body[..]) {
1542 Ok(response) => {
1543 debug!("[SteamClient] poll_event: Successfully decoded FriendsList, processing...");
1544 self.handle_friends_list_unified_response(response).await;
1545 }
1546 Err(e) => {
1547 error!("[SteamClient] poll_event: Failed to decode FriendsList response: {:?}", e);
1548 }
1549 }
1550 }
1551 Err(e) => {
1552 error!("[SteamClient] poll_event: Background FriendsList job failed: {:?}", e);
1553 }
1554 },
1555 BackgroundTask::OfflineMessages(friend_id) => match result {
1556 Ok(body) => {
1557 debug!("[SteamClient] poll_event: Decoding OfflineMessages response for {} (len={})", friend_id, body.len());
1558 match steam_protos::CFriendMessagesGetRecentMessagesResponse::decode(&body[..]) {
1559 Ok(response) => {
1560 debug!("[SteamClient] poll_event: Successfully decoded OfflineMessages, processing...");
1561 let last_view_timestamp = self.social.read().chat_last_view.get(&friend_id).copied().unwrap_or(0);
1562
1563 let messages = response
1564 .messages
1565 .into_iter()
1566 .map(|m| crate::services::chat::HistoryMessage {
1567 sender: SteamID::from_steam_id64(m.accountid.unwrap_or(0) as u64),
1568 timestamp: m.timestamp.unwrap_or(0),
1569 ordinal: m.ordinal.unwrap_or(0),
1570 message: m.message.unwrap_or_default(),
1571 unread: m.timestamp.unwrap_or(0) > last_view_timestamp,
1572 })
1573 .collect();
1574
1575 self.event_queue.push_back(crate::client::events::SteamEvent::Chat(crate::client::events::ChatEvent::OfflineMessagesFetched { friend_id, messages }));
1576 }
1577 Err(e) => {
1578 error!("[SteamClient] poll_event: Failed to decode OfflineMessages response: {:?}", e);
1579 }
1580 }
1581 }
1582 Err(e) => {
1583 error!("[SteamClient] poll_event: Background OfflineMessages job failed for {}: {:?}", friend_id, e);
1584 }
1585 },
1586 }
1587
1588 if let Some(mut event) = self.event_queue.pop_front() {
1591 self.handle_event(&mut event);
1592 self.post_process_event(&event).await;
1593 return Ok(Some(event));
1594 }
1595 } else {
1596 warn!("[SteamClient] poll_event: Received BackgroundResult for unknown job {}", job_id);
1597 }
1598 }
1599 Ok(PollResult::Timeout) => {
1600 }
1602 Err(e) => {
1603 let reason = match &e {
1605 SteamError::SteamResult(r) => Some(*r),
1606 _ => Some(EResult::NoConnection),
1607 };
1608 return self.handle_connection_close(reason);
1609 }
1610 }
1611 }
1612 }
1613
1614 fn handle_connection_close(&mut self, reason: Option<EResult>) -> Result<Option<super::events::SteamEvent>, SteamError> {
1616 use super::events::{ConnectionEvent, SteamEvent};
1617
1618 debug!("Handling connection close, reason: {:?}", reason);
1619
1620 self.connection = None;
1622 self.auth.write().connecting = false;
1623
1624 let should_reconnect = !self.auth.read().logging_off && self.options.auto_relogin && reason.map(|r| self.reconnect_manager.should_reconnect(r)).unwrap_or(true);
1626
1627 if should_reconnect && self.auth.read().logon_details.is_some() {
1628 let eresult = reason.unwrap_or(EResult::NoConnection);
1630 self.reconnect_manager.start_reconnection(eresult);
1631
1632 if let Some(server) = self.auth.read().last_cm_server.clone() {
1634 self.reconnect_manager.blacklist_server(&server.endpoint);
1635 }
1636
1637 info!("Connection lost, will attempt reconnection");
1638 } else {
1639 self.steam_id = None;
1641 self.reconnect_manager.reset();
1642 self.auth.write().relogging = false;
1643 self.auth.write().logging_off = false;
1644 }
1645
1646 Ok(Some(SteamEvent::Connection(ConnectionEvent::Disconnected { reason, will_reconnect: should_reconnect })))
1647 }
1648
1649 async fn restore_session_state(&mut self) -> Result<(), SteamError> {
1652 info!("Restoring session state...");
1653
1654 let app_ids = self.session_recovery.last_playing_app_ids.clone();
1656 if !app_ids.is_empty() {
1657 debug!("Restoring games played: {:?}", app_ids);
1658 self.games_played_with_extra(app_ids, self.session_recovery.last_custom_game_name.clone()).await?;
1659 }
1660
1661 if let Some(state) = self.session_recovery.last_persona_state {
1663 debug!("Restoring persona state: {:?}", state);
1664 self.set_persona(state, self.session_recovery.last_player_name.clone()).await?;
1665 }
1666
1667 let rp_data = self.session_recovery.last_rich_presence.clone();
1669 for (app_id, data) in rp_data {
1670 debug!("Restoring rich presence for app {}", app_id);
1671 self.upload_rich_presence(app_id, &data).await?;
1672 }
1673
1674 Ok(())
1675 }
1676
1677 async fn attempt_reconnect(&mut self) -> Result<(), SteamError> {
1679 let details = self.auth.read().logon_details.clone().ok_or_else(|| SteamError::Other("No logon details available for reconnection".to_string()))?;
1681
1682 debug!("Attempting reconnection with stored credentials");
1683
1684 let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
1686 let server = provider.get_server().await?;
1687 self.auth.write().last_cm_server = Some(server.clone());
1688
1689 info!("Reconnecting to CM server: {}", server.endpoint);
1690
1691 let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
1693 debug!("WebSocket connection established");
1694
1695 self.auth.write().session_id = (self.rng.gen_u32() & 0x7FFF_FFFF) as i32;
1697
1698 if details.refresh_token.is_none() {
1700 let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
1701 let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
1702 connection.send(hello_msg.encode()).await?;
1703 }
1704
1705 let is_anonymous = details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none());
1707
1708 let logon = self.build_logon_message(&details, is_anonymous);
1710 let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
1711 connection.send(logon_msg.encode()).await?;
1712
1713 let response = self.wait_for_logon_response(&mut *connection).await?;
1715
1716 let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
1718
1719 if eresult != EResult::OK {
1720 return Err(SteamError::SteamResult(eresult));
1721 }
1722
1723 self.connection = Some(connection);
1725 self.auth.write().connecting = false;
1726
1727 if let Some(sid) = response.client_supplied_steamid {
1729 self.steam_id = Some(SteamID::from(sid));
1730 }
1731
1732 {
1733 let mut account = self.account.write();
1734 account.cell_id = response.cell_id;
1735 account.vanity_url = response.vanity_url.clone();
1736
1737 if let Some(ref ip) = response.public_ip {
1738 if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
1739 account.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
1740 }
1741 }
1742 }
1743
1744 info!("Successfully reconnected");
1745 Ok(())
1746 }
1747
1748 pub async fn poll_events(&mut self) -> Result<Vec<super::events::SteamEvent>, SteamError> {
1753 let mut all_events = Vec::new();
1754
1755 while let Some(event) = self.poll_event().await? {
1756 all_events.push(event);
1757
1758 if self.event_queue.is_empty() {
1760 break;
1761 }
1762 }
1763
1764 Ok(all_events)
1765 }
1766
1767 fn handle_event(&mut self, event: &mut super::events::SteamEvent) {
1769 use super::events::{AppsEvent, AuthEvent, ConnectionEvent, FriendsEvent, SteamEvent};
1770
1771 match event {
1772 SteamEvent::Auth(AuthEvent::LoggedOn { steam_id }) => {
1773 self.steam_id = Some(*steam_id);
1774 self.auth.write().connecting = false;
1775 self.reconnect_manager.record_success();
1777 }
1778 SteamEvent::Auth(AuthEvent::LoggedOff { .. }) => {
1779 self.steam_id = None;
1780 }
1781 SteamEvent::Auth(AuthEvent::GameConnectTokens { tokens }) => {
1782 self.gc_tokens.extend(tokens.clone());
1783 }
1784 SteamEvent::Friends(FriendsEvent::FriendsList { friends, .. }) => {
1785 for friend in friends {
1786 if friend.relationship == steam_enums::EFriendRelationship::None {
1787 self.social.write().friends.remove(&friend.steam_id);
1788 } else {
1789 self.social.write().friends.insert(friend.steam_id, friend.relationship as u32);
1790 }
1791 }
1792 }
1793 SteamEvent::Friends(FriendsEvent::PersonaState(persona)) => {
1794 let mut persona = *persona.clone();
1795 if let Some(game_id) = persona.game_id {
1796 let app_id = game_id as u32;
1797 if app_id != 0 {
1798 if !self.apps.read().info.contains_key(&app_id) {
1800 self.apps.write().pending.insert(app_id);
1801 } else if persona.game_name.is_none() || persona.game_name.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
1802 if let Some(name) = self.get_app_name(app_id) {
1804 persona.game_name = Some(name);
1805 }
1806 }
1807 }
1808 }
1809
1810 self.social.write().users.entry(persona.steam_id).and_modify(|existing| existing.merge(&persona)).or_insert_with(|| persona.clone());
1812
1813 self.social.read().persona_cache.insert(persona.steam_id, persona);
1815 }
1816 SteamEvent::Apps(AppsEvent::PlayingState { playing_app, blocked }) => {
1817 let mut apps = self.apps.write();
1818 apps.playing_blocked = *blocked;
1819 if *playing_app != 0 {
1820 if !apps.info.contains_key(playing_app) {
1821 apps.pending.insert(*playing_app);
1822 }
1823 apps.info.entry(*playing_app).or_insert_with(|| super::events::AppInfoData { app_id: *playing_app, change_number: 0, missing_token: false, app_info: None });
1824 }
1825 }
1826 SteamEvent::Apps(AppsEvent::ProductInfoResponse { apps: new_apps, .. }) => {
1827 let mut apps = self.apps.write();
1828 for (app_id, data) in new_apps {
1829 apps.info.insert(*app_id, data.clone());
1830 apps.pending.remove(app_id);
1831 }
1832 }
1833 SteamEvent::Connection(ConnectionEvent::Disconnected { will_reconnect, .. }) => {
1834 if !*will_reconnect {
1836 self.steam_id = None;
1837 }
1838 self.connection = None;
1839 self.auth.write().connecting = false;
1840 }
1841 SteamEvent::Connection(ConnectionEvent::ReconnectFailed { .. }) => {
1842 self.steam_id = None;
1844 self.connection = None;
1845 self.auth.write().connecting = false;
1846 self.reconnect_manager.reset();
1847 }
1848 SteamEvent::Apps(super::events::AppsEvent::LicenseList { licenses }) => {
1849 self.apps.write().licenses = licenses.clone();
1850 }
1851 _ => {}
1852 }
1853 }
1854
1855 async fn post_process_event(&mut self, event: &super::events::SteamEvent) {
1858 use super::events::{FriendsEvent, SteamEvent};
1859
1860 if let SteamEvent::Friends(FriendsEvent::FriendsList { incremental: false, .. }) = event {
1861 let friends: Vec<SteamID> = self.social.read().friends.iter().filter(|&(_, &rel)| rel == steam_enums::EFriendRelationship::Friend as u32).map(|(&id, _)| id).collect();
1862
1863 if !friends.is_empty() {
1864 debug!("Auto-fetching personas for {} friends", friends.len());
1865 if let Err(e) = self.get_personas(friends).await {
1866 warn!("Failed to auto-fetch personas: {}", e);
1867 }
1868 }
1869 }
1870 }
1871
1872 pub fn format_rich_presence(&self, steam_id: SteamID, language: &str) -> Option<String> {
1878 let user = self.social.read().users.get(&steam_id).cloned()?;
1881 let app_id = user.game_id? as u32;
1882 let apps = self.apps.read();
1885 let app_info = apps.info.get(&app_id)?;
1886 let app_vdf = app_info.app_info.as_ref()?;
1887
1888 let localization = app_vdf.get("common").and_then(|c| c.get("rich_presence")).and_then(|rp| rp.get("localization")).and_then(|loc| loc.get(language))?;
1890
1891 let display_token = user.rich_presence.get("steam_display")?;
1893 let mut formatted = localization.get_str(display_token)?.to_string();
1894
1895 for (key, value) in &user.rich_presence {
1897 if key == "steam_display" {
1898 continue;
1899 }
1900 let placeholder = format!("%{}%", key);
1901 let localized_value = localization.get_str(value).unwrap_or(value);
1903 formatted = formatted.replace(&placeholder, localized_value);
1904 }
1905
1906 Some(formatted)
1907 }
1908
1909 pub fn get_app_name(&self, app_id: u32) -> Option<String> {
1911 #[cfg(feature = "static-app-list")]
1914 {
1915 if let Ok(idx) = crate::client::static_app_list::APP_LIST.binary_search_by_key(&app_id, |&(id, _)| id) {
1916 return Some(crate::client::static_app_list::APP_LIST[idx].1.to_string());
1917 }
1918 }
1919
1920 let apps = self.apps.read();
1922 let info = apps.info.get(&app_id)?;
1923 if let Some(vdf) = &info.app_info {
1924 vdf.get("common").and_then(|c| c.get_str("name")).or_else(|| vdf.get_str("name")).map(|s| s.to_string())
1925 } else {
1926 None
1927 }
1928 }
1929
1930 pub async fn fetch_pending_app_info(&mut self) -> Result<(), SteamError> {
1935 if self.apps.read().pending.is_empty() {
1936 return Ok(());
1937 }
1938
1939 let app_ids: Vec<u32> = self.apps.read().pending.iter().copied().collect();
1940 self.apps.write().pending.clear();
1942 self.get_product_info(app_ids).await
1943 }
1944
1945 pub async fn update_friend_sessions(&mut self) -> Result<(), SteamError> {
1950 let sessions = self.get_active_friend_sessions().await?;
1951
1952 {
1954 let mut social = self.social.write();
1955 for session in sessions {
1956 social
1957 .users
1958 .entry(session.friend)
1959 .and_modify(|user| {
1960 user.unread_count = session.unread_count;
1961 user.last_message_time = session.time_last_message;
1962 })
1963 .or_insert_with(|| UserPersona {
1964 steam_id: session.friend,
1965 unread_count: session.unread_count,
1966 last_message_time: session.time_last_message,
1967 ..Default::default()
1968 });
1969 }
1970 }
1971
1972 Ok(())
1973 }
1974
1975 pub fn csgo(&mut self) -> crate::services::CSGOClient<'_> {
1977 crate::services::CSGOClient::new(self)
1978 }
1979
1980 pub(crate) async fn send_service_method_and_wait<Req: prost::Message, Resp: prost::Message + Default>(&mut self, method: &str, body: &Req) -> Result<Resp, SteamError> {
1982 let rx = self.send_service_method_with_job(method, body).await?;
1983
1984 let job_response = rx.await.map_err(|_| SteamError::ResponseTimeout)?;
1986
1987 let body = match job_response {
1988 crate::internal::jobs::JobResponse::Success(bytes) => bytes,
1989 crate::internal::jobs::JobResponse::Timeout => return Err(SteamError::ResponseTimeout),
1990 crate::internal::jobs::JobResponse::Error(msg) => return Err(SteamError::ProtocolError(msg)),
1991 };
1992
1993 Resp::decode(&body[..]).map_err(|_| SteamError::DeserializationFailed)
1994 }
1995}
1996
1997use std::task::{Context, Poll};
2002
2003use futures::Stream;
2004
2005pub struct SteamEventStream<'a> {
2037 client: &'a mut SteamClient,
2038}
2039
2040impl SteamClient {
2041 pub(crate) async fn send_service_method_background<T: prost::Message>(&mut self, method: &str, body: &T, task: BackgroundTask) -> Result<(), SteamError> {
2065 let (job_id, rx) = self.job_manager.create_job().await;
2066
2067 self.send_service_method_with_job_id(method, body, job_id).await?;
2068
2069 self.background_job_tasks.insert(job_id, task);
2070
2071 let tx = self.background_job_results_tx.clone();
2072 tokio::spawn(async move {
2073 match rx.await {
2074 Ok(crate::internal::jobs::JobResponse::Success(bytes)) => {
2075 let _ = tx.send((job_id, Ok(bytes))).await;
2076 }
2077 Ok(crate::internal::jobs::JobResponse::Timeout) => {
2078 let _ = tx.send((job_id, Err(SteamError::Timeout))).await;
2079 }
2080 Ok(crate::internal::jobs::JobResponse::Error(e)) => {
2081 let _ = tx.send((job_id, Err(SteamError::Other(e)))).await;
2082 }
2083 Err(_) => {
2084 }
2086 }
2087 });
2088
2089 Ok(())
2090 }
2091
2092 async fn send_service_method_with_job_id<T: prost::Message>(&mut self, method: &str, body: &T, job_id: u64) -> Result<(), SteamError> {
2094 use crate::protocol::{ProtobufMessageHeader, SteamMessage};
2095
2096 let header = ProtobufMessageHeader {
2097 header_length: 0,
2098 session_id: self.auth.read().session_id,
2099 steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
2100 job_id_source: job_id,
2101 job_id_target: u64::MAX,
2102 target_job_name: Some(method.to_string()),
2103 routing_appid: None,
2104 };
2105
2106 let msg = SteamMessage::new_proto(steam_enums::EMsg::ServiceMethodCallFromClient, header, body);
2107
2108 if let Some(ref mut conn) = self.connection {
2109 conn.send(msg.encode()).await?;
2110 } else {
2111 return Err(SteamError::NotConnected);
2112 }
2113
2114 Ok(())
2115 }
2116
2117 pub fn into_stream(&mut self) -> SteamEventStream<'_> {
2118 SteamEventStream { client: self }
2119 }
2120}
2121
2122impl<'a> Stream for SteamEventStream<'a> {
2123 type Item = Result<super::events::SteamEvent, SteamError>;
2124
2125 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2126 if let Some(mut event) = self.client.event_queue.pop_front() {
2129 self.client.handle_event(&mut event);
2130 Poll::Ready(Some(Ok(event)))
2131 } else {
2132 Poll::Ready(None)
2134 }
2135 }
2136}
2137
2138#[derive(Debug, serde::Deserialize)]
2140struct ClientJsTokenResponse {
2141 #[serde(default)]
2142 logged_in: bool,
2143 #[serde(default)]
2144 steamid: Option<String>,
2145 #[serde(default)]
2146 account_name: Option<String>,
2147 #[serde(default)]
2148 token: Option<String>,
2149}
2150
2151async fn fetch_client_js_token(http_client: &dyn crate::utils::http::HttpClient, cookies: &str) -> Result<(String, String, SteamID), SteamError> {
2154 let response = http_client.get_with_cookies("https://steamcommunity.com/chat/clientjstoken", cookies).await?;
2155
2156 if !response.is_success() {
2157 return Err(SteamError::Other(format!("clientjstoken HTTP error: {}", response.status)));
2158 }
2159
2160 let body: ClientJsTokenResponse = response.json().map_err(|e| SteamError::ProtocolError(format!("Failed to parse clientjstoken response: {}", e)))?;
2161
2162 if !body.logged_in {
2163 return Err(SteamError::InvalidCredentials);
2164 }
2165
2166 let token = body.token.filter(|s| !s.is_empty()).ok_or_else(|| SteamError::ProtocolError("clientjstoken returned no token".to_string()))?;
2167 let account_name = body.account_name.filter(|s| !s.is_empty()).ok_or_else(|| SteamError::ProtocolError("clientjstoken returned no account_name".to_string()))?;
2168 let steamid_str = body.steamid.filter(|s| !s.is_empty()).ok_or_else(|| SteamError::ProtocolError("clientjstoken returned no steamid".to_string()))?;
2169 let steamid_u64: u64 = steamid_str.parse().map_err(|_| SteamError::ProtocolError(format!("clientjstoken returned malformed steamid: {}", steamid_str)))?;
2170 let steam_id = SteamID::from(steamid_u64);
2171
2172 Ok((token, account_name, steam_id))
2173}
2174
2175#[cfg(test)]
2176mod tests {
2177 use super::*;
2178
2179 fn test_steam_id() -> SteamID {
2180 SteamID::from_steam_id64(76561198000000000)
2181 }
2182
2183 #[test]
2184 fn test_user_persona_merge_rich_presence() {
2185 let mut persona1 = UserPersona {
2186 steam_id: test_steam_id(),
2187 rich_presence: {
2188 let mut map = HashMap::new();
2189 map.insert("status".to_string(), "Online".to_string());
2190 map.insert("game".to_string(), "CS:GO".to_string());
2191 map
2192 },
2193 ..Default::default()
2194 };
2195
2196 let persona2 = UserPersona {
2197 steam_id: test_steam_id(),
2198 rich_presence: {
2199 let mut map = HashMap::new();
2200 map.insert("status".to_string(), "In-Game".to_string());
2201 map.insert("party".to_string(), "Open".to_string());
2202 map
2203 },
2204 ..Default::default()
2205 };
2206
2207 persona1.merge(&persona2);
2208
2209 assert_eq!(persona1.rich_presence.get("status"), Some(&"In-Game".to_string()));
2210 assert_eq!(persona1.rich_presence.get("game"), Some(&"CS:GO".to_string()));
2211 assert_eq!(persona1.rich_presence.get("party"), Some(&"Open".to_string()));
2212 }
2213
2214 #[test]
2215 fn test_user_persona_merge_unread_count() {
2216 let mut persona1 = UserPersona { steam_id: test_steam_id(), unread_count: 5, last_message_time: 100, ..Default::default() };
2217
2218 let persona2 = UserPersona { steam_id: test_steam_id(), unread_count: 10, last_message_time: 200, ..Default::default() };
2219
2220 persona1.merge(&persona2);
2221
2222 assert_eq!(persona1.unread_count, 10);
2223 assert_eq!(persona1.last_message_time, 200);
2224 }
2225
2226 #[test]
2227 fn test_user_persona_merge_unread_count_zero() {
2228 let mut persona1 = UserPersona { steam_id: test_steam_id(), unread_count: 5, last_message_time: 100, ..Default::default() };
2229
2230 let persona2 = UserPersona { steam_id: test_steam_id(), unread_count: 0, last_message_time: 0, ..Default::default() };
2232
2233 persona1.merge(&persona2);
2234
2235 assert_eq!(persona1.unread_count, 5);
2236 assert_eq!(persona1.last_message_time, 100);
2237 }
2238}