Skip to main content

steam_client/client/
steam_client.rs

1//! Main Steam client implementation.
2
3use std::{
4    collections::{HashMap, VecDeque},
5    pin::Pin,
6    sync::Arc,
7    time::Duration,
8};
9
10use chrono::{DateTime, Utc};
11use prost::Message;
12use steam_auth::{CredentialsDetails, EAuthSessionGuardType, EAuthTokenPlatformType, LoginSession, LoginSessionOptions};
13use steam_enums::{EMsg, EPersonaState, EResult};
14use steam_protos::{CMsgClientHello, CMsgClientLogOff, CMsgClientLogon, CMsgClientLogonResponse, PROTOCOL_VERSION};
15use steamid::SteamID;
16use tracing::{debug, error, info, info_span, warn, Instrument};
17
18use crate::{
19    connection::{CmServer, CmServerProvider, HttpCmServerProvider, SteamConnection, WebSocketConnection},
20    error::SteamError,
21    options::SteamOptions,
22    protocol::SteamMessage,
23    types::{AccountInfo, EmailInfo, Limitations, LogOnDetails, LogOnResponse, VacStatus, WalletInfo},
24};
25
/// User persona information.
///
/// A snapshot of what is known about a single Steam user. Every optional
/// field defaults to `None` and every counter to `0` (see the `Default`
/// impl); `UserPersona::merge` folds a newer, possibly partial snapshot into
/// an existing one.
#[derive(Debug, Clone)]
pub struct UserPersona {
    /// SteamID of the user this persona describes.
    pub steam_id: SteamID,
    /// Display name; an empty string is treated as "unknown" by `merge`.
    pub player_name: String,
    /// Current persona state (defaults to `EPersonaState::Offline`).
    pub persona_state: EPersonaState,
    /// Raw persona state flag bits; `0` is treated as "not provided" by `merge`.
    pub persona_state_flags: u32,
    /// Avatar hash string used by `avatar_url` to build the CDN URL.
    pub avatar_hash: Option<String>,
    /// Name of the game being played, if known.
    pub game_name: Option<String>,
    /// ID of the game being played; `merge` ignores `Some(0)`.
    pub game_id: Option<u64>,
    /// Time of the last logon, if known.
    pub last_logon: Option<DateTime<Utc>>,
    /// Time of the last logoff, if known.
    pub last_logoff: Option<DateTime<Utc>>,
    /// Time the user was last seen online, if known.
    pub last_seen_online: Option<DateTime<Utc>>,
    /// Raw rich presence key/value pairs.
    pub rich_presence: HashMap<String, String>,
    /// Steam player group ID (lobby/party) from rich presence
    pub steam_player_group: Option<String>,
    /// Status string from rich presence (e.g. "Competitive Mirage [ 3 : 6 ]")
    pub rich_presence_status: Option<String>,
    /// Map name from rich presence (e.g. "de_mirage")
    pub game_map: Option<String>,
    /// Game score from rich presence (e.g. "[ 3 : 6 ]")
    pub game_score: Option<String>,
    /// Number of players/slots in lobby/party from rich presence
    pub num_players: Option<u32>,
    /// Unread message count; `merge` only overwrites it with non-zero values.
    pub unread_count: u32,
    /// Last message time; `merge` only overwrites it with non-zero values.
    /// NOTE(review): presumably a unix timestamp in seconds — confirm.
    pub last_message_time: u32,
}
53
54impl Default for UserPersona {
55    fn default() -> Self {
56        Self {
57            steam_id: SteamID::default(),
58            player_name: String::new(),
59            persona_state: EPersonaState::Offline,
60            persona_state_flags: 0,
61            avatar_hash: None,
62            game_name: None,
63            game_id: None,
64            last_logon: None,
65            last_logoff: None,
66            last_seen_online: None,
67            rich_presence: HashMap::new(),
68            steam_player_group: None,
69            rich_presence_status: None,
70            game_map: None,
71            game_score: None,
72            num_players: None,
73            unread_count: 0,
74            last_message_time: 0,
75        }
76    }
77}
78
79impl UserPersona {
80    /// Get the URL for the user's avatar.
81    ///
82    /// Size can be "icon" (32x32), "medium" (64x64), or "full" (184x184).
83    pub fn avatar_url(&self, size: &str) -> Option<String> {
84        let hash = self.avatar_hash.as_ref()?;
85        let size_suffix = match size {
86            "medium" => "_medium.jpg",
87            "full" => "_full.jpg",
88            _ => ".jpg",
89        };
90        Some(format!("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{}/{}{}", &hash[0..2], hash, size_suffix))
91    }
92
93    /// Merge another persona data into this one, overwriting only
94    /// non-empty/Some values.
95    pub fn merge(&mut self, other: &UserPersona) {
96        if !other.player_name.is_empty() {
97            self.player_name = other.player_name.clone();
98        }
99        self.persona_state = other.persona_state;
100        if other.persona_state_flags != 0 {
101            self.persona_state_flags = other.persona_state_flags;
102        }
103        if other.avatar_hash.is_some() {
104            self.avatar_hash = other.avatar_hash.clone();
105        }
106        if other.game_name.is_some() {
107            self.game_name = other.game_name.clone();
108        }
109        if other.game_id.is_some() && other.game_id != Some(0) {
110            self.game_id = other.game_id;
111        }
112        if other.last_logon.is_some() {
113            self.last_logon = other.last_logon;
114        }
115        if other.last_logoff.is_some() {
116            self.last_logoff = other.last_logoff;
117        }
118        if other.last_seen_online.is_some() {
119            self.last_seen_online = other.last_seen_online;
120        }
121        for (k, v) in &other.rich_presence {
122            self.rich_presence.insert(k.clone(), v.clone());
123        }
124        if other.steam_player_group.is_some() {
125            self.steam_player_group = other.steam_player_group.clone();
126        }
127        if other.rich_presence_status.is_some() {
128            self.rich_presence_status = other.rich_presence_status.clone();
129        }
130        if other.game_map.is_some() {
131            self.game_map = other.game_map.clone();
132        }
133        if other.game_score.is_some() {
134            self.game_score = other.game_score.clone();
135        }
136        // Only update unread count and last message time if they are non-zero in the
137        // other persona
138        if other.unread_count > 0 {
139            self.unread_count = other.unread_count;
140        }
141        if other.last_message_time > 0 {
142            self.last_message_time = other.last_message_time;
143        }
144    }
145}
146
147use std::time::Instant;
148
149use bytes::Bytes;
150use tokio::sync::{mpsc, oneshot};
151
/// Background tasks that can be triggered without awaiting.
///
/// Values of this type are stored in `SteamClient::background_job_tasks`,
/// keyed by job ID, so a job result can be matched to the task that started it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BackgroundTask {
    /// Friends-list related job.
    /// NOTE(review): the exact request is not visible in this part of the file.
    FriendsList,
    /// Offline-messages job for the given SteamID.
    /// NOTE(review): presumably the chat partner whose messages are fetched — confirm.
    OfflineMessages(SteamID),
}
158
/// Message queued for sending.
///
/// Travels through the client's unbounded chat queue channel
/// (`chat_queue_tx` / `chat_queue_rx`).
#[derive(Debug)]
pub(crate) struct QueuedMessage {
    /// SteamID the message is addressed to.
    pub friend: SteamID,
    /// Message text.
    pub message: String,
    /// Chat entry type to send the message as.
    pub entry_type: steam_enums::EChatEntryType,
    /// Whether `message` contains BBCode markup.
    /// NOTE(review): how the flag is applied is decided by the chat service — confirm there.
    pub contains_bbcode: bool,
    /// One-shot channel on which the outcome of the send is reported back.
    pub respond_to: oneshot::Sender<Result<crate::services::chat::SendMessageResult, SteamError>>,
}
168
/// Maximum number of receive iterations `wait_for_logon_response` performs
/// before giving up with `SteamError::Timeout`.
const LOGON_TIMEOUT_ITERATIONS: usize = 150;
/// Pause, in milliseconds, inserted after each received message that did not
/// contain the logon response.
const LOGON_TIMEOUT_DELAY_MS: u64 = 100;
/// Initial heartbeat interval in seconds, passed to the `HeartbeatManager` at
/// construction. A positive `heartbeat_seconds` in the logon response replaces it.
const HEARTBEAT_INTERVAL_SECONDS: u64 = 30;
175
/// The main Steam client.
///
/// Holds the session state populated on logon (SteamID, public IP, cell ID,
/// ...), caches for friends/personas/apps, the chat send queue, and the
/// crate-internal connection, job and reconnection machinery.
pub struct SteamClient {
    /// Client options.
    pub options: SteamOptions,

    /// The logged-in SteamID (None if not logged in).
    pub steam_id: Option<SteamID>,

    /// Public IP as seen by Steam.
    pub public_ip: Option<String>,

    /// Cell ID for content servers.
    pub cell_id: Option<u32>,

    /// Vanity URL.
    pub vanity_url: Option<String>,

    /// Account information.
    pub account_info: Option<AccountInfo>,

    /// Email information.
    pub email_info: Option<EmailInfo>,

    /// Account limitations.
    pub limitations: Option<Limitations>,

    /// VAC status.
    pub vac: Option<VacStatus>,

    /// Wallet information.
    pub wallet: Option<WalletInfo>,

    /// Friends list (SteamID -> relationship).
    pub my_friends: HashMap<SteamID, u32>,

    /// User personas.
    pub users: HashMap<SteamID, UserPersona>,

    /// App information cache (AppID -> Info).
    pub apps: HashMap<u32, super::events::AppInfoData>,

    /// Account licenses.
    pub licenses: Vec<super::events::LicenseEntry>,

    /// App IDs that need info fetching.
    pub pending_apps: rustc_hash::FxHashSet<u32>,

    /// Chat last view timestamps (SteamID -> unix timestamp).
    pub chat_last_view: HashMap<SteamID, u32>,

    // Playing state
    /// NOTE(review): presumably set when playing is blocked by another session — confirm.
    pub playing_blocked: bool,
    /// App IDs associated with the current playing state.
    pub playing_app_ids: Vec<u32>,

    // Connection stats
    /// NOTE(review): unit and epoch are not visible in this part of the file — confirm.
    pub connect_time: u64,
    /// Connection counter; starts at 0.
    pub connection_count: u32,

    // Auth state
    /// Auth sequence counter for this side; starts at 0.
    pub auth_seq_me: u32,
    /// Auth sequence counter for the remote side; starts at 0.
    pub auth_seq_them: u32,
    /// Steam pipe handle; initialised to a random value in `1..=1_000_000`.
    pub h_steam_pipe: u32,
    /// Game coordinator tokens, stored as raw bytes.
    pub gc_tokens: Vec<Vec<u8>>,
    /// Auth session tickets currently tracked by the client.
    pub active_tickets: Vec<crate::services::appauth::AuthSessionTicket>,

    /// Last time this account successfully registered for a CSGO party
    /// (in-memory)
    pub last_time_party_register: Option<i64>,

    // Message Queue
    /// Sending half of the unbounded chat message queue.
    pub(crate) chat_queue_tx: mpsc::UnboundedSender<QueuedMessage>,
    /// Receiving half of the unbounded chat message queue.
    pub(crate) chat_queue_rx: mpsc::UnboundedReceiver<QueuedMessage>,
    /// Instant until which sending is considered rate limited, if any.
    pub(crate) rate_limit_until: Option<Instant>,
    /// Message held back for a retry, if any.
    pub(crate) pending_retry_message: Option<QueuedMessage>,
    /// In-flight chat send tasks; each yields the message together with its result.
    pub(crate) chat_tasks: tokio::task::JoinSet<(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>)>,

    // Internal state (crate-visible)
    /// Active CM connection; stored by `execute_logon` once a logon response
    /// has been received and taken again by `log_off`.
    pub(crate) connection: Option<Box<dyn SteamConnection>>,
    /// Login session; `execute_logon` creates it from a refresh token.
    pub(crate) login_session: Option<LoginSession>,
    /// Details of the most recent logon attempt; cleared by `log_off`.
    pub(crate) logon_details: Option<LogOnDetails>,
    /// Session ID placed in outgoing message headers; 0 when no session exists.
    pub(crate) session_id: i32,
    /// True while a `log_on` attempt is in flight.
    pub(crate) connecting: bool,
    /// True while `log_off` is running.
    pub(crate) logging_off: bool,
    /// SteamID used in outgoing headers; set by `execute_logon` before the
    /// server has confirmed the real one.
    pub(crate) temp_steam_id: Option<SteamID>,
    /// Queue of client events.
    pub(crate) event_queue: VecDeque<super::events::SteamEvent>,

    // Reconnection state
    /// Reconnect manager, configured from `options.reconnect`.
    pub(crate) reconnect_manager: crate::internal::reconnect::ReconnectManager,
    /// Heartbeat manager; its interval follows the logon response when provided.
    pub(crate) heartbeat_manager: crate::internal::heartbeat::HeartbeatManager,
    /// NOTE(review): presumably true while an automatic relogon is in progress — confirm.
    pub(crate) relogging: bool,
    /// CM server remembered from an earlier connection, if any.
    pub(crate) last_cm_server: Option<CmServer>,

    // Job manager for request-response correlation
    pub(crate) job_manager: crate::internal::jobs::JobManager,

    // GC job manager for GC message request-response correlation
    pub(crate) gc_jobs: crate::services::gc::GCJobManager,

    /// Mapping of Job IDs to specific background tasks
    pub(crate) background_job_tasks: HashMap<u64, BackgroundTask>,

    /// Channel for background job results
    pub(crate) background_job_results_tx: mpsc::Sender<(u64, Result<Bytes, SteamError>)>,
    /// Receiving half of the background job results channel (capacity 100).
    pub(crate) background_job_results_rx: mpsc::Receiver<(u64, Result<Bytes, SteamError>)>,

    // Persona cache with TTL expiration
    pub persona_cache: crate::cache::PersonaCache,

    // Session recovery helper
    pub session_recovery: crate::services::SessionRecovery,

    // Dependency injection for testing
    /// HTTP client; also handed to the CM server provider during logon.
    pub(crate) http_client: Arc<dyn crate::utils::http::HttpClient>,
    /// Clock used for sleeping while waiting for the logon response.
    pub(crate) clock: Arc<dyn crate::utils::clock::Clock>,
    /// Random source for session IDs, machine names and the Steam pipe handle.
    pub(crate) rng: Arc<dyn crate::utils::rng::Rng>,
}
292
293impl SteamClient {
294    /// Create a builder for constructing a SteamClient instance.
295    ///
296    /// The builder allows injecting mock dependencies for testing.
297    /// For production use, prefer `SteamClient::new()` instead.
298    ///
299    /// # Example
300    ///
301    /// ```rust
302    /// use steam_client::SteamClient;
303    ///
304    /// // For testing with mocked dependencies
305    /// let (client, mocks) = SteamClient::builder()
306    ///     .with_mock_http()
307    ///     .with_mock_clock()
308    ///     .build_with_mocks();
309    ///
310    /// // Control the mock clock in tests
311    /// if let Some(clock) = mocks.clock {
312    ///     clock.advance(std::time::Duration::from_secs(10));
313    /// }
314    /// ```
315    pub fn builder() -> super::builder::SteamClientBuilder {
316        super::builder::SteamClientBuilder::new()
317    }
318
319    /// Create a new Steam client with the given options.
320    pub fn new(options: SteamOptions) -> Self {
321        Self::builder().with_options(options).build()
322    }
323
324    /// Create a new Steam client with all custom providers.
325    ///
326    /// This is primarily useful for testing, allowing all mock providers to be
327    /// injected.
328    pub(crate) fn with_all_providers(options: SteamOptions, http_client: Arc<dyn crate::utils::http::HttpClient>, clock: Arc<dyn crate::utils::clock::Clock>, rng: Arc<dyn crate::utils::rng::Rng>) -> Self {
329        let reconnect_config = options.reconnect.clone();
330        let (chat_queue_tx, chat_queue_rx) = mpsc::unbounded_channel();
331        let (bg_tx, bg_rx) = mpsc::channel(100);
332
333        Self {
334            options,
335            steam_id: None,
336            public_ip: None,
337            cell_id: None,
338            vanity_url: None,
339            account_info: None,
340            email_info: None,
341            limitations: None,
342            vac: None,
343            wallet: None,
344            my_friends: HashMap::new(),
345            users: HashMap::new(),
346            apps: HashMap::new(),
347            licenses: Vec::new(),
348            pending_apps: rustc_hash::FxHashSet::default(),
349            chat_last_view: HashMap::new(),
350            playing_blocked: false,
351            playing_app_ids: Vec::new(),
352            connect_time: 0,
353            connection_count: 0,
354            auth_seq_me: 0,
355            auth_seq_them: 0,
356            h_steam_pipe: (rng.gen_u32() % 1000000) + 1,
357            gc_tokens: Vec::new(),
358            active_tickets: Vec::new(),
359            last_time_party_register: None,
360            chat_queue_tx,
361            chat_queue_rx,
362            rate_limit_until: None,
363            pending_retry_message: None,
364            chat_tasks: tokio::task::JoinSet::new(),
365            connection: None,
366            login_session: None,
367            logon_details: None,
368            session_id: 0,
369            connecting: false,
370            logging_off: false,
371            temp_steam_id: None,
372            event_queue: VecDeque::new(),
373            reconnect_manager: crate::internal::reconnect::ReconnectManager::new(reconnect_config),
374            heartbeat_manager: crate::internal::heartbeat::HeartbeatManager::new(HEARTBEAT_INTERVAL_SECONDS),
375            relogging: false,
376            last_cm_server: None,
377            job_manager: crate::internal::jobs::JobManager::new(),
378            gc_jobs: crate::services::gc::GCJobManager::new(),
379            background_job_tasks: HashMap::new(),
380            background_job_results_tx: bg_tx,
381            background_job_results_rx: bg_rx,
382            persona_cache: crate::cache::PersonaCache::default(),
383            session_recovery: crate::services::SessionRecovery::new(),
384            http_client,
385            clock,
386            rng,
387        }
388    }
389
390    /// Log on to Steam.
391    pub async fn log_on(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
392        if self.steam_id.is_some() {
393            return Err(SteamError::AlreadyLoggedOn);
394        }
395
396        if self.connecting {
397            return Err(SteamError::AlreadyConnecting);
398        }
399
400        self.connecting = true;
401
402        match self.execute_logon(details).await {
403            Ok(response) => Ok(response),
404            Err(e) => {
405                match &e {
406                    SteamError::SteamResult(steam_enums::EResult::AccountLoginDeniedThrottle) => {
407                        crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
408                    }
409                    SteamError::SteamResult(steam_enums::EResult::ServiceUnavailable) => {
410                        crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "ServiceUnavailable");
411                    }
412                    SteamError::SteamResult(steam_enums::EResult::TryAnotherCM) => {
413                        crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "TryAnotherCM");
414                    }
415                    _ => {}
416                }
417                self.connecting = false;
418                Err(e)
419            }
420        }
421    }
422
423    async fn execute_logon(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
424        self.logging_off = false;
425
426        info!("Starting Steam login...");
427        crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::CMConnection).await;
428
429        // Store logon details
430        self.logon_details = Some(details.clone());
431
432        // Determine login type
433        let is_web_logon = details.web_logon_token.is_some() && details.steam_id.is_some();
434        let is_anonymous = !is_web_logon && details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none() && !is_web_logon);
435
436        // Set up temporary SteamID
437        let mut temp_sid = SteamID::new();
438        temp_sid.universe = steamid::Universe::Public;
439
440        if is_web_logon {
441            // Web logon token login (from clientjstoken)
442            info!("Logging in with web logon token");
443            if let Some(sid) = details.steam_id {
444                temp_sid = sid;
445            }
446            temp_sid.account_type = steamid::AccountType::Individual;
447        } else if is_anonymous {
448            info!("Logging in anonymously");
449            temp_sid.account_type = steamid::AccountType::AnonUser;
450        } else if let Some(ref token) = details.refresh_token {
451            info!("Logging in with refresh token");
452            // Create login session from refresh token
453            match LoginSession::from_refresh_token(token.clone()) {
454                Ok(session) => {
455                    if let Some(sid) = session.steam_id().cloned() {
456                        temp_sid = sid;
457                    }
458                    self.login_session = Some(session);
459                }
460                Err(e) => {
461                    return Err(SteamError::InvalidToken(e.to_string()));
462                }
463            }
464        } else if details.account_name.is_some() && details.password.is_some() {
465            info!("Logging in with account name and password");
466            temp_sid.account_type = steamid::AccountType::Individual;
467        }
468
469        self.temp_steam_id = Some(temp_sid);
470
471        // Get CM server list
472        let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
473        let server = provider.get_server().await?;
474        info!("Connecting to CM server: {}", server.endpoint);
475        let start_time = Instant::now();
476
477        // Connect to CM
478        let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
479        debug!("WebSocket connection established in {}ms", start_time.elapsed().as_millis());
480
481        // Generate session ID
482        self.session_id = self.rng.gen_i32().abs();
483
484        // Send ClientHello (only if not using refresh token or web_logon_token,
485        // matching node-steam-user behavior)
486        let skip_hello = details.refresh_token.is_some() || details.web_logon_token.is_some();
487        if !skip_hello {
488            let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
489            let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
490            connection.send(hello_msg.encode()).await?;
491            debug!("Sent ClientHello");
492        }
493
494        // Build and send ClientLogon
495        let logon = self.build_logon_message(&details, is_anonymous);
496        let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
497        connection.send(logon_msg.encode()).await?;
498        debug!("Sent ClientLogon");
499
500        // Wait for response
501        let response = self.wait_for_logon_response(&mut *connection).await?;
502
503        self.connection = Some(connection);
504        self.connecting = false;
505
506        // Handle response
507        let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
508
509        if eresult != EResult::OK {
510            self.steam_id = None;
511            return Err(SteamError::SteamResult(eresult));
512        }
513
514        // Success!
515        let steam_id = if let Some(sid) = response.client_supplied_steamid { SteamID::from(sid) } else { self.temp_steam_id.unwrap_or_default() };
516
517        self.steam_id = Some(steam_id);
518        self.cell_id = response.cell_id;
519        self.vanity_url = response.vanity_url.clone();
520
521        if let Some(ref ip) = response.public_ip {
522            if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
523                self.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
524            }
525        }
526
527        // Initialize heartbeat
528        if let Some(secs) = response.heartbeat_seconds {
529            if secs > 0 {
530                self.heartbeat_manager.set_interval(secs as u64);
531            }
532        }
533        self.heartbeat_manager.reset();
534
535        let username = details.account_name.as_deref().unwrap_or("Unknown");
536        info!("Logged on as steam_id: {}, name: {}, username: {}", steam_id.steam_id64(), "Unknown", username);
537
538        Ok(LogOnResponse {
539            eresult,
540            steam_id,
541            public_ip: self.public_ip.clone(),
542            cell_id: self.cell_id.unwrap_or(0),
543            vanity_url: self.vanity_url.clone(),
544            email_domain: response.email_domain.clone(),
545            steam_guard_required: false,
546            heartbeat_seconds: response.heartbeat_seconds,
547            server_time: response.rtime32_server_time,
548            account_flags: response.account_flags,
549            user_country: response.user_country.clone(),
550            ip_country_code: response.ip_country_code.clone(),
551            client_instance_id: response.client_instance_id,
552            token_id: response.token_id,
553            family_group_id: response.family_group_id,
554            eresult_extended: response.eresult_extended,
555            cell_id_ping_threshold: response.cell_id_ping_threshold,
556            force_client_update_check: response.force_client_update_check,
557            agreement_session_url: response.agreement_session_url.clone(),
558            legacy_out_of_game_heartbeat_seconds: response.legacy_out_of_game_heartbeat_seconds,
559            parental_settings: response.parental_settings.clone(),
560            parental_setting_signature: response.parental_setting_signature.clone(),
561            count_loginfailures_to_migrate: response.count_loginfailures_to_migrate,
562            count_disconnects_to_migrate: response.count_disconnects_to_migrate,
563            ogs_data_report_time_window: response.ogs_data_report_time_window,
564            steam2_ticket: response.steam2_ticket.clone(),
565        })
566    }
567
568    /// Build the ClientLogon protobuf message.
569    fn build_logon_message(&self, details: &LogOnDetails, is_anonymous: bool) -> CMsgClientLogon {
570        use crate::protocol::messages::{build_client_logon, LogonConfig};
571
572        // Generate machine name if not provided and not anonymous
573        let machine_name = if details.machine_name.is_some() {
574            details.machine_name.clone()
575        } else if !is_anonymous {
576            Some(format!("DESKTOP-{:06}", self.rng.gen_u32() % 1000000))
577        } else {
578            None
579        };
580
581        let identifier = if let Some(ref name) = details.account_name { Some(name.clone()) } else { self.temp_steam_id.as_ref().map(|sid| sid.steam_id64().to_string()) };
582
583        // Build config from current state
584        let config = LogonConfig::new().with_cell_id(self.cell_id).with_machine_name(machine_name).with_identifier(identifier);
585
586        // Delegate to pure function
587        build_client_logon(&config, details, is_anonymous)
588    }
589
590    /// Wait for and parse the logon response.
591    async fn wait_for_logon_response(&self, connection: &mut dyn SteamConnection) -> Result<CMsgClientLogonResponse, SteamError> {
592        use std::io::Read;
593
594        use flate2::read::GzDecoder;
595
596        // Wait for messages until we get a logon response
597        // Increased timeout to 15 seconds (150 * 100ms) to handle slow connections
598        for _ in 0..LOGON_TIMEOUT_ITERATIONS {
599            match connection.recv().await? {
600                Some(data) => {
601                    let msg = SteamMessage::decode_from_bytes(&data)?;
602
603                    debug!("Received message: {:?}", msg.msg);
604
605                    if msg.msg == EMsg::ClientLogOnResponse {
606                        return msg.decode_body::<CMsgClientLogonResponse>();
607                    }
608
609                    // Handle Multi message
610                    if msg.msg == EMsg::Multi {
611                        if let Ok(multi) = msg.decode_body::<steam_protos::CMsgMulti>() {
612                            let body = multi.message_body.unwrap_or_default();
613                            let payload = if multi.size_unzipped.unwrap_or(0) > 0 {
614                                // Gzip compressed
615                                // Offload decompression to a blocking task to avoid stalling the async runtime
616                                let decompressed_result = tokio::task::spawn_blocking(move || {
617                                    let mut decoder = GzDecoder::new(&body[..]);
618                                    let mut decompressed = Vec::new();
619                                    if decoder.read_to_end(&mut decompressed).is_ok() {
620                                        Some(decompressed)
621                                    } else {
622                                        None
623                                    }
624                                })
625                                .await
626                                .map_err(|e| SteamError::Other(format!("Task join error: {}", e)))?;
627
628                                if let Some(decompressed) = decompressed_result {
629                                    decompressed
630                                } else {
631                                    tracing::error!("Failed to decompress Multi message");
632                                    continue;
633                                }
634                            } else {
635                                body
636                            };
637
638                            // Iterate over sub-messages
639                            let mut offset = 0;
640                            while offset + 4 <= payload.len() {
641                                let sub_size = u32::from_le_bytes([payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3]]) as usize;
642                                offset += 4;
643
644                                if offset + sub_size > payload.len() {
645                                    break;
646                                }
647
648                                let sub_data = &payload[offset..offset + sub_size];
649                                if let Ok(sub_msg) = SteamMessage::decode_from_bytes(sub_data) {
650                                    debug!("Received sub-message inside Multi: {:?}", sub_msg.msg);
651                                    if sub_msg.msg == EMsg::ClientLogOnResponse {
652                                        return sub_msg.decode_body::<CMsgClientLogonResponse>();
653                                    }
654                                }
655                                offset += sub_size;
656                            }
657                        }
658                    }
659                }
660                None => {
661                    return Err(SteamError::ConnectionError("Connection closed by server during login".into()));
662                }
663            }
664
665            self.clock.sleep(Duration::from_millis(LOGON_TIMEOUT_DELAY_MS)).await;
666        }
667
668        Err(SteamError::Timeout)
669    }
670
671    /// Create a protobuf message with header.
672    fn create_proto_message<T: Message>(&self, msg: EMsg, body: &T) -> SteamMessage {
673        use crate::protocol::messages::{build_proto_header, create_steam_message};
674
675        let steam_id = self.temp_steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0);
676        let header = build_proto_header(self.session_id, steam_id, u64::MAX, u64::MAX);
677
678        create_steam_message(msg, header, body)
679    }
680
681    /// Log off from Steam.
682    pub async fn log_off(&mut self) -> Result<(), SteamError> {
683        if self.steam_id.is_none() {
684            return Err(SteamError::NotLoggedOn);
685        }
686
687        info!("Logging off from Steam");
688        self.logging_off = true;
689
690        // Create logoff message first (before borrowing connection)
691        let logoff = CMsgClientLogOff::default();
692        let msg = self.create_proto_message(EMsg::ClientLogOff, &logoff);
693
694        // Send logoff message
695        if let Some(ref mut conn) = self.connection {
696            let _ = conn.send(msg.encode()).await;
697        }
698
699        // Close connection
700        if let Some(conn) = self.connection.take() {
701            conn.close().await?;
702        }
703
704        // Clear state
705        self.steam_id = None;
706        self.public_ip = None;
707        self.cell_id = None;
708        self.account_info = None;
709        self.email_info = None;
710        self.limitations = None;
711        self.vac = None;
712        self.wallet = None;
713        self.my_friends.clear();
714        self.logon_details = None;
715        self.session_id = 0;
716
717        self.logging_off = false;
718
719        Ok(())
720    }
721
722    /// Log on with username and password using steam-session.
723    ///
724    /// This method handles the full password authentication flow including:
725    /// - RSA password encryption
726    /// - Starting an auth session with Steam
727    /// - Obtaining a refresh token
728    ///
729    /// If Steam Guard is required, returns `SteamError::SteamGuardRequired`
730    /// with details about what code is needed. Use
731    /// [`submit_steam_guard_code`] to provide the code and complete
732    /// authentication.
733    ///
734    /// On success, emits a `RefreshToken` event with the token that can be
735    /// saved for future logins.
736    ///
737    /// # Example
738    /// ```rust,ignore
739    /// let result = client.log_on_with_password("username", "password", None, None).await;
740    /// match result {
741    ///     Ok(response) => tracing::info!("Logged in as {}", response.steam_id.steam3()),
742    ///     Err(SteamError::SteamGuardRequired { guard_type, email_domain }) => {
743    ///         tracing::info!("Need Steam Guard code: {:?}", guard_type);
744    ///         // Get code from user, then call submit_steam_guard_code
745    ///     }
746    ///     Err(e) => tracing::info!("Login failed: {:?}", e),
747    /// }
748    /// ```
749    pub async fn log_on_with_password(&mut self, account_name: &str, password: &str, steam_guard_code: Option<&str>, machine_auth_token: Option<&str>) -> Result<LogOnResponse, SteamError> {
750        if self.steam_id.is_some() {
751            return Err(SteamError::AlreadyLoggedOn);
752        }
753
754        if self.connecting {
755            return Err(SteamError::AlreadyConnecting);
756        }
757
758        info!("Starting password authentication for {}", account_name);
759        crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
760
761        // Create login session for Steam Client platform
762        let mut session = LoginSession::new(EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient, Some(LoginSessionOptions { machine_friendly_name: Some(format!("DESKTOP-{}", self.rng.gen_u32() % 1000000)), ..Default::default() }));
763
764        // Start authentication with credentials
765        let credentials = CredentialsDetails {
766            account_name: account_name.to_string(),
767            password: password.to_string(),
768            persistence: None,
769            steam_guard_machine_token: machine_auth_token.map(|s| s.to_string()),
770            steam_guard_code: steam_guard_code.map(|s| s.to_string()),
771        };
772
773        let start_result = match session.start_with_credentials(credentials).await {
774            Ok(res) => res,
775            Err(e) => {
776                if let steam_auth::SessionError::SteamError(_, steam_enums::EResult::AccountLoginDeniedThrottle) = e {
777                    crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
778                }
779                return Err(SteamError::SessionError(e));
780            }
781        };
782
783        // Check if action (Steam Guard) is required
784        if start_result.action_required {
785            if let Some(ref actions) = start_result.valid_actions {
786                // Find the best action to report to user
787                // Priority: EmailCode > DeviceCode > DeviceConfirmation
788                let action = actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeEmailCode)).or_else(|| actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeDeviceCode))).or_else(|| actions.first());
789
790                if let Some(action) = action {
791                    debug!("Steam Guard required: {:?}", action.guard_type);
792
793                    // Store session for later submission
794                    self.login_session = Some(session);
795
796                    return Err(SteamError::SteamGuardRequired { guard_type: action.guard_type, email_domain: action.detail.clone() });
797                }
798            }
799
800            // Shouldn't happen, but handle gracefully
801            warn!("Action required but no valid actions returned");
802            return Err(SteamError::InvalidCredentials);
803        }
804
805        // No Steam Guard needed, continue polling for result
806        self.complete_password_auth(session, account_name).await
807    }
808
809    /// Submit a Steam Guard code to complete password authentication.
810    ///
811    /// Call this after [`log_on_with_password`] returns
812    /// `SteamError::SteamGuardRequired`.
813    ///
814    /// # Example
815    /// ```rust,ignore
816    /// // After receiving SteamGuardRequired error:
817    /// let code = get_code_from_user();
818    /// let response = client.submit_steam_guard_code(&code).await?;
819    /// tracing::info!("Logged in as {}", response.steam_id.steam3());
820    /// ```
821    pub async fn submit_steam_guard_code(&mut self, code: &str) -> Result<LogOnResponse, SteamError> {
822        let session = self.login_session.take().ok_or_else(|| SteamError::Other("No pending login session".to_string()))?;
823
824        let account_name = session.account_name().map(|s| s.to_string()).unwrap_or_default();
825
826        let mut session = session;
827        session.submit_steam_guard_code(code).await?;
828
829        self.complete_password_auth(session, &account_name).await
830    }
831
832    /// Complete password authentication by polling for tokens.
833    async fn complete_password_auth(&mut self, mut session: LoginSession, account_name: &str) -> Result<LogOnResponse, SteamError> {
834        // Poll for authentication result
835        let poll_result = loop {
836            match session.poll().await? {
837                Some(result) => break result,
838                None => {
839                    // Wait before polling again
840                    let interval = session.poll_interval();
841                    self.clock.sleep(Duration::from_secs_f32(interval)).await;
842                }
843            }
844        };
845
846        info!("Password authentication successful, got refresh token");
847        debug!("Refresh token account: {}", poll_result.account_name);
848
849        // Store machine token if we got one
850        if let Some(ref _guard_data) = poll_result.new_guard_data {
851            debug!("Received new Steam Guard machine token");
852            // Store for future use (could be saved to disk by the user)
853            self.login_session = Some(session);
854        }
855
856        // Queue RefreshToken event
857        self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::RefreshToken { token: poll_result.refresh_token.clone(), account_name: poll_result.account_name.clone() }));
858
859        // Now log on with the refresh token
860        self.log_on(LogOnDetails {
861            refresh_token: Some(poll_result.refresh_token),
862            account_name: Some(account_name.to_string()),
863            ..Default::default()
864        })
865        .await
866    }
867
868    /// Check if logged in.
869    pub fn is_logged_in(&self) -> bool {
870        self.steam_id.is_some()
871    }
872
873    /// Force reset the connecting state.
874    /// Use this if a connection attempt is cancelled externally (e.g. by a
875    /// timeout).
876    pub fn reset_connecting_state(&mut self) {
877        self.connecting = false;
878    }
879
880    /// Get web session cookies for Steam web APIs.
881    ///
882    /// This method fetches cookies that can be used to authenticate with Steam
883    /// web services like steamcommunity.com and store.steampowered.com. It
884    /// requires a valid login session with a refresh token (not available
885    /// for web_logon_token or anonymous logins).
886    ///
887    /// On success, this also queues a `WebSession` event with the session data.
888    ///
889    /// # Returns
890    /// - `Ok((session_id, cookies))` - The session ID and cookies for web
891    ///   authentication.
892    /// - `Err(SteamError::NotLoggedOn)` - If not logged in.
893    /// - `Err(SteamError::Other)` - If no login session is available (e.g.,
894    ///   web_logon_token login).
895    ///
896    /// # Example
897    /// ```rust,ignore
898    /// let (session_id, cookies) = client.get_web_session().await?;
899    /// tracing::info!("Session ID: {}", session_id);
900    /// for cookie in &cookies {
901    ///     tracing::info!("Cookie: {}", cookie);
902    /// }
903    /// ```
904    pub async fn get_web_session(&mut self) -> Result<(String, Vec<String>), SteamError> {
905        if self.steam_id.is_none() {
906            return Err(SteamError::NotLoggedOn);
907        }
908
909        let session = self.login_session.as_mut().ok_or_else(|| SteamError::Other("No login session available. Web session requires refresh token login.".to_string()))?;
910
911        let cookies = session.get_web_cookies().await.map_err(|e| SteamError::Other(format!("Failed to get web cookies: {}", e)))?;
912
913        // Extract session ID from cookies
914        let session_id = cookies.iter().find(|c| c.starts_with("sessionid=")).and_then(|c| c.strip_prefix("sessionid=")).and_then(|c| c.split(';').next()).map(|s| s.to_string()).unwrap_or_else(|| {
915            // Generate a session ID if not in cookies
916            format!("{:x}", self.rng.gen_u64())
917        });
918
919        // Queue WebSession event
920        self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::WebSession { session_id: session_id.clone(), cookies: cookies.clone() }));
921
922        Ok((session_id, cookies))
923    }
924
925    /// Helper to send a binary message.
926    pub(crate) async fn send_binary_message(&mut self, msg_type: EMsg, body: &[u8]) -> Result<(), SteamError> {
927        use crate::protocol::{ExtendedMessageHeader, MessageHeader, SteamMessage};
928
929        let header = ExtendedMessageHeader {
930            header_size: 36,
931            header_version: 2,
932            target_job_id: u64::MAX,
933            source_job_id: u64::MAX,
934            header_canary: 239,
935            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
936            session_id: self.session_id,
937        };
938
939        let msg = SteamMessage {
940            msg: msg_type,
941            is_proto: false,
942            header: MessageHeader::Extended(header),
943            body: bytes::Bytes::copy_from_slice(body),
944        };
945
946        if let Some(ref mut conn) = self.connection {
947            conn.send(msg.encode()).await?;
948        }
949
950        Ok(())
951    }
952
953    /// Helper to send a protobuf message.
954    pub(crate) async fn send_message<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<(), SteamError> {
955        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
956
957        let header = ProtobufMessageHeader {
958            header_length: 0,
959            session_id: self.session_id,
960            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
961            job_id_source: u64::MAX,
962            job_id_target: u64::MAX,
963            target_job_name: None,
964            routing_appid: None,
965        };
966
967        let msg = SteamMessage::new_proto(msg_type, header, body);
968
969        if let Some(ref mut conn) = self.connection {
970            conn.send(msg.encode()).await?;
971        }
972
973        Ok(())
974    }
975
976    /// Helper to send a protobuf message with routing app ID.
977    pub(crate) async fn send_message_with_routing<T: Message>(&mut self, msg_type: EMsg, routing_appid: u32, body: &T) -> Result<(), SteamError> {
978        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
979
980        let header = ProtobufMessageHeader {
981            header_length: 0,
982            session_id: self.session_id,
983            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
984            job_id_source: u64::MAX,
985            job_id_target: u64::MAX,
986            target_job_name: None,
987            routing_appid: Some(routing_appid),
988        };
989
990        let msg = SteamMessage::new_proto(msg_type, header, body);
991
992        if let Some(ref mut conn) = self.connection {
993            conn.send(msg.encode()).await?;
994        }
995
996        Ok(())
997    }
998
999    /// Helper to send a protobuf message with job tracking for request-response
1000    /// correlation.
1001    ///
1002    /// Returns a receiver that will receive the response body when Steam
1003    /// replies. The job ID is automatically set in the message header.
1004    pub(crate) async fn send_message_with_job<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<tokio::sync::oneshot::Receiver<crate::internal::jobs::JobResponse>, SteamError> {
1005        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1006
1007        // Create a job to track this request
1008        let (job_id, response_rx) = self.job_manager.create_job().await;
1009        info!("[SteamClient] send_message_with_job: Created JobID={} for EMsg::{:?}", job_id, msg_type);
1010
1011        let header = ProtobufMessageHeader {
1012            header_length: 0,
1013            session_id: self.session_id,
1014            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1015            job_id_source: job_id,
1016            job_id_target: u64::MAX,
1017            target_job_name: None,
1018            routing_appid: None,
1019        };
1020
1021        let msg = SteamMessage::new_proto(msg_type, header, body);
1022
1023        if let Some(ref mut conn) = self.connection {
1024            conn.send(msg.encode()).await?;
1025        }
1026
1027        Ok(response_rx)
1028    }
1029
1030    /// Process a queued message.
1031    async fn process_queued_message(&mut self, msg: QueuedMessage) {
1032        info!("[SteamClient] process_queued_message: Processing message to {}", msg.friend);
1033
1034        // Match JS behavior: escape [ chars if contains_bbcode
1035        let processed_message = if msg.contains_bbcode { msg.message.replace('[', "\\[") } else { msg.message.clone() };
1036
1037        debug!("[SteamClient] process_queued_message: Original len={}, Processed len={}", msg.message.len(), processed_message.len());
1038
1039        let request = steam_protos::CFriendMessagesSendMessageRequest {
1040            steamid: Some(msg.friend.steam_id64()),
1041            chat_entry_type: Some(msg.entry_type as i32),
1042            message: Some(processed_message),
1043            contains_bbcode: Some(msg.contains_bbcode),
1044            ..Default::default()
1045        };
1046
1047        info!("[SteamClient] process_queued_message: Sending FriendMessages.SendMessage#1 to Steam");
1048
1049        // Send the request and get a job receiver
1050        match self.send_service_method_with_job("FriendMessages.SendMessage#1", &request).await {
1051            Ok(rx) => {
1052                info!("[SteamClient] process_queued_message: Request sent, waiting for response...");
1053                let friend_id = msg.friend;
1054                let original_message = msg.message.clone();
1055
1056                // Spawn task into JoinSet with timeout and tracing span
1057                self.chat_tasks.spawn(
1058                    async move {
1059                        const CHAT_TIMEOUT_SECS: u64 = 30;
1060
1061                        debug!("[SteamClient] chat_task: Waiting for response (timeout: {}s)", CHAT_TIMEOUT_SECS);
1062
1063                        let res = match tokio::time::timeout(std::time::Duration::from_secs(CHAT_TIMEOUT_SECS), rx).await {
1064                            Ok(Ok(crate::internal::jobs::JobResponse::Success(body))) => {
1065                                info!("[SteamClient] chat_task: Received Success response, body len={}", body.len());
1066                                steam_protos::CFriendMessagesSendMessageResponse::decode(&body[..])
1067                                    .map(|response| {
1068                                        info!("[SteamClient] chat_task: Decoded response - server_ts={}, ordinal={}", response.server_timestamp.unwrap_or(0), response.ordinal.unwrap_or(0));
1069                                        crate::services::chat::SendMessageResult {
1070                                            modified_message: response.modified_message.unwrap_or(original_message),
1071                                            server_timestamp: response.server_timestamp.unwrap_or(0),
1072                                            ordinal: response.ordinal.unwrap_or(0),
1073                                        }
1074                                    })
1075                                    .map_err(|e| {
1076                                        error!("[SteamClient] chat_task: Failed to decode response: {:?}", e);
1077                                        SteamError::DeserializationFailed
1078                                    })
1079                            }
1080                            Ok(Ok(crate::internal::jobs::JobResponse::Timeout)) => {
1081                                error!("[SteamClient] chat_task: Job response timeout (Steam didn't respond)");
1082                                Err(SteamError::ResponseTimeout)
1083                            }
1084                            Ok(Ok(crate::internal::jobs::JobResponse::Error(e))) => {
1085                                error!("[SteamClient] chat_task: Job response error: {}", e);
1086                                Err(SteamError::ProtocolError(e))
1087                            }
1088                            Ok(Err(e)) => {
1089                                error!("[SteamClient] chat_task: Job channel closed: {:?}", e);
1090                                Err(SteamError::Other("Job response channel closed".into()))
1091                            }
1092                            Err(_) => {
1093                                // Timeout elapsed
1094                                error!("[SteamClient] chat_task: Tokio timeout elapsed after {}s", CHAT_TIMEOUT_SECS);
1095                                Err(SteamError::ResponseTimeout)
1096                            }
1097                        };
1098
1099                        match &res {
1100                            Ok(r) => info!("[SteamClient] chat_task: Final result SUCCESS - ts={}", r.server_timestamp),
1101                            Err(e) => error!("[SteamClient] chat_task: Final result ERROR - {:?}", e),
1102                        }
1103
1104                        (msg, res)
1105                    }
1106                    .instrument(info_span!("chat_send", friend = %friend_id)),
1107                );
1108            }
1109            Err(e) => {
1110                // Failed to even send the request (maybe disconnected)
1111                error!("[SteamClient] process_queued_message: Failed to send request to Steam: {:?}", e);
1112                let _ = msg.respond_to.send(Err(e));
1113            }
1114        }
1115    }
1116
1117    /// Send a unified service method request and wait for the response.
1118    ///
1119    /// This handles the common pattern of sending a request with job tracking,
1120    /// waiting for the response, decoding it, and mapping errors.
1121    pub async fn send_unified_request_and_wait<T, R>(&mut self, method: &str, request: &T) -> Result<R, SteamError>
1122    where
1123        T: prost::Message,
1124        R: prost::Message + Default,
1125    {
1126        use crate::internal::jobs::JobResponse;
1127
1128        // Send with job tracking
1129        let response_rx = self.send_service_method_with_job(method, request).await?;
1130
1131        // Wait for response
1132        match response_rx.await {
1133            Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1134            Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1135            Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1136            Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1137        }
1138    }
1139
1140    /// Send a protobuf request and wait for the response.
1141    ///
1142    /// This handles the common pattern of sending a request with job tracking,
1143    /// waiting for the response, decoding it, and mapping errors.
1144    pub async fn send_request_and_wait<T, R>(&mut self, emsg: EMsg, request: &T) -> Result<R, SteamError>
1145    where
1146        T: prost::Message,
1147        R: prost::Message + Default,
1148    {
1149        use crate::internal::jobs::JobResponse;
1150
1151        // Send with job tracking
1152        let response_rx = self.send_message_with_job(emsg, request).await?;
1153
1154        // Wait for response
1155        match response_rx.await {
1156            Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1157            Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1158            Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1159            Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1160        }
1161    }
1162
1163    /// Poll for incoming events.
1164    ///
1165    /// This method receives messages from the Steam connection and returns
1166    /// decoded events. Call this in a loop to process incoming messages.
1167    /// It also handles automatic reconnection when disconnected.
1168    ///
1169    /// # Example
1170    /// ```rust,ignore
1171    /// loop {
1172    ///     if let Some(event) = client.poll_event().await? {
1173    ///         match event {
1174    ///             SteamEvent::FriendMessage { sender, message, .. } => {
1175    ///                 tracing::info!("{} says: {}", sender, message);
1176    ///             }
1177    ///             SteamEvent::PersonaState(persona) => {
1178    ///                 tracing::info!("{} is now {:?}", persona.player_name, persona.persona_state);
1179    ///             }
1180    ///             SteamEvent::ReconnectAttempt { attempt, max_attempts, .. } => {
1181    ///                 tracing::info!("Reconnecting... attempt {}/{}", attempt, max_attempts);
1182    ///             }
1183    ///             SteamEvent::Disconnected { will_reconnect, .. } => {
1184    ///                 if !will_reconnect {
1185    ///                     break; // Exit loop on permanent disconnect
1186    ///                 }
1187    ///             }
1188    ///             _ => {}
1189    ///         }
1190    ///     }
1191    /// }
1192    /// ```
1193    pub async fn poll_event(&mut self) -> Result<Option<super::events::SteamEvent>, SteamError> {
1194        use super::events::{ConnectionEvent, SteamEvent};
1195
1196        // First check if we have queued events from a previous Multi message
1197        if let Some(mut event) = self.event_queue.pop_front() {
1198            self.handle_event(&mut event);
1199            self.post_process_event(&event).await;
1200            return Ok(Some(event));
1201        }
1202
1203        // Cleanup expired jobs
1204        self.job_manager.cleanup_expired().await;
1205
1206        // Check if we should attempt reconnection
1207        if self.reconnect_manager.is_reconnecting() {
1208            if let Some(attempt) = self.reconnect_manager.check_ready() {
1209                let delay = self.reconnect_manager.current_delay();
1210                let max_attempts = self.reconnect_manager.max_attempts();
1211
1212                // Queue the reconnect attempt event
1213                self.event_queue.push_back(SteamEvent::Connection(ConnectionEvent::ReconnectAttempt { attempt, max_attempts, delay }));
1214
1215                // Attempt reconnection
1216                match self.attempt_reconnect().await {
1217                    Ok(()) => {
1218                        // Successful reconnection - reset manager and continue
1219                        self.reconnect_manager.record_success();
1220                        info!("Reconnection successful");
1221
1222                        // Restore session state
1223                        if let Err(e) = self.restore_session_state().await {
1224                            warn!("Failed to restore session state: {}", e);
1225                        }
1226                    }
1227                    Err(e) => {
1228                        // Failed reconnection attempt
1229                        warn!("Reconnection attempt {} failed: {:?}", attempt, e);
1230
1231                        let server = self.last_cm_server.take();
1232                        let should_continue = self.reconnect_manager.record_failure(server.as_ref());
1233
1234                        if !should_continue {
1235                            // Max attempts reached
1236                            let reason = self.reconnect_manager.last_disconnect_reason();
1237                            let attempts = self.reconnect_manager.attempt();
1238
1239                            return Ok(Some(SteamEvent::Connection(ConnectionEvent::ReconnectFailed { reason, attempts })));
1240                        }
1241                    }
1242                }
1243            } else if let Some(wait_time) = self.reconnect_manager.time_until_next_attempt() {
1244                // Still waiting for backoff - sleep briefly and return no event
1245                // We don't want to block the full backoff period, so use a short sleep
1246                let sleep_time = wait_time.min(Duration::from_millis(100));
1247                self.clock.sleep(sleep_time).await;
1248            }
1249        }
1250
1251        // Process queued events first
1252        if let Some(mut event) = self.event_queue.pop_front() {
1253            self.handle_event(&mut event);
1254            self.post_process_event(&event).await;
1255            return Ok(Some(event));
1256        }
1257
1258        loop {
1259            // Check retry message first (priority over network?)
1260            // We check it at start of loop.
1261            if let Some(msg) = self.pending_retry_message.take() {
1262                let rate_limited = self.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1263
1264                if !rate_limited {
1265                    self.process_queued_message(msg).await;
1266                    // Loop again to process effects or next
1267                } else {
1268                    // Put it back
1269                    self.pending_retry_message = Some(msg);
1270                }
1271            }
1272
1273            // Determine if we can process queue
1274            let rate_limited = self.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1275            let can_process_queue = !rate_limited && self.pending_retry_message.is_none();
1276
1277            // Check connection
1278            if self.connection.is_none() {
1279                return Ok(None);
1280            }
1281
1282            // Calculate timeout for recv (heartbeat)
1283            let now = self.clock.now();
1284            let hb_timeout = self.heartbeat_manager.time_until_next_heartbeat(now);
1285
1286            // Check if immediate heartbeat needed
1287            if let Some(timeout) = hb_timeout {
1288                if timeout == Duration::ZERO {
1289                    debug!("Sending heartbeat");
1290                    let msg = crate::internal::heartbeat::HeartbeatManager::build_heartbeat_message(self.session_id, self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0));
1291
1292                    // We wrap send in block to limit borrow scope
1293                    let result = {
1294                        if let Some(conn) = self.connection.as_mut() {
1295                            conn.send(msg.encode()).await
1296                        } else {
1297                            Err(SteamError::NotConnected)
1298                        }
1299                    };
1300
1301                    if let Err(e) = result {
1302                        warn!("Failed to send heartbeat: {}", e);
1303                        // Connection likely dead
1304                        let reason = match &e {
1305                            SteamError::SteamResult(r) => Some(*r),
1306                            _ => Some(EResult::NoConnection),
1307                        };
1308                        return self.handle_connection_close(reason);
1309                    }
1310
1311                    self.heartbeat_manager.record_heartbeat(self.clock.now());
1312                    continue;
1313                }
1314            }
1315
1316            enum PollResult {
1317                Packet(Option<bytes::Bytes>),
1318                Queue(QueuedMessage),
1319                ChatResponse(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>),
1320                BackgroundResult(u64, Result<Bytes, SteamError>),
1321                Timeout,
1322            }
1323
1324            let poll_res: Result<PollResult, SteamError> = {
1325                let conn = self.connection.as_mut().unwrap();
1326                let queue = &mut self.chat_queue_rx;
1327
1328                let recv_fut = conn.recv();
1329
1330                // Determine timeout duration
1331                // We want to wake up for:
1332                // 1. Next heartbeat
1333                // 2. Next rate limit expiration (if we have a pending retry or want to process
1334                //    queue)
1335                let rate_limit_timeout = self.rate_limit_until.map(|t| {
1336                    let now = self.clock.now();
1337                    if t > now {
1338                        t - now
1339                    } else {
1340                        Duration::ZERO
1341                    }
1342                });
1343
1344                let sleep_duration = match (hb_timeout, rate_limit_timeout) {
1345                    (Some(hb), Some(rl)) => Some(hb.min(rl)),
1346                    (Some(hb), None) => Some(hb),
1347                    (None, Some(rl)) => Some(rl),
1348                    (None, None) => None,
1349                };
1350
1351                tokio::select! {
1352                    res = recv_fut => {
1353                        Ok(PollResult::Packet(res?))
1354                    }
1355                    Some(msg) = queue.recv(), if can_process_queue => {
1356                        Ok(PollResult::Queue(msg))
1357                    }
1358                    Some(join_result) = self.chat_tasks.join_next() => {
1359                        match join_result {
1360                            Ok((msg, res)) => Ok(PollResult::ChatResponse(msg, res)),
1361                            Err(e) => {
1362                                // Task panicked or was cancelled
1363                                warn!("Chat task failed: {:?}", e);
1364                                Ok(PollResult::Timeout) // Treat as a non-event
1365                            }
1366                        }
1367                    }
1368                    res = self.background_job_results_rx.recv() => {
1369                        match res {
1370                            Some((job_id, result)) => Ok(PollResult::BackgroundResult(job_id, result)),
1371                            None => Ok(PollResult::Timeout),
1372                        }
1373                    }
1374                    _ = async {
1375                         if let Some(d) = sleep_duration {
1376                             tokio::time::sleep(d).await;
1377                         } else {
1378                             std::future::pending::<()>().await;
1379                         }
1380                    } => {
1381                        Ok(PollResult::Timeout)
1382                    }
1383                }
1384            };
1385
1386            match poll_res {
1387                Ok(PollResult::Packet(Some(data))) => {
1388                    // Decode the message(s)
1389                    let messages = super::events::MessageHandler::decode_packet(&data);
1390
1391                    for decoded in messages {
1392                        // Complete any pending job if this is a response
1393                        if let Some(job_id) = decoded.job_id_target {
1394                            info!("[SteamClient] poll_event: Completing job {} via job_manager", job_id);
1395                            self.job_manager.complete_job_success(job_id, decoded.body).await;
1396                        }
1397
1398                        // Check for GC events and complete pending GC jobs
1399                        for event in &decoded.events {
1400                            if let super::events::SteamEvent::Apps(super::events::AppsEvent::GCReceived(gc_msg)) = event {
1401                                self.gc_jobs.try_complete(gc_msg.appid, gc_msg.msg_type, gc_msg.payload.clone());
1402                            }
1403                        }
1404
1405                        // Queue all events
1406                        self.event_queue.extend(decoded.events);
1407                    }
1408
1409                    // Return the first event if available
1410                    if let Some(mut event) = self.event_queue.pop_front() {
1411                        self.handle_event(&mut event);
1412                        self.post_process_event(&event).await;
1413                        return Ok(Some(event));
1414                    }
1415                }
1416                Ok(PollResult::Packet(None)) => {
1417                    // Connection closed
1418                    return self.handle_connection_close(None);
1419                }
1420                Ok(PollResult::Queue(msg)) => {
1421                    info!("[SteamClient] poll_event: Dequeued message from chat_queue_rx for {}", msg.friend);
1422                    self.process_queued_message(msg).await;
1423                    // Loop to process next
1424                }
1425                Ok(PollResult::ChatResponse(msg, result)) => {
1426                    info!("[SteamClient] poll_event: Received ChatResponse for {}", msg.friend);
1427                    match result {
1428                        Ok(res) => {
1429                            info!("[SteamClient] poll_event: ChatResponse SUCCESS - server_ts={}", res.server_timestamp);
1430                            let _ = msg.respond_to.send(Ok(res));
1431                        }
1432                        Err(SteamError::SteamResult(steam_enums::EResult::RateLimitExceeded)) => {
1433                            warn!("[SteamClient] poll_event: Rate limit exceeded for chat message, backing off for 60 seconds");
1434                            crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "RateLimitExceeded");
1435                            self.rate_limit_until = Some(self.clock.now() + std::time::Duration::from_secs(60));
1436                            self.pending_retry_message = Some(msg);
1437                        }
1438                        Err(e) => {
1439                            error!("[SteamClient] poll_event: ChatResponse ERROR - {:?}", e);
1440                            let _ = msg.respond_to.send(Err(e));
1441                        }
1442                    }
1443                }
1444                Ok(PollResult::BackgroundResult(job_id, result)) => {
1445                    debug!("[SteamClient] poll_event: Received BackgroundResult for job {}", job_id);
1446                    if let Some(task) = self.background_job_tasks.remove(&job_id) {
1447                        match task {
1448                            BackgroundTask::FriendsList => match result {
1449                                Ok(body) => {
1450                                    debug!("[SteamClient] poll_event: Decoding FriendsList response (len={})", body.len());
1451                                    match steam_protos::CFriendsListGetFriendsListResponse::decode(&body[..]) {
1452                                        Ok(response) => {
1453                                            debug!("[SteamClient] poll_event: Successfully decoded FriendsList, processing...");
1454                                            self.handle_friends_list_unified_response(response).await;
1455                                        }
1456                                        Err(e) => {
1457                                            error!("[SteamClient] poll_event: Failed to decode FriendsList response: {:?}", e);
1458                                        }
1459                                    }
1460                                }
1461                                Err(e) => {
1462                                    error!("[SteamClient] poll_event: Background FriendsList job failed: {:?}", e);
1463                                }
1464                            },
1465                            BackgroundTask::OfflineMessages(friend_id) => match result {
1466                                Ok(body) => {
1467                                    debug!("[SteamClient] poll_event: Decoding OfflineMessages response for {} (len={})", friend_id, body.len());
1468                                    match steam_protos::CFriendMessagesGetRecentMessagesResponse::decode(&body[..]) {
1469                                        Ok(response) => {
1470                                            debug!("[SteamClient] poll_event: Successfully decoded OfflineMessages, processing...");
1471                                            let last_view_timestamp = self.chat_last_view.get(&friend_id).copied().unwrap_or(0);
1472
1473                                            let messages = response
1474                                                .messages
1475                                                .into_iter()
1476                                                .map(|m| crate::services::chat::HistoryMessage {
1477                                                    sender: SteamID::from_steam_id64(m.accountid.unwrap_or(0) as u64),
1478                                                    timestamp: m.timestamp.unwrap_or(0),
1479                                                    ordinal: m.ordinal.unwrap_or(0),
1480                                                    message: m.message.unwrap_or_default(),
1481                                                    unread: m.timestamp.unwrap_or(0) > last_view_timestamp,
1482                                                })
1483                                                .collect();
1484
1485                                            self.event_queue.push_back(crate::client::events::SteamEvent::Chat(crate::client::events::ChatEvent::OfflineMessagesFetched { friend_id, messages }));
1486                                        }
1487                                        Err(e) => {
1488                                            error!("[SteamClient] poll_event: Failed to decode OfflineMessages response: {:?}", e);
1489                                        }
1490                                    }
1491                                }
1492                                Err(e) => {
1493                                    error!("[SteamClient] poll_event: Background OfflineMessages job failed for {}: {:?}", friend_id, e);
1494                                }
1495                            },
1496                        }
1497
1498                        // Check if the background task added events to the queue (e.g., FriendsList
1499                        // event)
1500                        if let Some(mut event) = self.event_queue.pop_front() {
1501                            self.handle_event(&mut event);
1502                            self.post_process_event(&event).await;
1503                            return Ok(Some(event));
1504                        }
1505                    } else {
1506                        warn!("[SteamClient] poll_event: Received BackgroundResult for unknown job {}", job_id);
1507                    }
1508                }
1509                Ok(PollResult::Timeout) => {
1510                    // Heartbeat deadline reached, loop will handle it
1511                }
1512                Err(e) => {
1513                    // Connection error
1514                    let reason = match &e {
1515                        SteamError::SteamResult(r) => Some(*r),
1516                        _ => Some(EResult::NoConnection),
1517                    };
1518                    return self.handle_connection_close(reason);
1519                }
1520            }
1521        }
1522    }
1523
1524    /// Handle a connection close and decide whether to reconnect.
1525    fn handle_connection_close(&mut self, reason: Option<EResult>) -> Result<Option<super::events::SteamEvent>, SteamError> {
1526        use super::events::{ConnectionEvent, SteamEvent};
1527
1528        debug!("Handling connection close, reason: {:?}", reason);
1529
1530        // Clean up connection state
1531        self.connection = None;
1532        self.connecting = false;
1533
1534        // Determine if we should reconnect
1535        let should_reconnect = !self.logging_off && self.options.auto_relogin && reason.map(|r| self.reconnect_manager.should_reconnect(r)).unwrap_or(true);
1536
1537        if should_reconnect && self.logon_details.is_some() {
1538            // Start reconnection sequence
1539            let eresult = reason.unwrap_or(EResult::NoConnection);
1540            self.reconnect_manager.start_reconnection(eresult);
1541
1542            // Blacklist the last server if available
1543            if let Some(ref server) = self.last_cm_server {
1544                self.reconnect_manager.blacklist_server(&server.endpoint);
1545            }
1546
1547            info!("Connection lost, will attempt reconnection");
1548        } else {
1549            // Not reconnecting - clear state
1550            self.steam_id = None;
1551            self.reconnect_manager.reset();
1552            self.relogging = false;
1553            self.logging_off = false;
1554        }
1555
1556        Ok(Some(SteamEvent::Connection(ConnectionEvent::Disconnected { reason, will_reconnect: should_reconnect })))
1557    }
1558
1559    /// Restore session state (games played, rich presence, etc.) after
1560    /// reconnection.
1561    async fn restore_session_state(&mut self) -> Result<(), SteamError> {
1562        info!("Restoring session state...");
1563
1564        // Restore games played
1565        let app_ids = self.session_recovery.last_playing_app_ids.clone();
1566        if !app_ids.is_empty() {
1567            debug!("Restoring games played: {:?}", app_ids);
1568            self.games_played_with_extra(app_ids, self.session_recovery.last_custom_game_name.clone()).await?;
1569        }
1570
1571        // Restore persona state
1572        if let Some(state) = self.session_recovery.last_persona_state {
1573            debug!("Restoring persona state: {:?}", state);
1574            self.set_persona(state, self.session_recovery.last_player_name.clone()).await?;
1575        }
1576
1577        // Restore rich presence
1578        let rp_data = self.session_recovery.last_rich_presence.clone();
1579        for (app_id, data) in rp_data {
1580            debug!("Restoring rich presence for app {}", app_id);
1581            self.upload_rich_presence(app_id, &data).await?;
1582        }
1583
1584        Ok(())
1585    }
1586
1587    /// Attempt to reconnect to Steam.
1588    async fn attempt_reconnect(&mut self) -> Result<(), SteamError> {
1589        // Get stored logon details
1590        let details = self.logon_details.clone().ok_or_else(|| SteamError::Other("No logon details available for reconnection".to_string()))?;
1591
1592        debug!("Attempting reconnection with stored credentials");
1593
1594        // Get CM server list
1595        let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
1596        let server = provider.get_server().await?;
1597        self.last_cm_server = Some(server.clone());
1598
1599        info!("Reconnecting to CM server: {}", server.endpoint);
1600
1601        // Connect to CM
1602        let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
1603        debug!("WebSocket connection established");
1604
1605        // Generate new session ID
1606        self.session_id = self.rng.gen_i32().abs();
1607
1608        // Send ClientHello (only if not using refresh token)
1609        if details.refresh_token.is_none() {
1610            let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
1611            let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
1612            connection.send(hello_msg.encode()).await?;
1613        }
1614
1615        // Determine login type
1616        let is_anonymous = details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none());
1617
1618        // Build and send ClientLogon
1619        let logon = self.build_logon_message(&details, is_anonymous);
1620        let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
1621        connection.send(logon_msg.encode()).await?;
1622
1623        // Wait for response
1624        let response = self.wait_for_logon_response(&mut *connection).await?;
1625
1626        // Handle response
1627        let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
1628
1629        if eresult != EResult::OK {
1630            return Err(SteamError::SteamResult(eresult));
1631        }
1632
1633        // Success! Update connection state
1634        self.connection = Some(connection);
1635        self.connecting = false;
1636
1637        // Update SteamID
1638        if let Some(sid) = response.client_supplied_steamid {
1639            self.steam_id = Some(SteamID::from(sid));
1640        }
1641
1642        self.cell_id = response.cell_id;
1643        self.vanity_url = response.vanity_url.clone();
1644
1645        if let Some(ref ip) = response.public_ip {
1646            if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
1647                self.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
1648            }
1649        }
1650
1651        info!("Successfully reconnected");
1652        Ok(())
1653    }
1654
1655    /// Poll all available events.
1656    ///
1657    /// This is useful for processing all events at once, including all
1658    /// sub-messages from Multi messages.
1659    pub async fn poll_events(&mut self) -> Result<Vec<super::events::SteamEvent>, SteamError> {
1660        let mut all_events = Vec::new();
1661
1662        while let Some(event) = self.poll_event().await? {
1663            all_events.push(event);
1664
1665            // If no more queued events, break to avoid blocking
1666            if self.event_queue.is_empty() {
1667                break;
1668            }
1669        }
1670
1671        Ok(all_events)
1672    }
1673
1674    /// Handle an event internally to update client state.
1675    fn handle_event(&mut self, event: &mut super::events::SteamEvent) {
1676        use super::events::{AppsEvent, AuthEvent, ConnectionEvent, FriendsEvent, SteamEvent};
1677
1678        match event {
1679            SteamEvent::Auth(AuthEvent::LoggedOn { steam_id }) => {
1680                self.steam_id = Some(*steam_id);
1681                self.connecting = false;
1682                // Reset reconnection state on successful login
1683                self.reconnect_manager.record_success();
1684            }
1685            SteamEvent::Auth(AuthEvent::LoggedOff { .. }) => {
1686                self.steam_id = None;
1687            }
1688            SteamEvent::Auth(AuthEvent::GameConnectTokens { tokens }) => {
1689                self.gc_tokens.extend(tokens.clone());
1690            }
1691            SteamEvent::Friends(FriendsEvent::FriendsList { friends, .. }) => {
1692                for friend in friends {
1693                    if friend.relationship == steam_enums::EFriendRelationship::None {
1694                        self.my_friends.remove(&friend.steam_id);
1695                    } else {
1696                        self.my_friends.insert(friend.steam_id, friend.relationship as u32);
1697                    }
1698                }
1699            }
1700            SteamEvent::Friends(FriendsEvent::PersonaState(persona)) => {
1701                let mut persona = *persona.clone();
1702                if let Some(game_id) = persona.game_id {
1703                    let app_id = game_id as u32;
1704                    if app_id != 0 {
1705                        // If we don't have the app info, mark it as pending
1706                        if !self.apps.contains_key(&app_id) {
1707                            self.pending_apps.insert(app_id);
1708                        } else if persona.game_name.is_none() || persona.game_name.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
1709                            // If we have the app info but game_name is missing, try to resolve it
1710                            if let Some(name) = self.get_app_name(app_id) {
1711                                persona.game_name = Some(name);
1712                            }
1713                        }
1714                    }
1715                }
1716
1717                // Update the users HashMap
1718                self.users.entry(persona.steam_id).and_modify(|existing| existing.merge(&persona)).or_insert_with(|| persona.clone());
1719
1720                // Also update the TTL-based persona cache
1721                self.persona_cache.insert(persona.steam_id, persona);
1722            }
1723            SteamEvent::Apps(AppsEvent::PlayingState { playing_app, blocked }) => {
1724                self.playing_blocked = *blocked;
1725                if *playing_app != 0 {
1726                    if !self.apps.contains_key(playing_app) {
1727                        self.pending_apps.insert(*playing_app);
1728                    }
1729                    self.apps.entry(*playing_app).or_insert_with(|| super::events::AppInfoData { app_id: *playing_app, change_number: 0, missing_token: false, app_info: None });
1730                }
1731            }
1732            SteamEvent::Apps(AppsEvent::ProductInfoResponse { apps, .. }) => {
1733                for (app_id, data) in apps {
1734                    self.apps.insert(*app_id, data.clone());
1735                    self.pending_apps.remove(app_id);
1736                }
1737            }
1738            SteamEvent::Connection(ConnectionEvent::Disconnected { will_reconnect, .. }) => {
1739                // Only clear steam_id if not reconnecting
1740                if !*will_reconnect {
1741                    self.steam_id = None;
1742                }
1743                self.connection = None;
1744                self.connecting = false;
1745            }
1746            SteamEvent::Connection(ConnectionEvent::ReconnectFailed { .. }) => {
1747                // Clear all state on permanent reconnection failure
1748                self.steam_id = None;
1749                self.connection = None;
1750                self.connecting = false;
1751                self.reconnect_manager.reset();
1752            }
1753            SteamEvent::Apps(super::events::AppsEvent::LicenseList { licenses }) => {
1754                self.licenses = licenses.clone();
1755            }
1756            _ => {}
1757        }
1758    }
1759
1760    /// Post-process events to trigger automatic actions (like fetching
1761    /// personas).
1762    async fn post_process_event(&mut self, event: &super::events::SteamEvent) {
1763        use super::events::{FriendsEvent, SteamEvent};
1764
1765        if let SteamEvent::Friends(FriendsEvent::FriendsList { incremental: false, .. }) = event {
1766            let friends: Vec<SteamID> = self.my_friends.iter().filter(|&(_, &rel)| rel == steam_enums::EFriendRelationship::Friend as u32).map(|(&id, _)| id).collect();
1767
1768            if !friends.is_empty() {
1769                debug!("Auto-fetching personas for {} friends", friends.len());
1770                if let Err(e) = self.get_personas(friends).await {
1771                    warn!("Failed to auto-fetch personas: {}", e);
1772                }
1773            }
1774        }
1775    }
1776
1777    /// Format a user's rich presence into a localized string.
1778    ///
1779    /// # Arguments
1780    /// * `steam_id` - The user's SteamID
1781    /// * `language` - The language for localization (e.g., "english")
1782    pub fn format_rich_presence(&self, steam_id: SteamID, language: &str) -> Option<String> {
1783        let user = self.users.get(&steam_id)?;
1784        let app_id = user.game_id? as u32;
1785        let app_info = self.apps.get(&app_id)?;
1786        let app_vdf = app_info.app_info.as_ref()?;
1787
1788        // Get rich presence localization for the given language
1789        let localization = app_vdf.get("common").and_then(|c| c.get("rich_presence")).and_then(|rp| rp.get("localization")).and_then(|loc| loc.get(language))?;
1790
1791        // Get the display token
1792        let display_token = user.rich_presence.get("steam_display")?;
1793        let mut formatted = localization.get_str(display_token)?.to_string();
1794
1795        // Replace placeholders (e.g., %map%, %status%)
1796        for (key, value) in &user.rich_presence {
1797            if key == "steam_display" {
1798                continue;
1799            }
1800            let placeholder = format!("%{}%", key);
1801            // The value itself might be a token that needs localization
1802            let localized_value = localization.get_str(value).unwrap_or(value);
1803            formatted = formatted.replace(&placeholder, localized_value);
1804        }
1805
1806        Some(formatted)
1807    }
1808
1809    /// Get an app's name from cache.
1810    pub fn get_app_name(&self, app_id: u32) -> Option<String> {
1811        // First check static cache (using binary search on sorted static array)
1812        if let Ok(idx) = crate::client::static_app_list::APP_LIST.binary_search_by_key(&app_id, |&(id, _)| id) {
1813            return Some(crate::client::static_app_list::APP_LIST[idx].1.to_string());
1814        }
1815
1816        // Then check PICS cache
1817        let info = self.apps.get(&app_id)?;
1818        if let Some(vdf) = &info.app_info {
1819            vdf.get("common").and_then(|c| c.get_str("name")).or_else(|| vdf.get_str("name")).map(|s| s.to_string())
1820        } else {
1821            None
1822        }
1823    }
1824
1825    /// Fetch info for apps that are currently in the pending queue.
1826    ///
1827    /// This is used for proactive caching of app names and rich presence
1828    /// localization.
1829    pub async fn fetch_pending_app_info(&mut self) -> Result<(), SteamError> {
1830        if self.pending_apps.is_empty() {
1831            return Ok(());
1832        }
1833
1834        let app_ids: Vec<u32> = self.pending_apps.iter().copied().collect();
1835        // Clear pending apps to prevent duplicate requests
1836        self.pending_apps.clear();
1837        self.get_product_info(app_ids).await
1838    }
1839
1840    /// Fetch and update active friend message sessions.
1841    ///
1842    /// This updates the local `users` cache with unread counts and last message
1843    /// timestamps from active chat sessions.
1844    pub async fn update_friend_sessions(&mut self) -> Result<(), SteamError> {
1845        let sessions = self.get_active_friend_sessions().await?;
1846
1847        for session in sessions {
1848            self.users
1849                .entry(session.friend)
1850                .and_modify(|user| {
1851                    user.unread_count = session.unread_count;
1852                    user.last_message_time = session.time_last_message;
1853                })
1854                .or_insert_with(|| UserPersona {
1855                    steam_id: session.friend,
1856                    unread_count: session.unread_count,
1857                    last_message_time: session.time_last_message,
1858                    ..Default::default()
1859                });
1860        }
1861
1862        Ok(())
1863    }
1864
1865    /// Re-log into Steam using stored credentials.
1866    ///
1867    /// This is useful when you want to manually trigger a reconnection.
1868    ///
1869    /// # Errors
1870    ///
1871    /// Returns an error if not currently logged in or if no credentials are
1872    /// stored.
1873    pub async fn relog(&mut self) -> Result<(), SteamError> {
1874        if self.steam_id.is_none() {
1875            return Err(SteamError::NotLoggedOn);
1876        }
1877
1878        if self.logon_details.is_none() {
1879            return Err(SteamError::Other("No stored credentials for relog".to_string()));
1880        }
1881
1882        info!("Initiating manual relog");
1883        self.relogging = true;
1884
1885        // Disconnect and trigger reconnection
1886        if let Some(conn) = self.connection.take() {
1887            let _ = conn.close().await;
1888        }
1889
1890        // Start reconnection sequence
1891        self.reconnect_manager.start_reconnection(EResult::NoConnection);
1892
1893        Ok(())
1894    }
1895
1896    /// Cancel any pending reconnection attempts.
1897    pub fn cancel_reconnect(&mut self) {
1898        self.reconnect_manager.reset();
1899        self.relogging = false;
1900    }
1901
1902    /// Check if currently attempting to reconnect.
1903    pub fn is_reconnecting(&self) -> bool {
1904        self.reconnect_manager.is_reconnecting()
1905    }
1906
1907    /// Get the current reconnection state.
1908    pub fn reconnect_state(&self) -> crate::internal::reconnect::ReconnectState {
1909        self.reconnect_manager.state()
1910    }
1911
1912    //=========================================================================
1913    // Event System Improvements
1914    //=========================================================================
1915
1916    /// Poll for events without blocking.
1917    ///
1918    /// Returns queued events immediately without waiting for network I/O.
1919    /// Useful for draining the event queue in non-async contexts.
1920    ///
1921    /// # Returns
1922    ///
1923    /// `Some(event)` if there's a queued event, `None` otherwise.
1924    ///
1925    /// # Example
1926    /// ```rust,ignore
1927    /// // Process all queued events without blocking
1928    /// while let Some(event) = client.try_poll_event() {
1929    ///     handle_event(event);
1930    /// }
1931    /// ```
1932    pub fn try_poll_event(&mut self) -> Option<super::events::SteamEvent> {
1933        if let Some(mut event) = self.event_queue.pop_front() {
1934            self.handle_event(&mut event);
1935            Some(event)
1936        } else {
1937            None
1938        }
1939    }
1940
1941    /// Poll for events with a timeout.
1942    ///
1943    /// Waits for an event up to the specified duration. Returns `None` if
1944    /// the timeout expires without receiving an event.
1945    ///
1946    /// # Arguments
1947    ///
1948    /// * `timeout` - Maximum time to wait for an event
1949    ///
1950    /// # Example
1951    /// ```rust,ignore
1952    /// use std::time::Duration;
1953    ///
1954    /// // Wait up to 5 seconds for an event
1955    /// match client.poll_event_timeout(Duration::from_secs(5)).await? {
1956    ///     Some(event) => tracing::info!("Got event: {:?}", event),
1957    ///     None => tracing::info!("Timeout - no event received"),
1958    /// }
1959    /// ```
1960    pub async fn poll_event_timeout(&mut self, timeout: std::time::Duration) -> Result<Option<super::events::SteamEvent>, SteamError> {
1961        match tokio::time::timeout(timeout, self.poll_event()).await {
1962            Ok(result) => result,
1963            Err(_) => Ok(None), // Timeout expired
1964        }
1965    }
1966
1967    /// Wait for an event matching a predicate.
1968    ///
1969    /// Polls for events until one matches the given predicate, then returns it.
1970    /// Non-matching events are discarded.
1971    ///
1972    /// # Arguments
1973    ///
1974    /// * `predicate` - A function that returns `true` for the desired event
1975    ///
1976    /// # Example
1977    /// ```rust,ignore
1978    /// // Wait for a chat message
1979    /// let chat_event = client.wait_for(|e| e.is_chat()).await?;
1980    ///
1981    /// // Wait for login to complete
1982    /// let logon = client.wait_for(|e| matches!(
1983    ///     e,
1984    ///     SteamEvent::Auth(AuthEvent::LoggedOn { .. })
1985    /// )).await?;
1986    /// ```
1987    pub async fn wait_for<F>(&mut self, predicate: F) -> Result<super::events::SteamEvent, SteamError>
1988    where
1989        F: Fn(&super::events::SteamEvent) -> bool,
1990    {
1991        loop {
1992            if let Some(event) = self.poll_event().await? {
1993                if predicate(&event) {
1994                    return Ok(event);
1995                }
1996            }
1997        }
1998    }
1999
2000    /// Wait for an event matching a predicate with a timeout.
2001    ///
2002    /// Combines [`wait_for`] with a timeout. Returns `None` if the timeout
2003    /// expires before finding a matching event.
2004    ///
2005    /// # Arguments
2006    ///
2007    /// * `predicate` - A function that returns `true` for the desired event
2008    /// * `timeout` - Maximum time to wait
2009    ///
2010    /// # Example
2011    /// ```rust,ignore
2012    /// use std::time::Duration;
2013    ///
2014    /// // Wait up to 30 seconds for login
2015    /// let logon = client.wait_for_timeout(
2016    ///     |e| e.is_auth(),
2017    ///     Duration::from_secs(30)
2018    /// ).await?;
2019    ///
2020    /// match logon {
2021    ///     Some(event) => tracing::info!("Logged in!"),
2022    ///     None => tracing::info!("Login timed out"),
2023    /// }
2024    /// ```
2025    pub async fn wait_for_timeout<F>(&mut self, predicate: F, timeout: std::time::Duration) -> Result<Option<super::events::SteamEvent>, SteamError>
2026    where
2027        F: Fn(&super::events::SteamEvent) -> bool,
2028    {
2029        match tokio::time::timeout(timeout, self.wait_for(predicate)).await {
2030            Ok(result) => result.map(Some),
2031            Err(_) => Ok(None), // Timeout expired
2032        }
2033    }
2034
2035    /// Drain all currently queued events without blocking.
2036    ///
2037    /// Returns all events that are currently in the queue.
2038    /// Does not wait for network I/O.
2039    ///
2040    /// # Example
2041    /// ```rust,ignore
2042    /// // Get all pending events at once
2043    /// let events = client.drain_queued_events();
2044    /// for event in events {
2045    ///     handle_event(event);
2046    /// }
2047    /// ```
2048    pub fn drain_queued_events(&mut self) -> Vec<super::events::SteamEvent> {
2049        let mut events: Vec<_> = self.event_queue.drain(..).collect();
2050        for event in &mut events {
2051            self.handle_event(event);
2052        }
2053        events
2054    }
2055
2056    /// Get the CS:GO service.
2057    pub fn csgo(&mut self) -> crate::services::CSGOClient<'_> {
2058        crate::services::CSGOClient::new(self)
2059    }
2060
2061    /// Helper to send a unified service method call and wait for response.
2062    pub(crate) async fn send_service_method_and_wait<Req: prost::Message, Resp: prost::Message + Default>(&mut self, method: &str, body: &Req) -> Result<Resp, SteamError> {
2063        let rx = self.send_service_method_with_job(method, body).await?;
2064
2065        // Wait for response
2066        let job_response = rx.await.map_err(|_| SteamError::ResponseTimeout)?;
2067
2068        let body = match job_response {
2069            crate::internal::jobs::JobResponse::Success(bytes) => bytes,
2070            crate::internal::jobs::JobResponse::Timeout => return Err(SteamError::ResponseTimeout),
2071            crate::internal::jobs::JobResponse::Error(msg) => return Err(SteamError::ProtocolError(msg)),
2072        };
2073
2074        Resp::decode(&body[..]).map_err(|_| SteamError::DeserializationFailed)
2075    }
2076}
2077
2078//=============================================================================
2079// Stream Implementation for SteamClient
2080//=============================================================================
2081
2082use std::task::{Context, Poll};
2083
2084use futures::Stream;
2085
2086/// A stream of Steam events.
2087///
2088/// Created by calling [`SteamClient::into_stream`].
2089///
2090/// **Note**: This stream only yields queued events synchronously. For full
2091/// async event polling (including network I/O), use [`poll_event`] in a loop
2092/// or with `futures::stream::unfold`.
2093///
2094/// # Example
2095/// ```rust,ignore
2096/// use futures::StreamExt;
2097///
2098/// // Using as a stream (queued events only)
2099/// let mut stream = client.into_stream();
2100///
2101/// // Process queued events
2102/// while let Some(Ok(event)) = stream.next().await {
2103///     tracing::info!("Event: {:?}", event);
2104/// }
2105///
2106/// // For full network I/O, use unfold:
2107/// use futures::stream::unfold;
2108///
2109/// let stream = unfold(client, |mut client| async move {
2110///     match client.poll_event().await {
2111///         Ok(Some(event)) => Some((Ok(event), client)),
2112///         Ok(None) => Some((Ok(SteamEvent::System(SystemEvent::Debug("no event".into()))), client)),
2113///         Err(e) => Some((Err(e), client)),
2114///     }
2115/// });
2116/// ```
2117pub struct SteamEventStream<'a> {
2118    client: &'a mut SteamClient,
2119}
2120
2121impl SteamClient {
2122    /// Convert this client into an event stream for queued events.
2123    ///
2124    /// Returns a `Stream` that yields queued `SteamEvent`s. This is useful
2125    /// for draining the event queue using stream combinators.
2126    ///
2127    /// **Note**: This stream only yields already-queued events. It does not
2128    /// perform network I/O. For full async event polling, use [`poll_event`]
2129    /// in a loop or create a stream with `futures::stream::unfold`.
2130    ///
2131    /// # Example
2132    /// ```rust,ignore
2133    /// use futures::StreamExt;
2134    ///
2135    /// // Drain queued events
2136    /// let mut stream = client.into_stream();
2137    /// while let Some(Ok(event)) = stream.next().await {
2138    ///     handle_event(event);
2139    /// }
2140    /// ```
2141    /// Send a unified service method call in the background.
2142    ///
2143    /// The response will be automatically handled when it arrives by checking
2144    /// the background job tasks in `poll_event`.
2145    pub(crate) async fn send_service_method_background<T: prost::Message>(&mut self, method: &str, body: &T, task: BackgroundTask) -> Result<(), SteamError> {
2146        let (job_id, rx) = self.job_manager.create_job().await;
2147
2148        self.send_service_method_with_job_id(method, body, job_id).await?;
2149
2150        self.background_job_tasks.insert(job_id, task);
2151
2152        let tx = self.background_job_results_tx.clone();
2153        tokio::spawn(async move {
2154            match rx.await {
2155                Ok(crate::internal::jobs::JobResponse::Success(bytes)) => {
2156                    let _ = tx.send((job_id, Ok(bytes))).await;
2157                }
2158                Ok(crate::internal::jobs::JobResponse::Timeout) => {
2159                    let _ = tx.send((job_id, Err(SteamError::Timeout))).await;
2160                }
2161                Ok(crate::internal::jobs::JobResponse::Error(e)) => {
2162                    let _ = tx.send((job_id, Err(SteamError::Other(e)))).await;
2163                }
2164                Err(_) => {
2165                    // Receiver closed
2166                }
2167            }
2168        });
2169
2170        Ok(())
2171    }
2172
2173    /// Helper to send a unified service method call with a specific job ID.
2174    async fn send_service_method_with_job_id<T: prost::Message>(&mut self, method: &str, body: &T, job_id: u64) -> Result<(), SteamError> {
2175        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
2176
2177        let header = ProtobufMessageHeader {
2178            header_length: 0,
2179            session_id: self.session_id,
2180            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
2181            job_id_source: job_id,
2182            job_id_target: u64::MAX,
2183            target_job_name: Some(method.to_string()),
2184            routing_appid: None,
2185        };
2186
2187        let msg = SteamMessage::new_proto(steam_enums::EMsg::ServiceMethodCallFromClient, header, body);
2188
2189        if let Some(ref mut conn) = self.connection {
2190            conn.send(msg.encode()).await?;
2191        } else {
2192            return Err(SteamError::NotConnected);
2193        }
2194
2195        Ok(())
2196    }
2197
2198    pub fn into_stream(&mut self) -> SteamEventStream<'_> {
2199        SteamEventStream { client: self }
2200    }
2201}
2202
2203impl<'a> Stream for SteamEventStream<'a> {
2204    type Item = Result<super::events::SteamEvent, SteamError>;
2205
2206    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2207        // Only yield queued events - does not perform network I/O
2208        // For full async polling, use poll_event() in a loop
2209        if let Some(mut event) = self.client.event_queue.pop_front() {
2210            self.client.handle_event(&mut event);
2211            Poll::Ready(Some(Ok(event)))
2212        } else {
2213            // No more queued events
2214            Poll::Ready(None)
2215        }
2216    }
2217}
2218
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed SteamID shared by every persona built in these tests.
    fn test_steam_id() -> SteamID {
        SteamID::from_steam_id64(76561198000000000)
    }

    /// Builds an owned rich-presence map from borrowed key/value pairs.
    fn rich_presence(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(key, value)| ((*key).to_owned(), (*value).to_owned())).collect()
    }

    /// Merging unions the rich-presence maps; on a shared key the incoming value wins.
    #[test]
    fn test_user_persona_merge_rich_presence() {
        let mut existing = UserPersona {
            steam_id: test_steam_id(),
            rich_presence: rich_presence(&[("status", "Online"), ("game", "CS:GO")]),
            ..Default::default()
        };
        let incoming = UserPersona {
            steam_id: test_steam_id(),
            rich_presence: rich_presence(&[("status", "In-Game"), ("party", "Open")]),
            ..Default::default()
        };

        existing.merge(&incoming);

        let merged = &existing.rich_presence;
        assert_eq!(merged.get("status").map(String::as_str), Some("In-Game"));
        assert_eq!(merged.get("game").map(String::as_str), Some("CS:GO"));
        assert_eq!(merged.get("party").map(String::as_str), Some("Open"));
    }

    /// Non-zero unread counters from the incoming persona replace the existing ones.
    #[test]
    fn test_user_persona_merge_unread_count() {
        let mut existing = UserPersona {
            steam_id: test_steam_id(),
            unread_count: 5,
            last_message_time: 100,
            ..Default::default()
        };
        let incoming = UserPersona {
            steam_id: test_steam_id(),
            unread_count: 10,
            last_message_time: 200,
            ..Default::default()
        };

        existing.merge(&incoming);

        assert_eq!(existing.unread_count, 10);
        assert_eq!(existing.last_message_time, 200);
    }

    /// Zero-valued unread counters in the incoming persona must NOT overwrite existing values.
    #[test]
    fn test_user_persona_merge_unread_count_zero() {
        let mut existing = UserPersona {
            steam_id: test_steam_id(),
            unread_count: 5,
            last_message_time: 100,
            ..Default::default()
        };
        let incoming = UserPersona {
            steam_id: test_steam_id(),
            unread_count: 0,
            last_message_time: 0,
            ..Default::default()
        };

        existing.merge(&incoming);

        assert_eq!(existing.unread_count, 5);
        assert_eq!(existing.last_message_time, 100);
    }
}