Skip to main content

steam_client/client/
steam_client.rs

1//! Main Steam client implementation.
2
3mod polling;
4mod reconnect;
5mod state;
6
7use std::{
8    collections::{HashMap, VecDeque},
9    pin::Pin,
10    sync::Arc,
11    time::Duration,
12};
13
14use chrono::{DateTime, Utc};
15use prost::Message;
16use steam_auth::{CredentialsDetails, EAuthSessionGuardType, EAuthTokenPlatformType, LoginSession, LoginSessionOptions};
17use steam_enums::{EMsg, EPersonaState, EResult};
18use steam_protos::{CMsgClientHello, CMsgClientLogOff, CMsgClientLogon, CMsgClientLogonResponse, PROTOCOL_VERSION};
19use steamid::SteamID;
20use tracing::{debug, error, info, info_span, warn, Instrument};
21
22use crate::{
23    connection::{CmServer, CmServerProvider, HttpCmServerProvider, SteamConnection, WebSocketConnection},
24    error::SteamError,
25    options::SteamOptions,
26    protocol::SteamMessage,
27    types::{AccountInfo, EmailInfo, Limitations, LogOnDetails, LogOnResponse, VacStatus, WalletInfo},
28};
29
30/// User persona information.
31#[derive(Debug, Clone)]
32pub struct UserPersona {
33    pub steam_id: SteamID,
34    pub player_name: String,
35    pub persona_state: EPersonaState,
36    pub persona_state_flags: u32,
37    pub avatar_hash: Option<String>,
38    pub game_name: Option<String>,
39    pub game_id: Option<u64>,
40    pub last_logon: Option<DateTime<Utc>>,
41    pub last_logoff: Option<DateTime<Utc>>,
42    pub last_seen_online: Option<DateTime<Utc>>,
43    pub rich_presence: HashMap<String, String>,
44    /// Steam player group ID (lobby/party) from rich presence
45    pub steam_player_group: Option<String>,
46    /// Status string from rich presence (e.g. "Competitive Mirage [ 3 : 6 ]")
47    pub rich_presence_status: Option<String>,
48    /// Map name from rich presence (e.g. "de_mirage")
49    pub game_map: Option<String>,
50    /// Game score from rich presence (e.g. "[ 3 : 6 ]")
51    pub game_score: Option<String>,
52    /// Number of players/slots in lobby/party from rich presence
53    pub num_players: Option<u32>,
54    pub unread_count: u32,
55    pub last_message_time: u32,
56}
57
58impl Default for UserPersona {
59    fn default() -> Self {
60        Self {
61            steam_id: SteamID::default(),
62            player_name: String::new(),
63            persona_state: EPersonaState::Offline,
64            persona_state_flags: 0,
65            avatar_hash: None,
66            game_name: None,
67            game_id: None,
68            last_logon: None,
69            last_logoff: None,
70            last_seen_online: None,
71            rich_presence: HashMap::new(),
72            steam_player_group: None,
73            rich_presence_status: None,
74            game_map: None,
75            game_score: None,
76            num_players: None,
77            unread_count: 0,
78            last_message_time: 0,
79        }
80    }
81}
82
83impl UserPersona {
84    /// Get the URL for the user's avatar.
85    ///
86    /// Size can be "icon" (32x32), "medium" (64x64), or "full" (184x184).
87    pub fn avatar_url(&self, size: &str) -> Option<String> {
88        let hash = self.avatar_hash.as_ref()?;
89        let size_suffix = match size {
90            "medium" => "_medium.jpg",
91            "full" => "_full.jpg",
92            _ => ".jpg",
93        };
94        Some(format!("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{}/{}{}", &hash[0..2], hash, size_suffix))
95    }
96
97    /// Merge another persona data into this one, overwriting only
98    /// non-empty/Some values.
99    pub fn merge(&mut self, other: &UserPersona) {
100        if !other.player_name.is_empty() {
101            self.player_name = other.player_name.clone();
102        }
103        self.persona_state = other.persona_state;
104        if other.persona_state_flags != 0 {
105            self.persona_state_flags = other.persona_state_flags;
106        }
107        if other.avatar_hash.is_some() {
108            self.avatar_hash = other.avatar_hash.clone();
109        }
110        if other.game_name.is_some() {
111            self.game_name = other.game_name.clone();
112        }
113        if other.game_id.is_some() && other.game_id != Some(0) {
114            self.game_id = other.game_id;
115        }
116        if other.last_logon.is_some() {
117            self.last_logon = other.last_logon;
118        }
119        if other.last_logoff.is_some() {
120            self.last_logoff = other.last_logoff;
121        }
122        if other.last_seen_online.is_some() {
123            self.last_seen_online = other.last_seen_online;
124        }
125        for (k, v) in &other.rich_presence {
126            self.rich_presence.insert(k.clone(), v.clone());
127        }
128        if other.steam_player_group.is_some() {
129            self.steam_player_group = other.steam_player_group.clone();
130        }
131        if other.rich_presence_status.is_some() {
132            self.rich_presence_status = other.rich_presence_status.clone();
133        }
134        if other.game_map.is_some() {
135            self.game_map = other.game_map.clone();
136        }
137        if other.game_score.is_some() {
138            self.game_score = other.game_score.clone();
139        }
140        // Only update unread count and last message time if they are non-zero in the
141        // other persona
142        if other.unread_count > 0 {
143            self.unread_count = other.unread_count;
144        }
145        if other.last_message_time > 0 {
146            self.last_message_time = other.last_message_time;
147        }
148    }
149}
150
151use std::time::Instant;
152
153use bytes::Bytes;
154use tokio::sync::{mpsc, oneshot};
155
156/// Background tasks that can be triggered without awaiting.
157#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158pub(crate) enum BackgroundTask {
159    FriendsList,
160    OfflineMessages(SteamID),
161}
162
163/// Message queued for sending.
164#[derive(Debug)]
165pub(crate) struct QueuedMessage {
166    pub friend: SteamID,
167    pub message: String,
168    pub entry_type: steam_enums::EChatEntryType,
169    pub contains_bbcode: bool,
170    pub respond_to: oneshot::Sender<Result<crate::services::chat::SendMessageResult, SteamError>>,
171}
172
173/// Wall-clock timeout for waiting on a `ClientLogOnResponse` after sending
174/// `ClientLogon`. Steam usually responds within ~1s; 15s leaves headroom for
175/// slow links and Multi-message decoding.
176const LOGON_RESPONSE_TIMEOUT: Duration = Duration::from_secs(15);
177/// Heartbeat interval in seconds.
178const HEARTBEAT_INTERVAL_SECONDS: u64 = 30;
179
180/// Per-account state populated from the login response and account-info
181/// service calls. Cleared on logoff.
182#[derive(Debug, Default, Clone)]
183pub(crate) struct AccountState {
184    pub(crate) public_ip: Option<String>,
185    pub(crate) cell_id: Option<u32>,
186    pub(crate) vanity_url: Option<String>,
187    pub(crate) info: Option<AccountInfo>,
188    pub(crate) email: Option<EmailInfo>,
189    pub(crate) limitations: Option<Limitations>,
190    pub(crate) vac: Option<VacStatus>,
191    pub(crate) wallet: Option<WalletInfo>,
192}
193
194/// Apps and licenses cached on this client.
195#[derive(Debug, Default)]
196pub(crate) struct AppsState {
197    /// App information cache (AppID -> Info), populated from PICS responses.
198    pub(crate) info: HashMap<u32, super::events::AppInfoData>,
199    /// Account licenses (packages owned).
200    pub(crate) licenses: Vec<super::events::LicenseEntry>,
201    /// App IDs that need PICS info fetching.
202    pub(crate) pending: rustc_hash::FxHashSet<u32>,
203    /// Whether Steam has blocked us from setting a "Playing" status.
204    pub(crate) playing_blocked: bool,
205    /// App IDs currently set via `games_played`.
206    pub(crate) playing_app_ids: Vec<u32>,
207}
208
209/// Friends, personas, chat read state, and persona cache.
210#[derive(Debug, Default)]
211pub(crate) struct SocialState {
212    /// Friends list (SteamID -> relationship).
213    pub(crate) friends: HashMap<SteamID, u32>,
214    /// User personas (live state map).
215    pub(crate) users: HashMap<SteamID, UserPersona>,
216    /// Chat last view timestamps (SteamID -> unix timestamp).
217    pub(crate) chat_last_view: HashMap<SteamID, u32>,
218    /// Persona cache with TTL expiration.
219    pub(crate) persona_cache: crate::cache::PersonaCache,
220}
221
222/// Outbound chat-message queue, rate-limit state, and the JoinSet of
223/// in-flight chat-send tasks. Fields are accessed split (`&mut self.chat.rx`
224/// vs `self.chat.tasks.join_next()`) inside `tokio::select!`; Rust's
225/// disjoint-field borrow analysis keeps this sound.
226pub(crate) struct ChatQueueState {
227    pub(crate) tx: mpsc::UnboundedSender<QueuedMessage>,
228    pub(crate) rx: mpsc::UnboundedReceiver<QueuedMessage>,
229    pub(crate) rate_limit_until: Option<Instant>,
230    pub(crate) pending_retry_message: Option<QueuedMessage>,
231    pub(crate) tasks: tokio::task::JoinSet<(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>)>,
232}
233
234/// Authentication & connection-lifecycle state. Updated during
235/// `log_on`/`log_off`/reconnect; cleared on logoff.
236#[derive(Default)]
237pub(crate) struct AuthState {
238    pub(crate) login_session: Option<LoginSession>,
239    pub(crate) logon_details: Option<LogOnDetails>,
240    pub(crate) session_id: i32,
241    pub(crate) connecting: bool,
242    pub(crate) logging_off: bool,
243    pub(crate) relogging: bool,
244    pub(crate) temp_steam_id: Option<SteamID>,
245    pub(crate) auth_seq_me: u32,
246    pub(crate) auth_seq_them: u32,
247    pub(crate) last_cm_server: Option<CmServer>,
248}
249
250/// The main Steam client.
251///
252/// State fields are `pub(crate)` and read via accessor methods (`steam_id()`,
253/// `cell_id()`, etc.). Direct field access was retired in the 0.2.x cycle to
254/// stabilize the public surface for the road to 1.0.
255pub struct SteamClient {
256    /// Client options (kept `pub` because it's user-supplied configuration).
257    pub options: SteamOptions,
258
259    /// The logged-in SteamID (None if not logged in).
260    pub(crate) steam_id: Option<SteamID>,
261
262    /// Account-derived state (IP, cell, account info, wallet, VAC, …).
263    ///
264    /// Wrapped in `Arc<parking_lot::RwLock>` so `&self` accessor methods
265    /// (`account_info`, `wallet`, `vac_status`, …) can read without exclusive
266    /// access to the whole client. Never hold a guard across `.await`.
267    pub(crate) account: Arc<parking_lot::RwLock<AccountState>>,
268
269    /// Apps & licenses (PICS cache, owned licenses, playing status).
270    ///
271    /// Wrapped in `Arc<parking_lot::RwLock>` so `&self` accessor methods
272    /// (`licenses`, `get_app_name`, `format_rich_presence`) can read without
273    /// exclusive access. Methods that need long-lived references to inner
274    /// `AppInfoData` should bind `let apps = self.apps.read();` at the top
275    /// of the (synchronous) function. Never hold across `.await`.
276    pub(crate) apps: Arc<parking_lot::RwLock<AppsState>>,
277
278    /// Friends, personas, chat read state, persona cache.
279    ///
280    /// Wrapped in `Arc<parking_lot::RwLock>` so `&self` accessor methods
281    /// (`friend_relationship`, `persona`, `friends`) work without exclusive
282    /// access to the whole client. **Never hold a guard across an `.await`** —
283    /// `parking_lot::RwLockReadGuard` is `Send`, so the compiler won't catch
284    /// it, but a held lock blocks all other accessors until the await resolves.
285    pub(crate) social: Arc<parking_lot::RwLock<SocialState>>,
286
287    // Connection stats
288    pub(crate) connect_time: u64,
289    pub(crate) connection_count: u32,
290
291    /// Authentication & connection-lifecycle state.
292    ///
293    /// Wrapped in `Arc<parking_lot::RwLock>` for symmetry with the other
294    /// sub-structs. Note: most reads are quick value extractions; methods
295    /// that need `&mut LoginSession` across an `.await` (`get_web_session`)
296    /// `take()` the session, do the I/O, then re-insert it.
297    pub(crate) auth: Arc<parking_lot::RwLock<AuthState>>,
298
299    pub(crate) h_steam_pipe: u32,
300    pub(crate) gc_tokens: Vec<Vec<u8>>,
301    pub(crate) active_tickets: Vec<crate::services::appauth::AuthSessionTicket>,
302
303    /// Last time this account successfully registered for a CSGO party
304    /// (in-memory)
305    pub(crate) last_time_party_register: Option<i64>,
306
307    /// Outbound chat queue + in-flight send tasks.
308    pub(crate) chat: ChatQueueState,
309
310    // Internal state (crate-visible)
311    pub(crate) connection: Option<Box<dyn SteamConnection>>,
312    pub(crate) event_queue: VecDeque<super::events::SteamEvent>,
313
314    // Reconnection state
315    pub(crate) reconnect_manager: crate::internal::reconnect::ReconnectManager,
316    pub(crate) heartbeat_manager: crate::internal::heartbeat::HeartbeatManager,
317
318    // Job manager for request-response correlation
319    pub(crate) job_manager: crate::internal::jobs::JobManager,
320
321    // GC job manager for GC message request-response correlation
322    pub(crate) gc_jobs: crate::services::gc::GCJobManager,
323
324    /// Mapping of Job IDs to specific background tasks
325    pub(crate) background_job_tasks: HashMap<u64, BackgroundTask>,
326
327    /// Channel for background job results
328    pub(crate) background_job_results_tx: mpsc::Sender<(u64, Result<Bytes, SteamError>)>,
329    pub(crate) background_job_results_rx: mpsc::Receiver<(u64, Result<Bytes, SteamError>)>,
330
331    // Session recovery helper
332    pub(crate) session_recovery: crate::services::SessionRecovery,
333
334    // Dependency injection for testing
335    pub(crate) http_client: Arc<dyn crate::utils::http::HttpClient>,
336    pub(crate) clock: Arc<dyn crate::utils::clock::Clock>,
337    pub(crate) rng: Arc<dyn crate::utils::rng::Rng>,
338}
339
340impl SteamClient {
341    /// Create a builder for constructing a SteamClient instance.
342    ///
343    /// The builder allows injecting mock dependencies for testing.
344    /// For production use, prefer `SteamClient::new()` instead.
345    ///
346    /// # Example
347    ///
348    /// ```rust
349    /// use steam_client::SteamClient;
350    ///
351    /// // For testing with mocked dependencies
352    /// let (client, mocks) = SteamClient::builder()
353    ///     .with_mock_http()
354    ///     .with_mock_clock()
355    ///     .build_with_mocks();
356    ///
357    /// // Control the mock clock in tests
358    /// if let Some(clock) = mocks.clock {
359    ///     clock.advance(std::time::Duration::from_secs(10));
360    /// }
361    /// ```
362    pub fn builder() -> super::builder::SteamClientBuilder {
363        super::builder::SteamClientBuilder::new()
364    }
365
366    /// Create a new Steam client with the given options.
367    pub fn new(options: SteamOptions) -> Self {
368        Self::builder().with_options(options).build()
369    }
370
371    /// Create a new Steam client with all custom providers.
372    ///
373    /// This is primarily useful for testing, allowing all mock providers to be
374    /// injected.
375    pub(crate) fn with_all_providers(options: SteamOptions, http_client: Arc<dyn crate::utils::http::HttpClient>, clock: Arc<dyn crate::utils::clock::Clock>, rng: Arc<dyn crate::utils::rng::Rng>) -> Self {
376        let reconnect_config = options.reconnect.clone();
377        let (chat_queue_tx, chat_queue_rx) = mpsc::unbounded_channel();
378        let (bg_tx, bg_rx) = mpsc::channel(100);
379
380        Self {
381            options,
382            steam_id: None,
383            account: Arc::new(parking_lot::RwLock::new(AccountState::default())),
384            apps: Arc::new(parking_lot::RwLock::new(AppsState::default())),
385            social: Arc::new(parking_lot::RwLock::new(SocialState::default())),
386            auth: Arc::new(parking_lot::RwLock::new(AuthState::default())),
387            connect_time: 0,
388            connection_count: 0,
389            h_steam_pipe: (rng.gen_u32() % 1000000) + 1,
390            gc_tokens: Vec::new(),
391            active_tickets: Vec::new(),
392            last_time_party_register: None,
393            chat: ChatQueueState {
394                tx: chat_queue_tx,
395                rx: chat_queue_rx,
396                rate_limit_until: None,
397                pending_retry_message: None,
398                tasks: tokio::task::JoinSet::new(),
399            },
400            connection: None,
401            event_queue: VecDeque::new(),
402            reconnect_manager: crate::internal::reconnect::ReconnectManager::new(reconnect_config),
403            heartbeat_manager: crate::internal::heartbeat::HeartbeatManager::new(HEARTBEAT_INTERVAL_SECONDS),
404            job_manager: crate::internal::jobs::JobManager::new(),
405            gc_jobs: crate::services::gc::GCJobManager::new(),
406            background_job_tasks: HashMap::new(),
407            background_job_results_tx: bg_tx,
408            background_job_results_rx: bg_rx,
409            session_recovery: crate::services::SessionRecovery::new(),
410            http_client,
411            clock,
412            rng,
413        }
414    }
415
416    /// Log on to Steam.
417    pub async fn log_on(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
418        if self.steam_id.is_some() {
419            return Err(SteamError::AlreadyLoggedOn);
420        }
421
422        if self.auth.read().connecting {
423            return Err(SteamError::AlreadyConnecting);
424        }
425
426        self.auth.write().connecting = true;
427
428        match self.execute_logon(details).await {
429            Ok(response) => Ok(response),
430            Err(e) => {
431                match &e {
432                    SteamError::SteamResult(steam_enums::EResult::AccountLoginDeniedThrottle) => {
433                        crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
434                    }
435                    SteamError::SteamResult(steam_enums::EResult::ServiceUnavailable) => {
436                        crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "ServiceUnavailable");
437                    }
438                    SteamError::SteamResult(steam_enums::EResult::TryAnotherCM) => {
439                        crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "TryAnotherCM");
440                    }
441                    _ => {}
442                }
443                self.auth.write().connecting = false;
444                Err(e)
445            }
446        }
447    }
448
449    async fn execute_logon(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
450        self.auth.write().logging_off = false;
451
452        info!("Starting Steam login...");
453        crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::CMConnection).await;
454
455        // Store logon details
456        self.auth.write().logon_details = Some(details.clone());
457
458        // Determine login type
459        let is_web_logon = details.web_logon_token.is_some() && details.steam_id.is_some();
460        let is_anonymous = !is_web_logon && details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none() && !is_web_logon);
461
462        // Set up temporary SteamID
463        let mut temp_sid = SteamID::new();
464        temp_sid.universe = steamid::Universe::Public;
465
466        if is_web_logon {
467            // Web logon token login (from clientjstoken)
468            info!("Logging in with web logon token");
469            if let Some(sid) = details.steam_id {
470                temp_sid = sid;
471            }
472            temp_sid.account_type = steamid::AccountType::Individual;
473        } else if is_anonymous {
474            info!("Logging in anonymously");
475            temp_sid.account_type = steamid::AccountType::AnonUser;
476        } else if let Some(ref token) = details.refresh_token {
477            info!("Logging in with refresh token");
478            // Create login session from refresh token
479            match LoginSession::from_refresh_token(token.clone()) {
480                Ok(session) => {
481                    if let Some(sid) = session.steam_id().cloned() {
482                        temp_sid = sid;
483                    }
484                    self.auth.write().login_session = Some(session);
485                }
486                Err(e) => {
487                    return Err(SteamError::InvalidToken(e.to_string()));
488                }
489            }
490        } else if details.account_name.is_some() && details.password.is_some() {
491            info!("Logging in with account name and password");
492            temp_sid.account_type = steamid::AccountType::Individual;
493        }
494
495        self.auth.write().temp_steam_id = Some(temp_sid);
496
497        // Get CM server list
498        let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
499        let server = provider.get_server().await?;
500        info!("Connecting to CM server: {}", server.endpoint);
501        let start_time = Instant::now();
502
503        // Connect to CM
504        let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
505        debug!("WebSocket connection established in {}ms", start_time.elapsed().as_millis());
506
507        // Generate session ID
508        self.auth.write().session_id = (self.rng.gen_u32() & 0x7FFF_FFFF) as i32;
509
510        // Send ClientHello (only if not using refresh token or web_logon_token,
511        // matching node-steam-user behavior)
512        let skip_hello = details.refresh_token.is_some() || details.web_logon_token.is_some();
513        if !skip_hello {
514            let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
515            let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
516            connection.send(hello_msg.encode()).await?;
517            debug!("Sent ClientHello");
518        }
519
520        // Build and send ClientLogon
521        let logon = self.build_logon_message(&details, is_anonymous);
522        let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
523        connection.send(logon_msg.encode()).await?;
524        debug!("Sent ClientLogon");
525
526        // Wait for response
527        let response = self.wait_for_logon_response(&mut *connection).await?;
528
529        self.connection = Some(connection);
530        self.auth.write().connecting = false;
531
532        // Handle response
533        let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
534
535        if eresult != EResult::OK {
536            self.steam_id = None;
537            return Err(SteamError::SteamResult(eresult));
538        }
539
540        // Success!
541        let steam_id = if let Some(sid) = response.client_supplied_steamid {
542            SteamID::from(sid)
543        } else if let Some(sid) = self.auth.read().temp_steam_id {
544            sid
545        } else {
546            return Err(SteamError::bad_response("Logon response missing client_supplied_steamid"));
547        };
548
549        self.steam_id = Some(steam_id);
550        {
551            let mut account = self.account.write();
552            account.cell_id = response.cell_id;
553            account.vanity_url = response.vanity_url.clone();
554
555            if let Some(ref ip) = response.public_ip {
556                if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
557                    account.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
558                }
559            }
560        }
561
562        // Initialize heartbeat
563        if let Some(secs) = response.heartbeat_seconds {
564            if secs > 0 {
565                self.heartbeat_manager.set_interval(secs as u64);
566            }
567        }
568        self.heartbeat_manager.reset();
569
570        let username = details.account_name.as_deref().unwrap_or("Unknown");
571        info!("Logged on as steam_id: {}, username: {}", steam_id.steam_id64(), username);
572
573        Ok(LogOnResponse {
574            eresult,
575            steam_id,
576            public_ip: self.account.read().public_ip.clone(),
577            cell_id: self.account.read().cell_id.unwrap_or(0),
578            vanity_url: self.account.read().vanity_url.clone(),
579            email_domain: response.email_domain.clone(),
580            steam_guard_required: false,
581            heartbeat_seconds: response.heartbeat_seconds,
582            server_time: response.rtime32_server_time,
583            account_flags: response.account_flags,
584            user_country: response.user_country.clone(),
585            ip_country_code: response.ip_country_code.clone(),
586            client_instance_id: response.client_instance_id,
587            token_id: response.token_id,
588            family_group_id: response.family_group_id,
589            eresult_extended: response.eresult_extended,
590            cell_id_ping_threshold: response.cell_id_ping_threshold,
591            force_client_update_check: response.force_client_update_check,
592            agreement_session_url: response.agreement_session_url.clone(),
593            legacy_out_of_game_heartbeat_seconds: response.legacy_out_of_game_heartbeat_seconds,
594            parental_settings: response.parental_settings.clone(),
595            parental_setting_signature: response.parental_setting_signature.clone(),
596            count_loginfailures_to_migrate: response.count_loginfailures_to_migrate,
597            count_disconnects_to_migrate: response.count_disconnects_to_migrate,
598            ogs_data_report_time_window: response.ogs_data_report_time_window,
599            steam2_ticket: response.steam2_ticket.clone(),
600        })
601    }
602
603    /// Build the ClientLogon protobuf message.
604    fn build_logon_message(&self, details: &LogOnDetails, is_anonymous: bool) -> CMsgClientLogon {
605        use crate::protocol::messages::{build_client_logon, LogonConfig};
606
607        // Generate machine name if not provided and not anonymous
608        let machine_name = if details.machine_name.is_some() {
609            details.machine_name.clone()
610        } else if !is_anonymous {
611            Some(format!("DESKTOP-{:06}", self.rng.gen_u32() % 1000000))
612        } else {
613            None
614        };
615
616        let identifier = if let Some(ref name) = details.account_name { Some(name.clone()) } else { self.auth.read().temp_steam_id.as_ref().map(|sid| sid.steam_id64().to_string()) };
617
618        // Build config from current state
619        let config = LogonConfig::new().with_cell_id(self.account.read().cell_id).with_machine_name(machine_name).with_identifier(identifier);
620
621        // Delegate to pure function
622        build_client_logon(&config, details, is_anonymous)
623    }
624
625    /// Wait for and parse the logon response.
626    ///
627    /// Drives `connection.recv()` (already async) inside a wall-clock timeout.
628    /// Previously this used a bounded iteration count with a 100ms sleep
629    /// between recvs, which (a) added up to 100ms of unnecessary latency to
630    /// every logon and (b) made the effective timeout depend on incoming
631    /// message rate. `tokio::time::timeout` gives a real 15s deadline regardless
632    /// of how many Multi sub-messages we have to walk through.
633    async fn wait_for_logon_response(&self, connection: &mut dyn SteamConnection) -> Result<CMsgClientLogonResponse, SteamError> {
634        use std::io::Read;
635
636        use flate2::read::GzDecoder;
637
638        let work = async {
639            loop {
640                let data = match connection.recv().await? {
641                    Some(d) => d,
642                    None => return Err(SteamError::ConnectionError("Connection closed by server during login".into())),
643                };
644
645                let msg = SteamMessage::decode_from_bytes(&data)?;
646                debug!("Received message: {:?}", msg.msg);
647
648                if msg.msg == EMsg::ClientLogOnResponse {
649                    return msg.decode_body::<CMsgClientLogonResponse>();
650                }
651
652                // Multi messages may contain the logon response nested inside.
653                if msg.msg == EMsg::Multi {
654                    let multi = match msg.decode_body::<steam_protos::CMsgMulti>() {
655                        Ok(m) => m,
656                        Err(e) => {
657                            warn!("Failed to decode Multi message: {}", e);
658                            continue;
659                        }
660                    };
661                    let body = multi.message_body.unwrap_or_default();
662                    let payload = if multi.size_unzipped.unwrap_or(0) > 0 {
663                        // Gzip compressed — offload decompression to a blocking task
664                        // to avoid stalling the async runtime.
665                        let decompressed_result = tokio::task::spawn_blocking(move || {
666                            let mut decoder = GzDecoder::new(&body[..]);
667                            let mut decompressed = Vec::new();
668                            if decoder.read_to_end(&mut decompressed).is_ok() {
669                                Some(decompressed)
670                            } else {
671                                None
672                            }
673                        })
674                        .await
675                        .map_err(|e| SteamError::Other(format!("Task join error: {}", e)))?;
676
677                        match decompressed_result {
678                            Some(d) => d,
679                            None => {
680                                error!("Failed to decompress Multi message");
681                                continue;
682                            }
683                        }
684                    } else {
685                        body
686                    };
687
688                    // Iterate over sub-messages
689                    let mut offset = 0;
690                    while offset + 4 <= payload.len() {
691                        let sub_size = u32::from_le_bytes([payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3]]) as usize;
692                        offset += 4;
693
694                        if offset + sub_size > payload.len() {
695                            break;
696                        }
697
698                        let sub_data = &payload[offset..offset + sub_size];
699                        match SteamMessage::decode_from_bytes(sub_data) {
700                            Ok(sub_msg) => {
701                                debug!("Received sub-message inside Multi: {:?}", sub_msg.msg);
702                                if sub_msg.msg == EMsg::ClientLogOnResponse {
703                                    return sub_msg.decode_body::<CMsgClientLogonResponse>();
704                                }
705                            }
706                            Err(e) => {
707                                warn!("Failed to decode sub-message inside Multi: {}", e);
708                            }
709                        }
710                        offset += sub_size;
711                    }
712                }
713            }
714        };
715
716        match tokio::time::timeout(LOGON_RESPONSE_TIMEOUT, work).await {
717            Ok(result) => result,
718            Err(_) => Err(SteamError::Timeout),
719        }
720    }
721
722    /// Create a protobuf message with header.
723    fn create_proto_message<T: Message>(&self, msg: EMsg, body: &T) -> SteamMessage {
724        use crate::protocol::messages::{build_proto_header, create_steam_message};
725
726        let steam_id = self.auth.read().temp_steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0);
727        let header = build_proto_header(self.auth.read().session_id, steam_id, u64::MAX, u64::MAX);
728
729        create_steam_message(msg, header, body)
730    }
731
732    /// Log off from Steam.
733    pub async fn log_off(&mut self) -> Result<(), SteamError> {
734        if self.steam_id.is_none() {
735            return Err(SteamError::NotLoggedOn);
736        }
737
738        info!("Logging off from Steam");
739        self.auth.write().logging_off = true;
740
741        // Create logoff message first (before borrowing connection)
742        let logoff = CMsgClientLogOff::default();
743        let msg = self.create_proto_message(EMsg::ClientLogOff, &logoff);
744
745        // Send logoff message
746        if let Some(ref mut conn) = self.connection {
747            let _ = conn.send(msg.encode()).await;
748        }
749
750        // Close connection (capture result; always clear state regardless of outcome)
751        let close_result = if let Some(conn) = self.connection.take() { conn.close().await } else { Ok(()) };
752
753        // Clear state
754        self.steam_id = None;
755        *self.account.write() = AccountState::default();
756        self.social.write().friends.clear();
757        self.auth.write().logon_details = None;
758        self.auth.write().session_id = 0;
759
760        self.auth.write().logging_off = false;
761
762        close_result
763    }
764
765    /// Log on with username and password using steam-session.
766    ///
767    /// This method handles the full password authentication flow including:
768    /// - RSA password encryption
769    /// - Starting an auth session with Steam
770    /// - Obtaining a refresh token
771    ///
772    /// If Steam Guard is required, returns `SteamError::SteamGuardRequired`
773    /// with details about what code is needed. Use
774    /// [`submit_steam_guard_code`] to provide the code and complete
775    /// authentication.
776    ///
777    /// On success, emits a `RefreshToken` event with the token that can be
778    /// saved for future logins.
779    ///
780    /// # Example
781    /// ```rust,ignore
782    /// let result = client.log_on_with_password("username", "password", None, None).await;
783    /// match result {
784    ///     Ok(response) => tracing::info!("Logged in as {}", response.steam_id.steam3()),
785    ///     Err(SteamError::SteamGuardRequired { guard_type, email_domain }) => {
786    ///         tracing::info!("Need Steam Guard code: {:?}", guard_type);
787    ///         // Get code from user, then call submit_steam_guard_code
788    ///     }
789    ///     Err(e) => tracing::info!("Login failed: {:?}", e),
790    /// }
791    /// ```
792    pub async fn log_on_with_password(&mut self, account_name: &str, password: &str, steam_guard_code: Option<&str>, machine_auth_token: Option<&str>) -> Result<LogOnResponse, SteamError> {
793        if self.steam_id.is_some() {
794            return Err(SteamError::AlreadyLoggedOn);
795        }
796
797        if self.auth.read().connecting {
798            return Err(SteamError::AlreadyConnecting);
799        }
800
801        debug!("Starting password authentication for {}", account_name);
802        crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
803
804        // Create login session for Steam Client platform
805        let mut session = LoginSession::new(EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient, Some(LoginSessionOptions { machine_friendly_name: Some(format!("DESKTOP-{}", self.rng.gen_u32() % 1000000)), ..Default::default() }));
806
807        // Start authentication with credentials
808        let credentials = CredentialsDetails {
809            account_name: account_name.to_string(),
810            password: password.to_string(),
811            persistence: None,
812            steam_guard_machine_token: machine_auth_token.map(|s| s.to_string()),
813            steam_guard_code: steam_guard_code.map(|s| s.to_string()),
814        };
815
816        let start_result = match session.start_with_credentials(credentials).await {
817            Ok(res) => res,
818            Err(e) => {
819                if let steam_auth::SessionError::SteamError(_, steam_enums::EResult::AccountLoginDeniedThrottle) = e {
820                    crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
821                }
822                return Err(SteamError::SessionError(e));
823            }
824        };
825
826        // Check if action (Steam Guard) is required
827        if start_result.action_required {
828            if let Some(ref actions) = start_result.valid_actions {
829                // Find the best action to report to user
830                // Priority: EmailCode > DeviceCode > DeviceConfirmation
831                let action = actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeEmailCode)).or_else(|| actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeDeviceCode))).or_else(|| actions.first());
832
833                if let Some(action) = action {
834                    debug!("Steam Guard required: {:?}", action.guard_type);
835
836                    // Store session for later submission
837                    self.auth.write().login_session = Some(session);
838
839                    return Err(SteamError::SteamGuardRequired { guard_type: action.guard_type, email_domain: action.detail.clone() });
840                }
841            }
842
843            // Shouldn't happen, but handle gracefully
844            warn!("Action required but no valid actions returned");
845            return Err(SteamError::InvalidCredentials);
846        }
847
848        // No Steam Guard needed, continue polling for result
849        self.complete_password_auth(session, account_name).await
850    }
851
852    /// Submit a Steam Guard code to complete password authentication.
853    ///
854    /// Call this after [`log_on_with_password`] returns
855    /// `SteamError::SteamGuardRequired`.
856    ///
857    /// # Example
858    /// ```rust,ignore
859    /// // After receiving SteamGuardRequired error:
860    /// let code = get_code_from_user();
861    /// let response = client.submit_steam_guard_code(&code).await?;
862    /// tracing::info!("Logged in as {}", response.steam_id.steam3());
863    /// ```
864    pub async fn submit_steam_guard_code(&mut self, code: &str) -> Result<LogOnResponse, SteamError> {
865        let session = self.auth.write().login_session.take().ok_or_else(|| SteamError::Other("No pending login session".to_string()))?;
866
867        let account_name = session.account_name().map(|s| s.to_string()).unwrap_or_default();
868
869        let mut session = session;
870        session.submit_steam_guard_code(code).await?;
871
872        self.complete_password_auth(session, &account_name).await
873    }
874
875    /// Complete password authentication by polling for tokens.
876    async fn complete_password_auth(&mut self, mut session: LoginSession, account_name: &str) -> Result<LogOnResponse, SteamError> {
877        // Poll for authentication result
878        let poll_result = loop {
879            match session.poll().await? {
880                Some(result) => break result,
881                None => {
882                    // Wait before polling again
883                    let interval = session.poll_interval();
884                    self.clock.sleep(Duration::from_secs_f32(interval)).await;
885                }
886            }
887        };
888
889        debug!("Password authentication successful, got refresh token");
890        debug!("Refresh token account: {}", poll_result.account_name);
891
892        // Store machine token if we got one
893        if let Some(ref _guard_data) = poll_result.new_guard_data {
894            debug!("Received new Steam Guard machine token");
895            // Store for future use (could be saved to disk by the user)
896            self.auth.write().login_session = Some(session);
897        }
898
899        // Queue RefreshToken event
900        self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::RefreshToken { token: poll_result.refresh_token.clone(), account_name: poll_result.account_name.clone() }));
901
902        // Now log on with the refresh token
903        self.log_on(LogOnDetails {
904            refresh_token: Some(poll_result.refresh_token),
905            account_name: Some(account_name.to_string()),
906            ..Default::default()
907        })
908        .await
909    }
910
911    /// Log on to Steam using web session cookies.
912    ///
913    /// The cookies themselves are never sent to the CM. Instead, they are used
914    /// to mint a short-lived web logon token via Steam's
915    /// `/chat/clientjstoken` endpoint, and that token (together with the
916    /// account name and SteamID returned by the endpoint) is what authenticates
917    /// the CM logon.
918    ///
919    /// # Arguments
920    /// * `cookies` - Web session cookies for steamcommunity.com (must contain
921    ///   at least `steamLoginSecure`, e.g.
922    ///   `"sessionid=xxx; steamLoginSecure=yyy"`).
923    ///
924    /// # Errors
925    /// - `SteamError::InvalidCredentials` if the cookies are not logged in or
926    ///   the endpoint returns no token.
927    /// - `SteamError::NetworkError` / `SteamError::ProtocolError` on HTTP or
928    ///   parsing failures.
929    ///
930    /// # Example
931    /// ```rust,ignore
932    /// let cookies = "sessionid=...; steamLoginSecure=...";
933    /// let response = client.log_on_with_cookies(cookies).await?;
934    /// tracing::info!("Logged in as {}", response.steam_id);
935    /// ```
936    pub async fn log_on_with_cookies(&mut self, cookies: &str) -> Result<LogOnResponse, SteamError> {
937        if self.steam_id.is_some() {
938            return Err(SteamError::AlreadyLoggedOn);
939        }
940        if self.auth.read().connecting {
941            return Err(SteamError::AlreadyConnecting);
942        }
943
944        crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
945
946        let (web_logon_token, account_name, steam_id) = fetch_client_js_token(self.http_client.as_ref(), cookies).await?;
947
948        debug!("Minted web logon token for {} ({})", account_name, steam_id);
949
950        self.log_on(LogOnDetails {
951            web_logon_token: Some(web_logon_token),
952            account_name: Some(account_name),
953            steam_id: Some(steam_id),
954            ..Default::default()
955        })
956        .await
957    }
958
959    /// Force reset the connecting state.
960    /// Use this if a connection attempt is cancelled externally (e.g. by a
961    /// timeout).
962    pub fn reset_connecting_state(&mut self) {
963        self.auth.write().connecting = false;
964    }
965
966    /// Get web session cookies for Steam web APIs.
967    ///
968    /// This method fetches cookies that can be used to authenticate with Steam
969    /// web services like steamcommunity.com and store.steampowered.com. It
970    /// requires a valid login session with a refresh token (not available
971    /// for web_logon_token or anonymous logins).
972    ///
973    /// On success, this also queues a `WebSession` event with the session data.
974    ///
975    /// # Returns
976    /// - `Ok((session_id, cookies))` - The session ID and cookies for web
977    ///   authentication.
978    /// - `Err(SteamError::NotLoggedOn)` - If not logged in.
979    /// - `Err(SteamError::Other)` - If no login session is available (e.g.,
980    ///   web_logon_token login).
981    ///
982    /// # Example
983    /// ```rust,ignore
984    /// let (session_id, cookies) = client.get_web_session().await?;
985    /// tracing::info!("Session ID: {}", session_id);
986    /// for cookie in &cookies {
987    ///     tracing::info!("Cookie: {}", cookie);
988    /// }
989    /// ```
990    pub async fn get_web_session(&mut self) -> Result<(String, Vec<String>), SteamError> {
991        if self.steam_id.is_none() {
992            return Err(SteamError::NotLoggedOn);
993        }
994
995        // Take the session out (we can't hold the write guard across the HTTP
996        // `.await`), run the call, then put it back.
997        let mut session = self.auth.write().login_session.take().ok_or_else(|| SteamError::Other("No login session available. Web session requires refresh token login.".to_string()))?;
998
999        let cookies_result = session.get_web_cookies().await;
1000        self.auth.write().login_session = Some(session);
1001        let cookies = cookies_result.map_err(|e| SteamError::Other(format!("Failed to get web cookies: {}", e)))?;
1002
1003        // Extract session ID from cookies
1004        let session_id = cookies.iter().find(|c| c.starts_with("sessionid=")).and_then(|c| c.strip_prefix("sessionid=")).and_then(|c| c.split(';').next()).map(|s| s.to_string()).unwrap_or_else(|| {
1005            // Generate a session ID if not in cookies
1006            format!("{:x}", self.rng.gen_u64())
1007        });
1008
1009        // Queue WebSession event
1010        self.event_queue.push_back(super::events::SteamEvent::Auth(super::events::AuthEvent::WebSession { session_id: session_id.clone(), cookies: cookies.clone() }));
1011
1012        Ok((session_id, cookies))
1013    }
1014
1015    /// Helper to send a binary message.
1016    pub(crate) async fn send_binary_message(&mut self, msg_type: EMsg, body: &[u8]) -> Result<(), SteamError> {
1017        use crate::protocol::{ExtendedMessageHeader, MessageHeader, SteamMessage};
1018
1019        let header = ExtendedMessageHeader {
1020            header_size: 36,
1021            header_version: 2,
1022            target_job_id: u64::MAX,
1023            source_job_id: u64::MAX,
1024            header_canary: 239,
1025            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1026            session_id: self.auth.read().session_id,
1027        };
1028
1029        let msg = SteamMessage {
1030            msg: msg_type,
1031            is_proto: false,
1032            header: MessageHeader::Extended(header),
1033            body: bytes::Bytes::copy_from_slice(body),
1034        };
1035
1036        if let Some(ref mut conn) = self.connection {
1037            conn.send(msg.encode()).await?;
1038        }
1039
1040        Ok(())
1041    }
1042
1043    /// Helper to send a protobuf message.
1044    pub(crate) async fn send_message<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<(), SteamError> {
1045        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1046
1047        let header = ProtobufMessageHeader {
1048            header_length: 0,
1049            session_id: self.auth.read().session_id,
1050            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1051            job_id_source: u64::MAX,
1052            job_id_target: u64::MAX,
1053            target_job_name: None,
1054            routing_appid: None,
1055        };
1056
1057        let msg = SteamMessage::new_proto(msg_type, header, body);
1058
1059        if let Some(ref mut conn) = self.connection {
1060            conn.send(msg.encode()).await?;
1061        }
1062
1063        Ok(())
1064    }
1065
1066    /// Helper to send a protobuf message with routing app ID.
1067    pub(crate) async fn send_message_with_routing<T: Message>(&mut self, msg_type: EMsg, routing_appid: u32, body: &T) -> Result<(), SteamError> {
1068        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1069
1070        let header = ProtobufMessageHeader {
1071            header_length: 0,
1072            session_id: self.auth.read().session_id,
1073            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1074            job_id_source: u64::MAX,
1075            job_id_target: u64::MAX,
1076            target_job_name: None,
1077            routing_appid: Some(routing_appid),
1078        };
1079
1080        let msg = SteamMessage::new_proto(msg_type, header, body);
1081
1082        if let Some(ref mut conn) = self.connection {
1083            conn.send(msg.encode()).await?;
1084        }
1085
1086        Ok(())
1087    }
1088
1089    /// Helper to send a protobuf message with job tracking for request-response
1090    /// correlation.
1091    ///
1092    /// Returns a receiver that will receive the response body when Steam
1093    /// replies. The job ID is automatically set in the message header.
1094    pub(crate) async fn send_message_with_job<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<tokio::sync::oneshot::Receiver<crate::internal::jobs::JobResponse>, SteamError> {
1095        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
1096
1097        // Create a job to track this request
1098        let (job_id, response_rx) = self.job_manager.create_job().await;
1099        info!("[SteamClient] send_message_with_job: Created JobID={} for EMsg::{:?}", job_id, msg_type);
1100
1101        let header = ProtobufMessageHeader {
1102            header_length: 0,
1103            session_id: self.auth.read().session_id,
1104            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
1105            job_id_source: job_id,
1106            job_id_target: u64::MAX,
1107            target_job_name: None,
1108            routing_appid: None,
1109        };
1110
1111        let msg = SteamMessage::new_proto(msg_type, header, body);
1112
1113        if let Some(ref mut conn) = self.connection {
1114            conn.send(msg.encode()).await?;
1115        }
1116
1117        Ok(response_rx)
1118    }
1119
1120    /// Process a queued message.
1121    async fn process_queued_message(&mut self, msg: QueuedMessage) {
1122        info!("[SteamClient] process_queued_message: Processing message to {}", msg.friend);
1123
1124        // Match JS behavior: escape [ chars if contains_bbcode
1125        let processed_message = if msg.contains_bbcode { msg.message.replace('[', "\\[") } else { msg.message.clone() };
1126
1127        debug!("[SteamClient] process_queued_message: Original len={}, Processed len={}", msg.message.len(), processed_message.len());
1128
1129        let request = steam_protos::CFriendMessagesSendMessageRequest {
1130            steamid: Some(msg.friend.steam_id64()),
1131            chat_entry_type: Some(msg.entry_type as i32),
1132            message: Some(processed_message),
1133            contains_bbcode: Some(msg.contains_bbcode),
1134            ..Default::default()
1135        };
1136
1137        info!("[SteamClient] process_queued_message: Sending FriendMessages.SendMessage#1 to Steam");
1138
1139        // Send the request and get a job receiver
1140        match self.send_service_method_with_job("FriendMessages.SendMessage#1", &request).await {
1141            Ok(rx) => {
1142                info!("[SteamClient] process_queued_message: Request sent, waiting for response...");
1143                let friend_id = msg.friend;
1144                let original_message = msg.message.clone();
1145
1146                // Spawn task into JoinSet with timeout and tracing span
1147                self.chat.tasks.spawn(
1148                    async move {
1149                        const CHAT_TIMEOUT_SECS: u64 = 30;
1150
1151                        debug!("[SteamClient] chat_task: Waiting for response (timeout: {}s)", CHAT_TIMEOUT_SECS);
1152
1153                        let res = match tokio::time::timeout(std::time::Duration::from_secs(CHAT_TIMEOUT_SECS), rx).await {
1154                            Ok(Ok(crate::internal::jobs::JobResponse::Success(body))) => {
1155                                info!("[SteamClient] chat_task: Received Success response, body len={}", body.len());
1156                                steam_protos::CFriendMessagesSendMessageResponse::decode(&body[..])
1157                                    .map(|response| {
1158                                        info!("[SteamClient] chat_task: Decoded response - server_ts={}, ordinal={}", response.server_timestamp.unwrap_or(0), response.ordinal.unwrap_or(0));
1159                                        crate::services::chat::SendMessageResult {
1160                                            modified_message: response.modified_message.unwrap_or(original_message),
1161                                            server_timestamp: response.server_timestamp.unwrap_or(0),
1162                                            ordinal: response.ordinal.unwrap_or(0),
1163                                        }
1164                                    })
1165                                    .map_err(|e| {
1166                                        error!("[SteamClient] chat_task: Failed to decode response: {:?}", e);
1167                                        SteamError::DeserializationFailed
1168                                    })
1169                            }
1170                            Ok(Ok(crate::internal::jobs::JobResponse::Timeout)) => {
1171                                error!("[SteamClient] chat_task: Job response timeout (Steam didn't respond)");
1172                                Err(SteamError::ResponseTimeout)
1173                            }
1174                            Ok(Ok(crate::internal::jobs::JobResponse::Error(e))) => {
1175                                error!("[SteamClient] chat_task: Job response error: {}", e);
1176                                Err(SteamError::ProtocolError(e))
1177                            }
1178                            Ok(Err(e)) => {
1179                                error!("[SteamClient] chat_task: Job channel closed: {:?}", e);
1180                                Err(SteamError::Other("Job response channel closed".into()))
1181                            }
1182                            Err(_) => {
1183                                // Timeout elapsed
1184                                error!("[SteamClient] chat_task: Tokio timeout elapsed after {}s", CHAT_TIMEOUT_SECS);
1185                                Err(SteamError::ResponseTimeout)
1186                            }
1187                        };
1188
1189                        match &res {
1190                            Ok(r) => info!("[SteamClient] chat_task: Final result SUCCESS - ts={}", r.server_timestamp),
1191                            Err(e) => error!("[SteamClient] chat_task: Final result ERROR - {:?}", e),
1192                        }
1193
1194                        (msg, res)
1195                    }
1196                    .instrument(info_span!("chat_send", friend = %friend_id)),
1197                );
1198            }
1199            Err(e) => {
1200                // Failed to even send the request (maybe disconnected)
1201                error!("[SteamClient] process_queued_message: Failed to send request to Steam: {:?}", e);
1202                let _ = msg.respond_to.send(Err(e));
1203            }
1204        }
1205    }
1206
1207    /// Send a unified service method request and wait for the response.
1208    ///
1209    /// This handles the common pattern of sending a request with job tracking,
1210    /// waiting for the response, decoding it, and mapping errors.
1211    pub async fn send_unified_request_and_wait<T, R>(&mut self, method: &str, request: &T) -> Result<R, SteamError>
1212    where
1213        T: prost::Message,
1214        R: prost::Message + Default,
1215    {
1216        use crate::internal::jobs::JobResponse;
1217
1218        // Send with job tracking
1219        let response_rx = self.send_service_method_with_job(method, request).await?;
1220
1221        // Wait for response
1222        match response_rx.await {
1223            Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1224            Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1225            Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1226            Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1227        }
1228    }
1229
1230    /// Send a protobuf request and wait for the response.
1231    ///
1232    /// This handles the common pattern of sending a request with job tracking,
1233    /// waiting for the response, decoding it, and mapping errors.
1234    pub async fn send_request_and_wait<T, R>(&mut self, emsg: EMsg, request: &T) -> Result<R, SteamError>
1235    where
1236        T: prost::Message,
1237        R: prost::Message + Default,
1238    {
1239        use crate::internal::jobs::JobResponse;
1240
1241        // Send with job tracking
1242        let response_rx = self.send_message_with_job(emsg, request).await?;
1243
1244        // Wait for response
1245        match response_rx.await {
1246            Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
1247            Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
1248            Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
1249            Err(_) => Err(SteamError::Other("Job response channel closed".into())),
1250        }
1251    }
1252
1253    /// Poll for incoming events.
1254    ///
1255    /// This method receives messages from the Steam connection and returns
1256    /// decoded events. Call this in a loop to process incoming messages.
1257    /// It also handles automatic reconnection when disconnected.
1258    ///
1259    /// # Example
1260    /// ```rust,ignore
1261    /// loop {
1262    ///     if let Some(event) = client.poll_event().await? {
1263    ///         match event {
1264    ///             SteamEvent::FriendMessage { sender, message, .. } => {
1265    ///                 tracing::info!("{} says: {}", sender, message);
1266    ///             }
1267    ///             SteamEvent::PersonaState(persona) => {
1268    ///                 tracing::info!("{} is now {:?}", persona.player_name, persona.persona_state);
1269    ///             }
1270    ///             SteamEvent::ReconnectAttempt { attempt, max_attempts, .. } => {
1271    ///                 tracing::info!("Reconnecting... attempt {}/{}", attempt, max_attempts);
1272    ///             }
1273    ///             SteamEvent::Disconnected { will_reconnect, .. } => {
1274    ///                 if !will_reconnect {
1275    ///                     break; // Exit loop on permanent disconnect
1276    ///                 }
1277    ///             }
1278    ///             _ => {}
1279    ///         }
1280    ///     }
1281    /// }
1282    /// ```
1283    pub async fn poll_event(&mut self) -> Result<Option<super::events::SteamEvent>, SteamError> {
1284        use super::events::{ConnectionEvent, SteamEvent};
1285
1286        // First check if we have queued events from a previous Multi message
1287        if let Some(mut event) = self.event_queue.pop_front() {
1288            self.handle_event(&mut event);
1289            self.post_process_event(&event).await;
1290            return Ok(Some(event));
1291        }
1292
1293        // Cleanup expired jobs
1294        self.job_manager.cleanup_expired().await;
1295
1296        // Check if we should attempt reconnection
1297        if self.reconnect_manager.is_reconnecting() {
1298            if let Some(attempt) = self.reconnect_manager.check_ready() {
1299                let delay = self.reconnect_manager.current_delay();
1300                let max_attempts = self.reconnect_manager.max_attempts();
1301
1302                // Queue the reconnect attempt event
1303                self.event_queue.push_back(SteamEvent::Connection(ConnectionEvent::ReconnectAttempt { attempt, max_attempts, delay }));
1304
1305                // Attempt reconnection
1306                match self.attempt_reconnect().await {
1307                    Ok(()) => {
1308                        // Successful reconnection - reset manager and continue
1309                        self.reconnect_manager.record_success();
1310                        info!("Reconnection successful");
1311
1312                        // Restore session state
1313                        if let Err(e) = self.restore_session_state().await {
1314                            warn!("Failed to restore session state: {}", e);
1315                        }
1316                    }
1317                    Err(e) => {
1318                        // Failed reconnection attempt
1319                        warn!("Reconnection attempt {} failed: {:?}", attempt, e);
1320
1321                        let server = self.auth.write().last_cm_server.take();
1322                        let should_continue = self.reconnect_manager.record_failure(server.as_ref());
1323
1324                        if !should_continue {
1325                            // Max attempts reached
1326                            let reason = self.reconnect_manager.last_disconnect_reason();
1327                            let attempts = self.reconnect_manager.attempt();
1328
1329                            return Ok(Some(SteamEvent::Connection(ConnectionEvent::ReconnectFailed { reason, attempts })));
1330                        }
1331                    }
1332                }
1333            } else if let Some(wait_time) = self.reconnect_manager.time_until_next_attempt() {
1334                // Still waiting for backoff - sleep briefly and return no event
1335                // We don't want to block the full backoff period, so use a short sleep
1336                let sleep_time = wait_time.min(Duration::from_millis(100));
1337                self.clock.sleep(sleep_time).await;
1338            }
1339        }
1340
1341        // Process queued events first
1342        if let Some(mut event) = self.event_queue.pop_front() {
1343            self.handle_event(&mut event);
1344            self.post_process_event(&event).await;
1345            return Ok(Some(event));
1346        }
1347
1348        loop {
1349            // Check retry message first (priority over network?)
1350            // We check it at start of loop.
1351            if let Some(msg) = self.chat.pending_retry_message.take() {
1352                let rate_limited = self.chat.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1353
1354                if !rate_limited {
1355                    self.process_queued_message(msg).await;
1356                    // Loop again to process effects or next
1357                } else {
1358                    // Put it back
1359                    self.chat.pending_retry_message = Some(msg);
1360                }
1361            }
1362
1363            // Determine if we can process queue
1364            let rate_limited = self.chat.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
1365            let can_process_queue = !rate_limited && self.chat.pending_retry_message.is_none();
1366
1367            // Check connection
1368            if self.connection.is_none() {
1369                return Ok(None);
1370            }
1371
1372            // Calculate timeout for recv (heartbeat)
1373            let now = self.clock.now();
1374            let hb_timeout = self.heartbeat_manager.time_until_next_heartbeat(now);
1375
1376            // Check if immediate heartbeat needed
1377            if let Some(timeout) = hb_timeout {
1378                if timeout == Duration::ZERO {
1379                    debug!("Sending heartbeat");
1380                    let msg = crate::internal::heartbeat::HeartbeatManager::build_heartbeat_message(self.auth.read().session_id, self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0));
1381
1382                    // We wrap send in block to limit borrow scope
1383                    let result = {
1384                        if let Some(conn) = self.connection.as_mut() {
1385                            conn.send(msg.encode()).await
1386                        } else {
1387                            Err(SteamError::NotConnected)
1388                        }
1389                    };
1390
1391                    if let Err(e) = result {
1392                        warn!("Failed to send heartbeat: {}", e);
1393                        // Connection likely dead
1394                        let reason = match &e {
1395                            SteamError::SteamResult(r) => Some(*r),
1396                            _ => Some(EResult::NoConnection),
1397                        };
1398                        return self.handle_connection_close(reason);
1399                    }
1400
1401                    self.heartbeat_manager.record_heartbeat(self.clock.now());
1402                    continue;
1403                }
1404            }
1405
1406            enum PollResult {
1407                Packet(Option<bytes::Bytes>),
1408                Queue(QueuedMessage),
1409                ChatResponse(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>),
1410                BackgroundResult(u64, Result<Bytes, SteamError>),
1411                Timeout,
1412            }
1413
1414            let poll_res: Result<PollResult, SteamError> = {
1415                let conn = self.connection.as_mut().unwrap();
1416                let queue = &mut self.chat.rx;
1417
1418                let recv_fut = conn.recv();
1419
1420                // Determine timeout duration
1421                // We want to wake up for:
1422                // 1. Next heartbeat
1423                // 2. Next rate limit expiration (if we have a pending retry or want to process
1424                //    queue)
1425                let rate_limit_timeout = self.chat.rate_limit_until.map(|t| {
1426                    let now = self.clock.now();
1427                    if t > now {
1428                        t - now
1429                    } else {
1430                        Duration::ZERO
1431                    }
1432                });
1433
1434                let sleep_duration = match (hb_timeout, rate_limit_timeout) {
1435                    (Some(hb), Some(rl)) => Some(hb.min(rl)),
1436                    (Some(hb), None) => Some(hb),
1437                    (None, Some(rl)) => Some(rl),
1438                    (None, None) => None,
1439                };
1440
1441                tokio::select! {
1442                    res = recv_fut => {
1443                        Ok(PollResult::Packet(res?))
1444                    }
1445                    Some(msg) = queue.recv(), if can_process_queue => {
1446                        Ok(PollResult::Queue(msg))
1447                    }
1448                    Some(join_result) = self.chat.tasks.join_next() => {
1449                        match join_result {
1450                            Ok((msg, res)) => Ok(PollResult::ChatResponse(msg, res)),
1451                            Err(e) => {
1452                                // Task panicked or was cancelled
1453                                warn!("Chat task failed: {:?}", e);
1454                                Ok(PollResult::Timeout) // Treat as a non-event
1455                            }
1456                        }
1457                    }
1458                    res = self.background_job_results_rx.recv() => {
1459                        match res {
1460                            Some((job_id, result)) => Ok(PollResult::BackgroundResult(job_id, result)),
1461                            None => Ok(PollResult::Timeout),
1462                        }
1463                    }
1464                    _ = async {
1465                         if let Some(d) = sleep_duration {
1466                             tokio::time::sleep(d).await;
1467                         } else {
1468                             std::future::pending::<()>().await;
1469                         }
1470                    } => {
1471                        Ok(PollResult::Timeout)
1472                    }
1473                }
1474            };
1475
1476            match poll_res {
1477                Ok(PollResult::Packet(Some(data))) => {
1478                    // Decode the message(s)
1479                    let messages = super::events::MessageHandler::decode_packet(&data);
1480
1481                    for decoded in messages {
1482                        // Complete any pending job if this is a response
1483                        if let Some(job_id) = decoded.job_id_target {
1484                            info!("[SteamClient] poll_event: Completing job {} via job_manager", job_id);
1485                            self.job_manager.complete_job_success(job_id, decoded.body).await;
1486                        }
1487
1488                        // Check for GC events and complete pending GC jobs
1489                        for event in &decoded.events {
1490                            if let super::events::SteamEvent::Apps(super::events::AppsEvent::GCReceived(gc_msg)) = event {
1491                                self.gc_jobs.try_complete(gc_msg.appid, gc_msg.msg_type, gc_msg.payload.clone());
1492                            }
1493                        }
1494
1495                        // Queue all events
1496                        self.event_queue.extend(decoded.events);
1497                    }
1498
1499                    // Return the first event if available
1500                    if let Some(mut event) = self.event_queue.pop_front() {
1501                        self.handle_event(&mut event);
1502                        self.post_process_event(&event).await;
1503                        return Ok(Some(event));
1504                    }
1505                }
1506                Ok(PollResult::Packet(None)) => {
1507                    // Connection closed
1508                    return self.handle_connection_close(None);
1509                }
1510                Ok(PollResult::Queue(msg)) => {
1511                    info!("[SteamClient] poll_event: Dequeued message from chat_queue_rx for {}", msg.friend);
1512                    self.process_queued_message(msg).await;
1513                    // Loop to process next
1514                }
1515                Ok(PollResult::ChatResponse(msg, result)) => {
1516                    info!("[SteamClient] poll_event: Received ChatResponse for {}", msg.friend);
1517                    match result {
1518                        Ok(res) => {
1519                            info!("[SteamClient] poll_event: ChatResponse SUCCESS - server_ts={}", res.server_timestamp);
1520                            let _ = msg.respond_to.send(Ok(res));
1521                        }
1522                        Err(SteamError::SteamResult(steam_enums::EResult::RateLimitExceeded)) => {
1523                            warn!("[SteamClient] poll_event: Rate limit exceeded for chat message, backing off for 60 seconds");
1524                            crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "RateLimitExceeded");
1525                            self.chat.rate_limit_until = Some(self.clock.now() + std::time::Duration::from_secs(60));
1526                            self.chat.pending_retry_message = Some(msg);
1527                        }
1528                        Err(e) => {
1529                            error!("[SteamClient] poll_event: ChatResponse ERROR - {:?}", e);
1530                            let _ = msg.respond_to.send(Err(e));
1531                        }
1532                    }
1533                }
1534                Ok(PollResult::BackgroundResult(job_id, result)) => {
1535                    debug!("[SteamClient] poll_event: Received BackgroundResult for job {}", job_id);
1536                    if let Some(task) = self.background_job_tasks.remove(&job_id) {
1537                        match task {
1538                            BackgroundTask::FriendsList => match result {
1539                                Ok(body) => {
1540                                    debug!("[SteamClient] poll_event: Decoding FriendsList response (len={})", body.len());
1541                                    match steam_protos::CFriendsListGetFriendsListResponse::decode(&body[..]) {
1542                                        Ok(response) => {
1543                                            debug!("[SteamClient] poll_event: Successfully decoded FriendsList, processing...");
1544                                            self.handle_friends_list_unified_response(response).await;
1545                                        }
1546                                        Err(e) => {
1547                                            error!("[SteamClient] poll_event: Failed to decode FriendsList response: {:?}", e);
1548                                        }
1549                                    }
1550                                }
1551                                Err(e) => {
1552                                    error!("[SteamClient] poll_event: Background FriendsList job failed: {:?}", e);
1553                                }
1554                            },
1555                            BackgroundTask::OfflineMessages(friend_id) => match result {
1556                                Ok(body) => {
1557                                    debug!("[SteamClient] poll_event: Decoding OfflineMessages response for {} (len={})", friend_id, body.len());
1558                                    match steam_protos::CFriendMessagesGetRecentMessagesResponse::decode(&body[..]) {
1559                                        Ok(response) => {
1560                                            debug!("[SteamClient] poll_event: Successfully decoded OfflineMessages, processing...");
1561                                            let last_view_timestamp = self.social.read().chat_last_view.get(&friend_id).copied().unwrap_or(0);
1562
1563                                            let messages = response
1564                                                .messages
1565                                                .into_iter()
1566                                                .map(|m| crate::services::chat::HistoryMessage {
1567                                                    sender: SteamID::from_steam_id64(m.accountid.unwrap_or(0) as u64),
1568                                                    timestamp: m.timestamp.unwrap_or(0),
1569                                                    ordinal: m.ordinal.unwrap_or(0),
1570                                                    message: m.message.unwrap_or_default(),
1571                                                    unread: m.timestamp.unwrap_or(0) > last_view_timestamp,
1572                                                })
1573                                                .collect();
1574
1575                                            self.event_queue.push_back(crate::client::events::SteamEvent::Chat(crate::client::events::ChatEvent::OfflineMessagesFetched { friend_id, messages }));
1576                                        }
1577                                        Err(e) => {
1578                                            error!("[SteamClient] poll_event: Failed to decode OfflineMessages response: {:?}", e);
1579                                        }
1580                                    }
1581                                }
1582                                Err(e) => {
1583                                    error!("[SteamClient] poll_event: Background OfflineMessages job failed for {}: {:?}", friend_id, e);
1584                                }
1585                            },
1586                        }
1587
1588                        // Check if the background task added events to the queue (e.g., FriendsList
1589                        // event)
1590                        if let Some(mut event) = self.event_queue.pop_front() {
1591                            self.handle_event(&mut event);
1592                            self.post_process_event(&event).await;
1593                            return Ok(Some(event));
1594                        }
1595                    } else {
1596                        warn!("[SteamClient] poll_event: Received BackgroundResult for unknown job {}", job_id);
1597                    }
1598                }
1599                Ok(PollResult::Timeout) => {
1600                    // Heartbeat deadline reached, loop will handle it
1601                }
1602                Err(e) => {
1603                    // Connection error
1604                    let reason = match &e {
1605                        SteamError::SteamResult(r) => Some(*r),
1606                        _ => Some(EResult::NoConnection),
1607                    };
1608                    return self.handle_connection_close(reason);
1609                }
1610            }
1611        }
1612    }
1613
1614    /// Handle a connection close and decide whether to reconnect.
1615    fn handle_connection_close(&mut self, reason: Option<EResult>) -> Result<Option<super::events::SteamEvent>, SteamError> {
1616        use super::events::{ConnectionEvent, SteamEvent};
1617
1618        debug!("Handling connection close, reason: {:?}", reason);
1619
1620        // Clean up connection state
1621        self.connection = None;
1622        self.auth.write().connecting = false;
1623
1624        // Determine if we should reconnect
1625        let should_reconnect = !self.auth.read().logging_off && self.options.auto_relogin && reason.map(|r| self.reconnect_manager.should_reconnect(r)).unwrap_or(true);
1626
1627        if should_reconnect && self.auth.read().logon_details.is_some() {
1628            // Start reconnection sequence
1629            let eresult = reason.unwrap_or(EResult::NoConnection);
1630            self.reconnect_manager.start_reconnection(eresult);
1631
1632            // Blacklist the last server if available
1633            if let Some(server) = self.auth.read().last_cm_server.clone() {
1634                self.reconnect_manager.blacklist_server(&server.endpoint);
1635            }
1636
1637            info!("Connection lost, will attempt reconnection");
1638        } else {
1639            // Not reconnecting - clear state
1640            self.steam_id = None;
1641            self.reconnect_manager.reset();
1642            self.auth.write().relogging = false;
1643            self.auth.write().logging_off = false;
1644        }
1645
1646        Ok(Some(SteamEvent::Connection(ConnectionEvent::Disconnected { reason, will_reconnect: should_reconnect })))
1647    }
1648
1649    /// Restore session state (games played, rich presence, etc.) after
1650    /// reconnection.
1651    async fn restore_session_state(&mut self) -> Result<(), SteamError> {
1652        info!("Restoring session state...");
1653
1654        // Restore games played
1655        let app_ids = self.session_recovery.last_playing_app_ids.clone();
1656        if !app_ids.is_empty() {
1657            debug!("Restoring games played: {:?}", app_ids);
1658            self.games_played_with_extra(app_ids, self.session_recovery.last_custom_game_name.clone()).await?;
1659        }
1660
1661        // Restore persona state
1662        if let Some(state) = self.session_recovery.last_persona_state {
1663            debug!("Restoring persona state: {:?}", state);
1664            self.set_persona(state, self.session_recovery.last_player_name.clone()).await?;
1665        }
1666
1667        // Restore rich presence
1668        let rp_data = self.session_recovery.last_rich_presence.clone();
1669        for (app_id, data) in rp_data {
1670            debug!("Restoring rich presence for app {}", app_id);
1671            self.upload_rich_presence(app_id, &data).await?;
1672        }
1673
1674        Ok(())
1675    }
1676
1677    /// Attempt to reconnect to Steam.
1678    async fn attempt_reconnect(&mut self) -> Result<(), SteamError> {
1679        // Get stored logon details
1680        let details = self.auth.read().logon_details.clone().ok_or_else(|| SteamError::Other("No logon details available for reconnection".to_string()))?;
1681
1682        debug!("Attempting reconnection with stored credentials");
1683
1684        // Get CM server list
1685        let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
1686        let server = provider.get_server().await?;
1687        self.auth.write().last_cm_server = Some(server.clone());
1688
1689        info!("Reconnecting to CM server: {}", server.endpoint);
1690
1691        // Connect to CM
1692        let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
1693        debug!("WebSocket connection established");
1694
1695        // Generate new session ID
1696        self.auth.write().session_id = (self.rng.gen_u32() & 0x7FFF_FFFF) as i32;
1697
1698        // Send ClientHello (only if not using refresh token)
1699        if details.refresh_token.is_none() {
1700            let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
1701            let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
1702            connection.send(hello_msg.encode()).await?;
1703        }
1704
1705        // Determine login type
1706        let is_anonymous = details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none());
1707
1708        // Build and send ClientLogon
1709        let logon = self.build_logon_message(&details, is_anonymous);
1710        let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
1711        connection.send(logon_msg.encode()).await?;
1712
1713        // Wait for response
1714        let response = self.wait_for_logon_response(&mut *connection).await?;
1715
1716        // Handle response
1717        let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
1718
1719        if eresult != EResult::OK {
1720            return Err(SteamError::SteamResult(eresult));
1721        }
1722
1723        // Success! Update connection state
1724        self.connection = Some(connection);
1725        self.auth.write().connecting = false;
1726
1727        // Update SteamID
1728        if let Some(sid) = response.client_supplied_steamid {
1729            self.steam_id = Some(SteamID::from(sid));
1730        }
1731
1732        {
1733            let mut account = self.account.write();
1734            account.cell_id = response.cell_id;
1735            account.vanity_url = response.vanity_url.clone();
1736
1737            if let Some(ref ip) = response.public_ip {
1738                if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
1739                    account.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
1740                }
1741            }
1742        }
1743
1744        info!("Successfully reconnected");
1745        Ok(())
1746    }
1747
1748    /// Poll all available events.
1749    ///
1750    /// This is useful for processing all events at once, including all
1751    /// sub-messages from Multi messages.
1752    pub async fn poll_events(&mut self) -> Result<Vec<super::events::SteamEvent>, SteamError> {
1753        let mut all_events = Vec::new();
1754
1755        while let Some(event) = self.poll_event().await? {
1756            all_events.push(event);
1757
1758            // If no more queued events, break to avoid blocking
1759            if self.event_queue.is_empty() {
1760                break;
1761            }
1762        }
1763
1764        Ok(all_events)
1765    }
1766
1767    /// Handle an event internally to update client state.
1768    fn handle_event(&mut self, event: &mut super::events::SteamEvent) {
1769        use super::events::{AppsEvent, AuthEvent, ConnectionEvent, FriendsEvent, SteamEvent};
1770
1771        match event {
1772            SteamEvent::Auth(AuthEvent::LoggedOn { steam_id }) => {
1773                self.steam_id = Some(*steam_id);
1774                self.auth.write().connecting = false;
1775                // Reset reconnection state on successful login
1776                self.reconnect_manager.record_success();
1777            }
1778            SteamEvent::Auth(AuthEvent::LoggedOff { .. }) => {
1779                self.steam_id = None;
1780            }
1781            SteamEvent::Auth(AuthEvent::GameConnectTokens { tokens }) => {
1782                self.gc_tokens.extend(tokens.clone());
1783            }
1784            SteamEvent::Friends(FriendsEvent::FriendsList { friends, .. }) => {
1785                for friend in friends {
1786                    if friend.relationship == steam_enums::EFriendRelationship::None {
1787                        self.social.write().friends.remove(&friend.steam_id);
1788                    } else {
1789                        self.social.write().friends.insert(friend.steam_id, friend.relationship as u32);
1790                    }
1791                }
1792            }
1793            SteamEvent::Friends(FriendsEvent::PersonaState(persona)) => {
1794                let mut persona = *persona.clone();
1795                if let Some(game_id) = persona.game_id {
1796                    let app_id = game_id as u32;
1797                    if app_id != 0 {
1798                        // If we don't have the app info, mark it as pending
1799                        if !self.apps.read().info.contains_key(&app_id) {
1800                            self.apps.write().pending.insert(app_id);
1801                        } else if persona.game_name.is_none() || persona.game_name.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
1802                            // If we have the app info but game_name is missing, try to resolve it
1803                            if let Some(name) = self.get_app_name(app_id) {
1804                                persona.game_name = Some(name);
1805                            }
1806                        }
1807                    }
1808                }
1809
1810                // Update the users HashMap
1811                self.social.write().users.entry(persona.steam_id).and_modify(|existing| existing.merge(&persona)).or_insert_with(|| persona.clone());
1812
1813                // Also update the TTL-based persona cache
1814                self.social.read().persona_cache.insert(persona.steam_id, persona);
1815            }
1816            SteamEvent::Apps(AppsEvent::PlayingState { playing_app, blocked }) => {
1817                let mut apps = self.apps.write();
1818                apps.playing_blocked = *blocked;
1819                if *playing_app != 0 {
1820                    if !apps.info.contains_key(playing_app) {
1821                        apps.pending.insert(*playing_app);
1822                    }
1823                    apps.info.entry(*playing_app).or_insert_with(|| super::events::AppInfoData { app_id: *playing_app, change_number: 0, missing_token: false, app_info: None });
1824                }
1825            }
1826            SteamEvent::Apps(AppsEvent::ProductInfoResponse { apps: new_apps, .. }) => {
1827                let mut apps = self.apps.write();
1828                for (app_id, data) in new_apps {
1829                    apps.info.insert(*app_id, data.clone());
1830                    apps.pending.remove(app_id);
1831                }
1832            }
1833            SteamEvent::Connection(ConnectionEvent::Disconnected { will_reconnect, .. }) => {
1834                // Only clear steam_id if not reconnecting
1835                if !*will_reconnect {
1836                    self.steam_id = None;
1837                }
1838                self.connection = None;
1839                self.auth.write().connecting = false;
1840            }
1841            SteamEvent::Connection(ConnectionEvent::ReconnectFailed { .. }) => {
1842                // Clear all state on permanent reconnection failure
1843                self.steam_id = None;
1844                self.connection = None;
1845                self.auth.write().connecting = false;
1846                self.reconnect_manager.reset();
1847            }
1848            SteamEvent::Apps(super::events::AppsEvent::LicenseList { licenses }) => {
1849                self.apps.write().licenses = licenses.clone();
1850            }
1851            _ => {}
1852        }
1853    }
1854
1855    /// Post-process events to trigger automatic actions (like fetching
1856    /// personas).
1857    async fn post_process_event(&mut self, event: &super::events::SteamEvent) {
1858        use super::events::{FriendsEvent, SteamEvent};
1859
1860        if let SteamEvent::Friends(FriendsEvent::FriendsList { incremental: false, .. }) = event {
1861            let friends: Vec<SteamID> = self.social.read().friends.iter().filter(|&(_, &rel)| rel == steam_enums::EFriendRelationship::Friend as u32).map(|(&id, _)| id).collect();
1862
1863            if !friends.is_empty() {
1864                debug!("Auto-fetching personas for {} friends", friends.len());
1865                if let Err(e) = self.get_personas(friends).await {
1866                    warn!("Failed to auto-fetch personas: {}", e);
1867                }
1868            }
1869        }
1870    }
1871
1872    /// Format a user's rich presence into a localized string.
1873    ///
1874    /// # Arguments
1875    /// * `steam_id` - The user's SteamID
1876    /// * `language` - The language for localization (e.g., "english")
1877    pub fn format_rich_presence(&self, steam_id: SteamID, language: &str) -> Option<String> {
1878        // Clone the persona out so we don't hold the social read guard across
1879        // the apps lookup further down.
1880        let user = self.social.read().users.get(&steam_id).cloned()?;
1881        let app_id = user.game_id? as u32;
1882        // Hold the apps read guard for the rest of the (sync) function so we
1883        // can borrow into the VDF without cloning it.
1884        let apps = self.apps.read();
1885        let app_info = apps.info.get(&app_id)?;
1886        let app_vdf = app_info.app_info.as_ref()?;
1887
1888        // Get rich presence localization for the given language
1889        let localization = app_vdf.get("common").and_then(|c| c.get("rich_presence")).and_then(|rp| rp.get("localization")).and_then(|loc| loc.get(language))?;
1890
1891        // Get the display token
1892        let display_token = user.rich_presence.get("steam_display")?;
1893        let mut formatted = localization.get_str(display_token)?.to_string();
1894
1895        // Replace placeholders (e.g., %map%, %status%)
1896        for (key, value) in &user.rich_presence {
1897            if key == "steam_display" {
1898                continue;
1899            }
1900            let placeholder = format!("%{}%", key);
1901            // The value itself might be a token that needs localization
1902            let localized_value = localization.get_str(value).unwrap_or(value);
1903            formatted = formatted.replace(&placeholder, localized_value);
1904        }
1905
1906        Some(formatted)
1907    }
1908
1909    /// Get an app's name from cache.
1910    pub fn get_app_name(&self, app_id: u32) -> Option<String> {
1911        // First check the bundled static AppList (only built with the
1912        // `static-app-list` feature; binary search on a sorted static array).
1913        #[cfg(feature = "static-app-list")]
1914        {
1915            if let Ok(idx) = crate::client::static_app_list::APP_LIST.binary_search_by_key(&app_id, |&(id, _)| id) {
1916                return Some(crate::client::static_app_list::APP_LIST[idx].1.to_string());
1917            }
1918        }
1919
1920        // Then check PICS cache
1921        let apps = self.apps.read();
1922        let info = apps.info.get(&app_id)?;
1923        if let Some(vdf) = &info.app_info {
1924            vdf.get("common").and_then(|c| c.get_str("name")).or_else(|| vdf.get_str("name")).map(|s| s.to_string())
1925        } else {
1926            None
1927        }
1928    }
1929
1930    /// Fetch info for apps that are currently in the pending queue.
1931    ///
1932    /// This is used for proactive caching of app names and rich presence
1933    /// localization.
1934    pub async fn fetch_pending_app_info(&mut self) -> Result<(), SteamError> {
1935        if self.apps.read().pending.is_empty() {
1936            return Ok(());
1937        }
1938
1939        let app_ids: Vec<u32> = self.apps.read().pending.iter().copied().collect();
1940        // Clear pending apps to prevent duplicate requests
1941        self.apps.write().pending.clear();
1942        self.get_product_info(app_ids).await
1943    }
1944
1945    /// Fetch and update active friend message sessions.
1946    ///
1947    /// This updates the local `users` cache with unread counts and last message
1948    /// timestamps from active chat sessions.
1949    pub async fn update_friend_sessions(&mut self) -> Result<(), SteamError> {
1950        let sessions = self.get_active_friend_sessions().await?;
1951
1952        // Acquire one write guard for the whole batch — never held across an .await.
1953        {
1954            let mut social = self.social.write();
1955            for session in sessions {
1956                social
1957                    .users
1958                    .entry(session.friend)
1959                    .and_modify(|user| {
1960                        user.unread_count = session.unread_count;
1961                        user.last_message_time = session.time_last_message;
1962                    })
1963                    .or_insert_with(|| UserPersona {
1964                        steam_id: session.friend,
1965                        unread_count: session.unread_count,
1966                        last_message_time: session.time_last_message,
1967                        ..Default::default()
1968                    });
1969            }
1970        }
1971
1972        Ok(())
1973    }
1974
1975    /// Get the CS:GO service.
1976    pub fn csgo(&mut self) -> crate::services::CSGOClient<'_> {
1977        crate::services::CSGOClient::new(self)
1978    }
1979
1980    /// Helper to send a unified service method call and wait for response.
1981    pub(crate) async fn send_service_method_and_wait<Req: prost::Message, Resp: prost::Message + Default>(&mut self, method: &str, body: &Req) -> Result<Resp, SteamError> {
1982        let rx = self.send_service_method_with_job(method, body).await?;
1983
1984        // Wait for response
1985        let job_response = rx.await.map_err(|_| SteamError::ResponseTimeout)?;
1986
1987        let body = match job_response {
1988            crate::internal::jobs::JobResponse::Success(bytes) => bytes,
1989            crate::internal::jobs::JobResponse::Timeout => return Err(SteamError::ResponseTimeout),
1990            crate::internal::jobs::JobResponse::Error(msg) => return Err(SteamError::ProtocolError(msg)),
1991        };
1992
1993        Resp::decode(&body[..]).map_err(|_| SteamError::DeserializationFailed)
1994    }
1995}
1996
1997//=============================================================================
1998// Stream Implementation for SteamClient
1999//=============================================================================
2000
2001use std::task::{Context, Poll};
2002
2003use futures::Stream;
2004
2005/// A stream of Steam events.
2006///
2007/// Created by calling [`SteamClient::into_stream`].
2008///
2009/// **Note**: This stream only yields queued events synchronously. For full
2010/// async event polling (including network I/O), use [`poll_event`] in a loop
2011/// or with `futures::stream::unfold`.
2012///
2013/// # Example
2014/// ```rust,ignore
2015/// use futures::StreamExt;
2016///
2017/// // Using as a stream (queued events only)
2018/// let mut stream = client.into_stream();
2019///
2020/// // Process queued events
2021/// while let Some(Ok(event)) = stream.next().await {
2022///     tracing::info!("Event: {:?}", event);
2023/// }
2024///
2025/// // For full network I/O, use unfold:
2026/// use futures::stream::unfold;
2027///
2028/// let stream = unfold(client, |mut client| async move {
2029///     match client.poll_event().await {
2030///         Ok(Some(event)) => Some((Ok(event), client)),
2031///         Ok(None) => Some((Ok(SteamEvent::System(SystemEvent::Debug("no event".into()))), client)),
2032///         Err(e) => Some((Err(e), client)),
2033///     }
2034/// });
2035/// ```
2036pub struct SteamEventStream<'a> {
2037    client: &'a mut SteamClient,
2038}
2039
2040impl SteamClient {
2041    /// Convert this client into an event stream for queued events.
2042    ///
2043    /// Returns a `Stream` that yields queued `SteamEvent`s. This is useful
2044    /// for draining the event queue using stream combinators.
2045    ///
2046    /// **Note**: This stream only yields already-queued events. It does not
2047    /// perform network I/O. For full async event polling, use [`poll_event`]
2048    /// in a loop or create a stream with `futures::stream::unfold`.
2049    ///
2050    /// # Example
2051    /// ```rust,ignore
2052    /// use futures::StreamExt;
2053    ///
2054    /// // Drain queued events
2055    /// let mut stream = client.into_stream();
2056    /// while let Some(Ok(event)) = stream.next().await {
2057    ///     handle_event(event);
2058    /// }
2059    /// ```
2060    /// Send a unified service method call in the background.
2061    ///
2062    /// The response will be automatically handled when it arrives by checking
2063    /// the background job tasks in `poll_event`.
2064    pub(crate) async fn send_service_method_background<T: prost::Message>(&mut self, method: &str, body: &T, task: BackgroundTask) -> Result<(), SteamError> {
2065        let (job_id, rx) = self.job_manager.create_job().await;
2066
2067        self.send_service_method_with_job_id(method, body, job_id).await?;
2068
2069        self.background_job_tasks.insert(job_id, task);
2070
2071        let tx = self.background_job_results_tx.clone();
2072        tokio::spawn(async move {
2073            match rx.await {
2074                Ok(crate::internal::jobs::JobResponse::Success(bytes)) => {
2075                    let _ = tx.send((job_id, Ok(bytes))).await;
2076                }
2077                Ok(crate::internal::jobs::JobResponse::Timeout) => {
2078                    let _ = tx.send((job_id, Err(SteamError::Timeout))).await;
2079                }
2080                Ok(crate::internal::jobs::JobResponse::Error(e)) => {
2081                    let _ = tx.send((job_id, Err(SteamError::Other(e)))).await;
2082                }
2083                Err(_) => {
2084                    // Receiver closed
2085                }
2086            }
2087        });
2088
2089        Ok(())
2090    }
2091
2092    /// Helper to send a unified service method call with a specific job ID.
2093    async fn send_service_method_with_job_id<T: prost::Message>(&mut self, method: &str, body: &T, job_id: u64) -> Result<(), SteamError> {
2094        use crate::protocol::{ProtobufMessageHeader, SteamMessage};
2095
2096        let header = ProtobufMessageHeader {
2097            header_length: 0,
2098            session_id: self.auth.read().session_id,
2099            steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
2100            job_id_source: job_id,
2101            job_id_target: u64::MAX,
2102            target_job_name: Some(method.to_string()),
2103            routing_appid: None,
2104        };
2105
2106        let msg = SteamMessage::new_proto(steam_enums::EMsg::ServiceMethodCallFromClient, header, body);
2107
2108        if let Some(ref mut conn) = self.connection {
2109            conn.send(msg.encode()).await?;
2110        } else {
2111            return Err(SteamError::NotConnected);
2112        }
2113
2114        Ok(())
2115    }
2116
2117    pub fn into_stream(&mut self) -> SteamEventStream<'_> {
2118        SteamEventStream { client: self }
2119    }
2120}
2121
2122impl<'a> Stream for SteamEventStream<'a> {
2123    type Item = Result<super::events::SteamEvent, SteamError>;
2124
2125    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2126        // Only yield queued events - does not perform network I/O
2127        // For full async polling, use poll_event() in a loop
2128        if let Some(mut event) = self.client.event_queue.pop_front() {
2129            self.client.handle_event(&mut event);
2130            Poll::Ready(Some(Ok(event)))
2131        } else {
2132            // No more queued events
2133            Poll::Ready(None)
2134        }
2135    }
2136}
2137
2138/// Response shape of `https://steamcommunity.com/chat/clientjstoken`.
2139#[derive(Debug, serde::Deserialize)]
2140struct ClientJsTokenResponse {
2141    #[serde(default)]
2142    logged_in: bool,
2143    #[serde(default)]
2144    steamid: Option<String>,
2145    #[serde(default)]
2146    account_name: Option<String>,
2147    #[serde(default)]
2148    token: Option<String>,
2149}
2150
2151/// Call Steam's `chat/clientjstoken` endpoint using the supplied cookies and
2152/// return `(web_logon_token, account_name, steam_id)`.
2153async fn fetch_client_js_token(http_client: &dyn crate::utils::http::HttpClient, cookies: &str) -> Result<(String, String, SteamID), SteamError> {
2154    let response = http_client.get_with_cookies("https://steamcommunity.com/chat/clientjstoken", cookies).await?;
2155
2156    if !response.is_success() {
2157        return Err(SteamError::Other(format!("clientjstoken HTTP error: {}", response.status)));
2158    }
2159
2160    let body: ClientJsTokenResponse = response.json().map_err(|e| SteamError::ProtocolError(format!("Failed to parse clientjstoken response: {}", e)))?;
2161
2162    if !body.logged_in {
2163        return Err(SteamError::InvalidCredentials);
2164    }
2165
2166    let token = body.token.filter(|s| !s.is_empty()).ok_or_else(|| SteamError::ProtocolError("clientjstoken returned no token".to_string()))?;
2167    let account_name = body.account_name.filter(|s| !s.is_empty()).ok_or_else(|| SteamError::ProtocolError("clientjstoken returned no account_name".to_string()))?;
2168    let steamid_str = body.steamid.filter(|s| !s.is_empty()).ok_or_else(|| SteamError::ProtocolError("clientjstoken returned no steamid".to_string()))?;
2169    let steamid_u64: u64 = steamid_str.parse().map_err(|_| SteamError::ProtocolError(format!("clientjstoken returned malformed steamid: {}", steamid_str)))?;
2170    let steam_id = SteamID::from(steamid_u64);
2171
2172    Ok((token, account_name, steam_id))
2173}
2174
2175#[cfg(test)]
2176mod tests {
2177    use super::*;
2178
2179    fn test_steam_id() -> SteamID {
2180        SteamID::from_steam_id64(76561198000000000)
2181    }
2182
2183    #[test]
2184    fn test_user_persona_merge_rich_presence() {
2185        let mut persona1 = UserPersona {
2186            steam_id: test_steam_id(),
2187            rich_presence: {
2188                let mut map = HashMap::new();
2189                map.insert("status".to_string(), "Online".to_string());
2190                map.insert("game".to_string(), "CS:GO".to_string());
2191                map
2192            },
2193            ..Default::default()
2194        };
2195
2196        let persona2 = UserPersona {
2197            steam_id: test_steam_id(),
2198            rich_presence: {
2199                let mut map = HashMap::new();
2200                map.insert("status".to_string(), "In-Game".to_string());
2201                map.insert("party".to_string(), "Open".to_string());
2202                map
2203            },
2204            ..Default::default()
2205        };
2206
2207        persona1.merge(&persona2);
2208
2209        assert_eq!(persona1.rich_presence.get("status"), Some(&"In-Game".to_string()));
2210        assert_eq!(persona1.rich_presence.get("game"), Some(&"CS:GO".to_string()));
2211        assert_eq!(persona1.rich_presence.get("party"), Some(&"Open".to_string()));
2212    }
2213
2214    #[test]
2215    fn test_user_persona_merge_unread_count() {
2216        let mut persona1 = UserPersona { steam_id: test_steam_id(), unread_count: 5, last_message_time: 100, ..Default::default() };
2217
2218        let persona2 = UserPersona { steam_id: test_steam_id(), unread_count: 10, last_message_time: 200, ..Default::default() };
2219
2220        persona1.merge(&persona2);
2221
2222        assert_eq!(persona1.unread_count, 10);
2223        assert_eq!(persona1.last_message_time, 200);
2224    }
2225
2226    #[test]
2227    fn test_user_persona_merge_unread_count_zero() {
2228        let mut persona1 = UserPersona { steam_id: test_steam_id(), unread_count: 5, last_message_time: 100, ..Default::default() };
2229
2230        // Merging with 0 should NOT overwrite existing values
2231        let persona2 = UserPersona { steam_id: test_steam_id(), unread_count: 0, last_message_time: 0, ..Default::default() };
2232
2233        persona1.merge(&persona2);
2234
2235        assert_eq!(persona1.unread_count, 5);
2236        assert_eq!(persona1.last_message_time, 100);
2237    }
2238}