use std::{
collections::{HashMap, VecDeque},
pin::Pin,
sync::Arc,
time::Duration,
};
use chrono::{DateTime, Utc};
use prost::Message;
use steam_auth::{CredentialsDetails, EAuthSessionGuardType, EAuthTokenPlatformType, LoginSession, LoginSessionOptions};
use steam_enums::{EMsg, EPersonaState, EResult};
use steam_protos::{CMsgClientHello, CMsgClientLogOff, CMsgClientLogon, CMsgClientLogonResponse, PROTOCOL_VERSION};
use steamid::SteamID;
use tracing::{debug, error, info, info_span, warn, Instrument};
use crate::{
connection::{CmServer, CmServerProvider, HttpCmServerProvider, SteamConnection, WebSocketConnection},
error::SteamError,
options::SteamOptions,
protocol::SteamMessage,
types::{AccountInfo, EmailInfo, Limitations, LogOnDetails, LogOnResponse, VacStatus, WalletInfo},
};
/// Cached persona (profile / presence) data for a single Steam user.
///
/// `Default` produces an offline persona with every optional field unset, and
/// [`UserPersona::merge`] folds a newer, possibly partial, update into an
/// existing value.
#[derive(Debug, Clone)]
pub struct UserPersona {
/// Identifier of the user this persona belongs to.
pub steam_id: SteamID,
/// Display name. `merge` keeps the previous name when the update's name is empty.
pub player_name: String,
/// Presence state. `merge` always overwrites this with the update's value.
pub persona_state: EPersonaState,
/// State flags. `merge` ignores an update value of `0`.
pub persona_state_flags: u32,
/// Avatar hash; `avatar_url` uses its first two characters as the CDN directory.
pub avatar_hash: Option<String>,
pub game_name: Option<String>,
/// `merge` ignores an update value of `Some(0)`.
pub game_id: Option<u64>,
pub last_logon: Option<DateTime<Utc>>,
pub last_logoff: Option<DateTime<Utc>>,
pub last_seen_online: Option<DateTime<Utc>>,
/// Rich-presence key/value pairs. `merge` inserts or overwrites per key and
/// never removes existing keys.
pub rich_presence: HashMap<String, String>,
pub steam_player_group: Option<String>,
pub rich_presence_status: Option<String>,
pub game_map: Option<String>,
pub game_score: Option<String>,
pub num_players: Option<u32>,
/// `merge` ignores an update value of `0`.
pub unread_count: u32,
/// `merge` ignores an update value of `0`.
/// NOTE(review): presumably a Unix timestamp in seconds — confirm.
pub last_message_time: u32,
}
impl Default for UserPersona {
fn default() -> Self {
Self {
steam_id: SteamID::default(),
player_name: String::new(),
persona_state: EPersonaState::Offline,
persona_state_flags: 0,
avatar_hash: None,
game_name: None,
game_id: None,
last_logon: None,
last_logoff: None,
last_seen_online: None,
rich_presence: HashMap::new(),
steam_player_group: None,
rich_presence_status: None,
game_map: None,
game_score: None,
num_players: None,
unread_count: 0,
last_message_time: 0,
}
}
}
impl UserPersona {
    /// Builds the CDN URL of this user's avatar.
    ///
    /// `size` selects the image variant: `"medium"` and `"full"` map to the
    /// `_medium.jpg` / `_full.jpg` files, anything else to the plain `.jpg`.
    ///
    /// Returns `None` when no avatar hash is known, or when the stored hash is
    /// too short (fewer than two bytes, or not sliceable at byte 2) to derive
    /// the two-character directory prefix. Previously such a hash panicked.
    pub fn avatar_url(&self, size: &str) -> Option<String> {
        let hash = self.avatar_hash.as_deref()?;
        // Checked slicing: a malformed hash must not bring the client down.
        let prefix = hash.get(..2)?;
        let size_suffix = match size {
            "medium" => "_medium.jpg",
            "full" => "_full.jpg",
            _ => ".jpg",
        };
        Some(format!("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{}/{}{}", prefix, hash, size_suffix))
    }

    /// Folds a newer (possibly partial) persona update into `self`.
    ///
    /// `persona_state` is always taken from `other`. Every other field is only
    /// overwritten when `other` carries a meaningful value: a non-empty name,
    /// non-zero flags / counters / game id, or `Some(..)` for optional fields.
    /// Rich-presence entries are inserted or overwritten key by key; existing
    /// keys are never removed.
    pub fn merge(&mut self, other: &UserPersona) {
        if !other.player_name.is_empty() {
            self.player_name = other.player_name.clone();
        }
        self.persona_state = other.persona_state;
        if other.persona_state_flags != 0 {
            self.persona_state_flags = other.persona_state_flags;
        }
        if other.avatar_hash.is_some() {
            self.avatar_hash = other.avatar_hash.clone();
        }
        if other.game_name.is_some() {
            self.game_name = other.game_name.clone();
        }
        // A game id of 0 is treated as "no information" and does not overwrite.
        if other.game_id.is_some() && other.game_id != Some(0) {
            self.game_id = other.game_id;
        }
        if other.last_logon.is_some() {
            self.last_logon = other.last_logon;
        }
        if other.last_logoff.is_some() {
            self.last_logoff = other.last_logoff;
        }
        if other.last_seen_online.is_some() {
            self.last_seen_online = other.last_seen_online;
        }
        for (k, v) in &other.rich_presence {
            self.rich_presence.insert(k.clone(), v.clone());
        }
        if other.steam_player_group.is_some() {
            self.steam_player_group = other.steam_player_group.clone();
        }
        if other.rich_presence_status.is_some() {
            self.rich_presence_status = other.rich_presence_status.clone();
        }
        if other.game_map.is_some() {
            self.game_map = other.game_map.clone();
        }
        if other.game_score.is_some() {
            self.game_score = other.game_score.clone();
        }
        // `num_players` used to be the only optional field that was silently
        // dropped on merge; treat it like its siblings.
        if other.num_players.is_some() {
            self.num_players = other.num_players;
        }
        if other.unread_count > 0 {
            self.unread_count = other.unread_count;
        }
        if other.last_message_time > 0 {
            self.last_message_time = other.last_message_time;
        }
    }
}
use std::time::Instant;
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
/// Kind of work a background job was started for.
///
/// Values of this type are stored per job id in
/// `SteamClient::background_job_tasks`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BackgroundTask {
// NOTE(review): presumably the job fetching the friends list — confirm at the spawn site.
FriendsList,
// NOTE(review): presumably offline-message retrieval for the given user — confirm.
OfflineMessages(SteamID),
}
/// A chat message waiting in the client's outgoing chat queue.
///
/// `SteamClient::process_queued_message` turns it into a
/// `FriendMessages.SendMessage#1` request.
#[derive(Debug)]
pub(crate) struct QueuedMessage {
/// Recipient of the message.
pub friend: SteamID,
/// Message text as supplied by the caller (before any escaping).
pub message: String,
pub entry_type: steam_enums::EChatEntryType,
/// Forwarded as the request's `contains_bbcode`; when set, `[` is escaped
/// as `\[` before sending.
pub contains_bbcode: bool,
/// One-shot channel used to report a send failure back to the caller.
pub respond_to: oneshot::Sender<Result<crate::services::chat::SendMessageResult, SteamError>>,
}
/// Maximum number of receive attempts `wait_for_logon_response` makes before
/// giving up with `SteamError::Timeout`.
const LOGON_TIMEOUT_ITERATIONS: usize = 150;
/// Pause, in milliseconds, between two receive attempts while waiting for the
/// logon response.
const LOGON_TIMEOUT_DELAY_MS: u64 = 100;
/// Initial heartbeat interval, in seconds, handed to the `HeartbeatManager`;
/// a positive `heartbeat_seconds` in the logon response replaces it.
const HEARTBEAT_INTERVAL_SECONDS: u64 = 30;
/// Stateful Steam client: owns the CM connection, the logon/session state and
/// the caches filled while the session is alive.
///
/// Instances are created through [`SteamClient::builder`] or
/// [`SteamClient::new`].
pub struct SteamClient {
pub options: SteamOptions,
/// `Some` while logged on; set after a successful logon and cleared by `log_off`.
pub steam_id: Option<SteamID>,
/// Public IPv4 address reported in the logon response, as a dotted quad.
pub public_ip: Option<String>,
pub cell_id: Option<u32>,
pub vanity_url: Option<String>,
pub account_info: Option<AccountInfo>,
pub email_info: Option<EmailInfo>,
pub limitations: Option<Limitations>,
pub vac: Option<VacStatus>,
pub wallet: Option<WalletInfo>,
pub my_friends: HashMap<SteamID, u32>,
pub users: HashMap<SteamID, UserPersona>,
pub apps: HashMap<u32, super::events::AppInfoData>,
pub licenses: Vec<super::events::LicenseEntry>,
pub pending_apps: rustc_hash::FxHashSet<u32>,
pub chat_last_view: HashMap<SteamID, u32>,
pub playing_blocked: bool,
pub playing_app_ids: Vec<u32>,
pub connect_time: u64,
pub connection_count: u32,
pub auth_seq_me: u32,
pub auth_seq_them: u32,
/// Random value in `1..=1_000_000` chosen once at construction.
pub h_steam_pipe: u32,
pub gc_tokens: Vec<Vec<u8>>,
pub active_tickets: Vec<crate::services::appauth::AuthSessionTicket>,
pub last_time_party_register: Option<i64>,
// Outgoing chat pipeline: queue endpoints, rate-limit deadline, the message
// held back while rate limited, and the in-flight send tasks.
pub(crate) chat_queue_tx: mpsc::UnboundedSender<QueuedMessage>,
pub(crate) chat_queue_rx: mpsc::UnboundedReceiver<QueuedMessage>,
pub(crate) rate_limit_until: Option<Instant>,
pub(crate) pending_retry_message: Option<QueuedMessage>,
pub(crate) chat_tasks: tokio::task::JoinSet<(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>)>,
/// Active CM connection, if any.
pub(crate) connection: Option<Box<dyn SteamConnection>>,
/// Auth session kept for Steam Guard code submission and web-cookie retrieval.
pub(crate) login_session: Option<LoginSession>,
/// Details of the most recent logon attempt.
pub(crate) logon_details: Option<LogOnDetails>,
/// Session id written into outgoing message headers; chosen randomly per logon.
pub(crate) session_id: i32,
/// `true` while a `log_on` call is in progress; guards against overlapping logons.
pub(crate) connecting: bool,
pub(crate) logging_off: bool,
/// SteamID used in message headers before the logon response arrives.
pub(crate) temp_steam_id: Option<SteamID>,
/// Events waiting to be handed out by `poll_event`.
pub(crate) event_queue: VecDeque<super::events::SteamEvent>,
pub(crate) reconnect_manager: crate::internal::reconnect::ReconnectManager,
pub(crate) heartbeat_manager: crate::internal::heartbeat::HeartbeatManager,
pub(crate) relogging: bool,
pub(crate) last_cm_server: Option<CmServer>,
pub(crate) job_manager: crate::internal::jobs::JobManager,
pub(crate) gc_jobs: crate::services::gc::GCJobManager,
pub(crate) background_job_tasks: HashMap<u64, BackgroundTask>,
pub(crate) background_job_results_tx: mpsc::Sender<(u64, Result<Bytes, SteamError>)>,
pub(crate) background_job_results_rx: mpsc::Receiver<(u64, Result<Bytes, SteamError>)>,
pub persona_cache: crate::cache::PersonaCache,
pub session_recovery: crate::services::SessionRecovery,
// Injected providers (HTTP, time, randomness).
pub(crate) http_client: Arc<dyn crate::utils::http::HttpClient>,
pub(crate) clock: Arc<dyn crate::utils::clock::Clock>,
pub(crate) rng: Arc<dyn crate::utils::rng::Rng>,
}
impl SteamClient {
/// Returns a fresh [`SteamClientBuilder`](super::builder::SteamClientBuilder)
/// for configuring and constructing a client.
pub fn builder() -> super::builder::SteamClientBuilder {
    use super::builder::SteamClientBuilder;
    SteamClientBuilder::new()
}
/// Creates a client from `options`, using the builder's defaults for
/// everything else.
pub fn new(options: SteamOptions) -> Self {
    let configured = Self::builder().with_options(options);
    configured.build()
}
/// Constructs a logged-off client with explicitly injected HTTP, clock and
/// RNG providers.
///
/// All caches start empty, no connection is open, and `h_steam_pipe` is drawn
/// from `rng` as a value in `1..=1_000_000`.
pub(crate) fn with_all_providers(options: SteamOptions, http_client: Arc<dyn crate::utils::http::HttpClient>, clock: Arc<dyn crate::utils::clock::Clock>, rng: Arc<dyn crate::utils::rng::Rng>) -> Self {
    use crate::internal::{heartbeat::HeartbeatManager, jobs::JobManager, reconnect::ReconnectManager};

    // Taken before `options` is moved into the struct.
    let reconnect_config = options.reconnect.clone();
    let (chat_queue_tx, chat_queue_rx) = mpsc::unbounded_channel();
    let (background_job_results_tx, background_job_results_rx) = mpsc::channel(100);
    let h_steam_pipe = (rng.gen_u32() % 1000000) + 1;

    Self {
        options,
        steam_id: None,
        public_ip: None,
        cell_id: None,
        vanity_url: None,
        account_info: None,
        email_info: None,
        limitations: None,
        vac: None,
        wallet: None,
        my_friends: HashMap::new(),
        users: HashMap::new(),
        apps: HashMap::new(),
        licenses: Vec::new(),
        pending_apps: rustc_hash::FxHashSet::default(),
        chat_last_view: HashMap::new(),
        playing_blocked: false,
        playing_app_ids: Vec::new(),
        connect_time: 0,
        connection_count: 0,
        auth_seq_me: 0,
        auth_seq_them: 0,
        h_steam_pipe,
        gc_tokens: Vec::new(),
        active_tickets: Vec::new(),
        last_time_party_register: None,
        chat_queue_tx,
        chat_queue_rx,
        rate_limit_until: None,
        pending_retry_message: None,
        chat_tasks: tokio::task::JoinSet::new(),
        connection: None,
        login_session: None,
        logon_details: None,
        session_id: 0,
        connecting: false,
        logging_off: false,
        temp_steam_id: None,
        event_queue: VecDeque::new(),
        reconnect_manager: ReconnectManager::new(reconnect_config),
        heartbeat_manager: HeartbeatManager::new(HEARTBEAT_INTERVAL_SECONDS),
        relogging: false,
        last_cm_server: None,
        job_manager: JobManager::new(),
        gc_jobs: crate::services::gc::GCJobManager::new(),
        background_job_tasks: HashMap::new(),
        background_job_results_tx,
        background_job_results_rx,
        persona_cache: crate::cache::PersonaCache::default(),
        session_recovery: crate::services::SessionRecovery::new(),
        http_client,
        clock,
        rng,
    }
}
pub async fn log_on(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
if self.steam_id.is_some() {
return Err(SteamError::AlreadyLoggedOn);
}
if self.connecting {
return Err(SteamError::AlreadyConnecting);
}
self.connecting = true;
match self.execute_logon(details).await {
Ok(response) => Ok(response),
Err(e) => {
match &e {
SteamError::SteamResult(steam_enums::EResult::AccountLoginDeniedThrottle) => {
crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
}
SteamError::SteamResult(steam_enums::EResult::ServiceUnavailable) => {
crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "ServiceUnavailable");
}
SteamError::SteamResult(steam_enums::EResult::TryAnotherCM) => {
crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(5), "TryAnotherCM");
}
_ => {}
}
self.connecting = false;
Err(e)
}
}
}
async fn execute_logon(&mut self, details: LogOnDetails) -> Result<LogOnResponse, SteamError> {
self.logging_off = false;
info!("Starting Steam login...");
crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::CMConnection).await;
self.logon_details = Some(details.clone());
let is_web_logon = details.web_logon_token.is_some() && details.steam_id.is_some();
let is_anonymous = !is_web_logon && details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none() && !is_web_logon);
let mut temp_sid = SteamID::new();
temp_sid.universe = steamid::Universe::Public;
if is_web_logon {
info!("Logging in with web logon token");
if let Some(sid) = details.steam_id {
temp_sid = sid;
}
temp_sid.account_type = steamid::AccountType::Individual;
} else if is_anonymous {
info!("Logging in anonymously");
temp_sid.account_type = steamid::AccountType::AnonUser;
} else if let Some(ref token) = details.refresh_token {
info!("Logging in with refresh token");
match LoginSession::from_refresh_token(token.clone()) {
Ok(session) => {
if let Some(sid) = session.steam_id().cloned() {
temp_sid = sid;
}
self.login_session = Some(session);
}
Err(e) => {
return Err(SteamError::InvalidToken(e.to_string()));
}
}
} else if details.account_name.is_some() && details.password.is_some() {
info!("Logging in with account name and password");
temp_sid.account_type = steamid::AccountType::Individual;
}
self.temp_steam_id = Some(temp_sid);
let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
let server = provider.get_server().await?;
info!("Connecting to CM server: {}", server.endpoint);
let start_time = Instant::now();
let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
debug!("WebSocket connection established in {}ms", start_time.elapsed().as_millis());
self.session_id = self.rng.gen_i32().abs();
let skip_hello = details.refresh_token.is_some() || details.web_logon_token.is_some();
if !skip_hello {
let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
connection.send(hello_msg.encode()).await?;
debug!("Sent ClientHello");
}
let logon = self.build_logon_message(&details, is_anonymous);
let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
connection.send(logon_msg.encode()).await?;
debug!("Sent ClientLogon");
let response = self.wait_for_logon_response(&mut *connection).await?;
self.connection = Some(connection);
self.connecting = false;
let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
if eresult != EResult::OK {
self.steam_id = None;
return Err(SteamError::SteamResult(eresult));
}
let steam_id = if let Some(sid) = response.client_supplied_steamid { SteamID::from(sid) } else { self.temp_steam_id.unwrap_or_default() };
self.steam_id = Some(steam_id);
self.cell_id = response.cell_id;
self.vanity_url = response.vanity_url.clone();
if let Some(ref ip) = response.public_ip {
if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
self.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
}
}
if let Some(secs) = response.heartbeat_seconds {
if secs > 0 {
self.heartbeat_manager.set_interval(secs as u64);
}
}
self.heartbeat_manager.reset();
let username = details.account_name.as_deref().unwrap_or("Unknown");
info!("Logged on as steam_id: {}, name: {}, username: {}", steam_id.steam_id64(), "Unknown", username);
Ok(LogOnResponse {
eresult,
steam_id,
public_ip: self.public_ip.clone(),
cell_id: self.cell_id.unwrap_or(0),
vanity_url: self.vanity_url.clone(),
email_domain: response.email_domain.clone(),
steam_guard_required: false,
heartbeat_seconds: response.heartbeat_seconds,
server_time: response.rtime32_server_time,
account_flags: response.account_flags,
user_country: response.user_country.clone(),
ip_country_code: response.ip_country_code.clone(),
client_instance_id: response.client_instance_id,
token_id: response.token_id,
family_group_id: response.family_group_id,
eresult_extended: response.eresult_extended,
cell_id_ping_threshold: response.cell_id_ping_threshold,
force_client_update_check: response.force_client_update_check,
agreement_session_url: response.agreement_session_url.clone(),
legacy_out_of_game_heartbeat_seconds: response.legacy_out_of_game_heartbeat_seconds,
parental_settings: response.parental_settings.clone(),
parental_setting_signature: response.parental_setting_signature.clone(),
count_loginfailures_to_migrate: response.count_loginfailures_to_migrate,
count_disconnects_to_migrate: response.count_disconnects_to_migrate,
ogs_data_report_time_window: response.ogs_data_report_time_window,
steam2_ticket: response.steam2_ticket.clone(),
})
}
fn build_logon_message(&self, details: &LogOnDetails, is_anonymous: bool) -> CMsgClientLogon {
use crate::protocol::messages::{build_client_logon, LogonConfig};
let machine_name = if details.machine_name.is_some() {
details.machine_name.clone()
} else if !is_anonymous {
Some(format!("DESKTOP-{:06}", self.rng.gen_u32() % 1000000))
} else {
None
};
let identifier = if let Some(ref name) = details.account_name { Some(name.clone()) } else { self.temp_steam_id.as_ref().map(|sid| sid.steam_id64().to_string()) };
let config = LogonConfig::new().with_cell_id(self.cell_id).with_machine_name(machine_name).with_identifier(identifier);
build_client_logon(&config, details, is_anonymous)
}
async fn wait_for_logon_response(&self, connection: &mut dyn SteamConnection) -> Result<CMsgClientLogonResponse, SteamError> {
use std::io::Read;
use flate2::read::GzDecoder;
for _ in 0..LOGON_TIMEOUT_ITERATIONS {
match connection.recv().await? {
Some(data) => {
let msg = SteamMessage::decode_from_bytes(&data)?;
debug!("Received message: {:?}", msg.msg);
if msg.msg == EMsg::ClientLogOnResponse {
return msg.decode_body::<CMsgClientLogonResponse>();
}
if msg.msg == EMsg::Multi {
if let Ok(multi) = msg.decode_body::<steam_protos::CMsgMulti>() {
let body = multi.message_body.unwrap_or_default();
let payload = if multi.size_unzipped.unwrap_or(0) > 0 {
let decompressed_result = tokio::task::spawn_blocking(move || {
let mut decoder = GzDecoder::new(&body[..]);
let mut decompressed = Vec::new();
if decoder.read_to_end(&mut decompressed).is_ok() {
Some(decompressed)
} else {
None
}
})
.await
.map_err(|e| SteamError::Other(format!("Task join error: {}", e)))?;
if let Some(decompressed) = decompressed_result {
decompressed
} else {
tracing::error!("Failed to decompress Multi message");
continue;
}
} else {
body
};
let mut offset = 0;
while offset + 4 <= payload.len() {
let sub_size = u32::from_le_bytes([payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3]]) as usize;
offset += 4;
if offset + sub_size > payload.len() {
break;
}
let sub_data = &payload[offset..offset + sub_size];
if let Ok(sub_msg) = SteamMessage::decode_from_bytes(sub_data) {
debug!("Received sub-message inside Multi: {:?}", sub_msg.msg);
if sub_msg.msg == EMsg::ClientLogOnResponse {
return sub_msg.decode_body::<CMsgClientLogonResponse>();
}
}
offset += sub_size;
}
}
}
}
None => {
return Err(SteamError::ConnectionError("Connection closed by server during login".into()));
}
}
self.clock.sleep(Duration::from_millis(LOGON_TIMEOUT_DELAY_MS)).await;
}
Err(SteamError::Timeout)
}
/// Wraps `body` in a protobuf `SteamMessage` of type `msg`.
///
/// The header carries the current session id and the provisional SteamID
/// (`0` when none is set); both job ids are `u64::MAX`.
fn create_proto_message<T: Message>(&self, msg: EMsg, body: &T) -> SteamMessage {
    use crate::protocol::messages::{build_proto_header, create_steam_message};

    let sender_id = match self.temp_steam_id.as_ref() {
        Some(sid) => sid.steam_id64(),
        None => 0,
    };
    create_steam_message(msg, build_proto_header(self.session_id, sender_id, u64::MAX, u64::MAX), body)
}
/// Logs off from Steam and tears down the connection.
///
/// A `ClientLogOff` message is sent on a best-effort basis, the connection is
/// closed, and the per-session state (SteamID, account data, friends, logon
/// details, session id) is cleared.
///
/// The local state is reset even when closing the connection fails, so the
/// client never stays half logged-on (SteamID set, connection gone,
/// `logging_off` stuck at `true`); the close error is still returned.
///
/// # Errors
/// * `SteamError::NotLoggedOn` if there is no active session.
/// * The error reported while closing the connection, if any.
pub async fn log_off(&mut self) -> Result<(), SteamError> {
    if self.steam_id.is_none() {
        return Err(SteamError::NotLoggedOn);
    }
    info!("Logging off from Steam");
    self.logging_off = true;
    let logoff = CMsgClientLogOff::default();
    let msg = self.create_proto_message(EMsg::ClientLogOff, &logoff);
    if let Some(ref mut conn) = self.connection {
        // Best effort: the connection is being torn down anyway.
        let _ = conn.send(msg.encode()).await;
    }
    // Capture the close outcome instead of returning early, so the cleanup
    // below always runs.
    let close_result: Result<(), SteamError> = match self.connection.take() {
        Some(conn) => conn.close().await.map(|_| ()).map_err(Into::into),
        None => Ok(()),
    };
    self.steam_id = None;
    self.public_ip = None;
    self.cell_id = None;
    self.account_info = None;
    self.email_info = None;
    self.limitations = None;
    self.vac = None;
    self.wallet = None;
    self.my_friends.clear();
    self.logon_details = None;
    self.session_id = 0;
    self.logging_off = false;
    close_result
}
pub async fn log_on_with_password(&mut self, account_name: &str, password: &str, steam_guard_code: Option<&str>, machine_auth_token: Option<&str>) -> Result<LogOnResponse, SteamError> {
if self.steam_id.is_some() {
return Err(SteamError::AlreadyLoggedOn);
}
if self.connecting {
return Err(SteamError::AlreadyConnecting);
}
info!("Starting password authentication for {}", account_name);
crate::internal::limiter::wait_for_permit(crate::internal::limiter::LoginType::WebAuth).await;
let mut session = LoginSession::new(EAuthTokenPlatformType::KEAuthTokenPlatformTypeSteamClient, Some(LoginSessionOptions { machine_friendly_name: Some(format!("DESKTOP-{}", self.rng.gen_u32() % 1000000)), ..Default::default() }));
let credentials = CredentialsDetails {
account_name: account_name.to_string(),
password: password.to_string(),
persistence: None,
steam_guard_machine_token: machine_auth_token.map(|s| s.to_string()),
steam_guard_code: steam_guard_code.map(|s| s.to_string()),
};
let start_result = match session.start_with_credentials(credentials).await {
Ok(res) => res,
Err(e) => {
if let steam_auth::SessionError::SteamError(_, steam_enums::EResult::AccountLoginDeniedThrottle) = e {
crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "AccountLoginDeniedThrottle");
}
return Err(SteamError::SessionError(e));
}
};
if start_result.action_required {
if let Some(ref actions) = start_result.valid_actions {
let action = actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeEmailCode)).or_else(|| actions.iter().find(|a| matches!(a.guard_type, EAuthSessionGuardType::KEAuthSessionGuardTypeDeviceCode))).or_else(|| actions.first());
if let Some(action) = action {
debug!("Steam Guard required: {:?}", action.guard_type);
self.login_session = Some(session);
return Err(SteamError::SteamGuardRequired { guard_type: action.guard_type, email_domain: action.detail.clone() });
}
}
warn!("Action required but no valid actions returned");
return Err(SteamError::InvalidCredentials);
}
self.complete_password_auth(session, account_name).await
}
/// Submits a Steam Guard code for the pending login session and, on success,
/// completes the password authentication and logs on.
///
/// If submitting the code fails (for example because the code was mistyped),
/// the pending session is put back so the caller can retry with another code
/// instead of having to restart the whole password login.
///
/// # Errors
/// * `SteamError::Other` if there is no pending login session.
/// * The error from the code submission, or from the subsequent
///   authentication / logon.
pub async fn submit_steam_guard_code(&mut self, code: &str) -> Result<LogOnResponse, SteamError> {
    let mut session = self.login_session.take().ok_or_else(|| SteamError::Other("No pending login session".to_string()))?;
    let account_name = session.account_name().map(|s| s.to_string()).unwrap_or_default();
    let submitted = session.submit_steam_guard_code(code).await;
    if let Err(e) = submitted {
        // Keep the session alive for another attempt.
        self.login_session = Some(session);
        return Err(e.into());
    }
    self.complete_password_auth(session, &account_name).await
}
/// Polls the auth session until it yields a result, queues a `RefreshToken`
/// auth event, and then logs on with the obtained refresh token.
///
/// Between polls the task sleeps for the session's poll interval. When the
/// result carries new Steam Guard data, the session is stored on `self`.
async fn complete_password_auth(&mut self, mut session: LoginSession, account_name: &str) -> Result<LogOnResponse, SteamError> {
    use super::events::{AuthEvent, SteamEvent};

    let poll_result = loop {
        if let Some(result) = session.poll().await? {
            break result;
        }
        let interval = session.poll_interval();
        self.clock.sleep(Duration::from_secs_f32(interval)).await;
    };
    info!("Password authentication successful, got refresh token");
    debug!("Refresh token account: {}", poll_result.account_name);

    if poll_result.new_guard_data.is_some() {
        debug!("Received new Steam Guard machine token");
        self.login_session = Some(session);
    }

    let token_event = AuthEvent::RefreshToken {
        token: poll_result.refresh_token.clone(),
        account_name: poll_result.account_name.clone(),
    };
    self.event_queue.push_back(SteamEvent::Auth(token_event));

    let details = LogOnDetails {
        refresh_token: Some(poll_result.refresh_token),
        account_name: Some(account_name.to_string()),
        ..Default::default()
    };
    self.log_on(details).await
}
/// Returns `true` while a SteamID is set, i.e. after a successful logon and
/// before `log_off`.
pub fn is_logged_in(&self) -> bool {
    matches!(self.steam_id, Some(_))
}
/// Clears the "logon in progress" flag so that a later `log_on` call is not
/// rejected with `SteamError::AlreadyConnecting`.
pub fn reset_connecting_state(&mut self) {
self.connecting = false;
}
/// Fetches web cookies for the current login session and returns them with
/// the session id.
///
/// The session id is taken from the first `sessionid=` cookie (up to the
/// first `;`); when no such cookie exists a random hexadecimal id is
/// generated. A `WebSession` auth event with the same data is queued.
///
/// # Errors
/// * `SteamError::NotLoggedOn` if there is no active session.
/// * `SteamError::Other` if no login session is stored or the cookie request
///   fails.
pub async fn get_web_session(&mut self) -> Result<(String, Vec<String>), SteamError> {
    use super::events::{AuthEvent, SteamEvent};

    if self.steam_id.is_none() {
        return Err(SteamError::NotLoggedOn);
    }
    let session = match self.login_session.as_mut() {
        Some(session) => session,
        None => return Err(SteamError::Other("No login session available. Web session requires refresh token login.".to_string())),
    };
    let cookies = match session.get_web_cookies().await {
        Ok(cookies) => cookies,
        Err(e) => return Err(SteamError::Other(format!("Failed to get web cookies: {}", e))),
    };

    let from_cookie = cookies
        .iter()
        .find_map(|cookie| cookie.strip_prefix("sessionid="))
        .and_then(|rest| rest.split(';').next());
    let session_id = match from_cookie {
        Some(value) => value.to_string(),
        None => format!("{:x}", self.rng.gen_u64()),
    };

    let event = AuthEvent::WebSession { session_id: session_id.clone(), cookies: cookies.clone() };
    self.event_queue.push_back(SteamEvent::Auth(event));
    Ok((session_id, cookies))
}
pub(crate) async fn send_binary_message(&mut self, msg_type: EMsg, body: &[u8]) -> Result<(), SteamError> {
use crate::protocol::{ExtendedMessageHeader, MessageHeader, SteamMessage};
let header = ExtendedMessageHeader {
header_size: 36,
header_version: 2,
target_job_id: u64::MAX,
source_job_id: u64::MAX,
header_canary: 239,
steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
session_id: self.session_id,
};
let msg = SteamMessage {
msg: msg_type,
is_proto: false,
header: MessageHeader::Extended(header),
body: bytes::Bytes::copy_from_slice(body),
};
if let Some(ref mut conn) = self.connection {
conn.send(msg.encode()).await?;
}
Ok(())
}
/// Sends a protobuf message of type `msg_type` without job tracking or app
/// routing.
///
/// When no connection is open nothing is sent and `Ok(())` is returned.
pub(crate) async fn send_message<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<(), SteamError> {
    use crate::protocol::{ProtobufMessageHeader, SteamMessage};

    let sender_id = match self.steam_id.as_ref() {
        Some(sid) => sid.steam_id64(),
        None => 0,
    };
    let msg = SteamMessage::new_proto(
        msg_type,
        ProtobufMessageHeader {
            header_length: 0,
            session_id: self.session_id,
            steam_id: sender_id,
            job_id_source: u64::MAX,
            job_id_target: u64::MAX,
            target_job_name: None,
            routing_appid: None,
        },
        body,
    );
    if let Some(conn) = self.connection.as_mut() {
        conn.send(msg.encode()).await?;
    }
    Ok(())
}
/// Sends a protobuf message whose header is routed to `routing_appid`.
///
/// When no connection is open nothing is sent and `Ok(())` is returned.
pub(crate) async fn send_message_with_routing<T: Message>(&mut self, msg_type: EMsg, routing_appid: u32, body: &T) -> Result<(), SteamError> {
    use crate::protocol::{ProtobufMessageHeader, SteamMessage};

    let sender_id = match self.steam_id.as_ref() {
        Some(sid) => sid.steam_id64(),
        None => 0,
    };
    let msg = SteamMessage::new_proto(
        msg_type,
        ProtobufMessageHeader {
            header_length: 0,
            session_id: self.session_id,
            steam_id: sender_id,
            job_id_source: u64::MAX,
            job_id_target: u64::MAX,
            target_job_name: None,
            routing_appid: Some(routing_appid),
        },
        body,
    );
    if let Some(conn) = self.connection.as_mut() {
        conn.send(msg.encode()).await?;
    }
    Ok(())
}
/// Sends a protobuf message tagged with a freshly created source job id and
/// returns the receiver on which the job's response will be delivered.
///
/// When no connection is open nothing is sent, but the job is still created
/// and its receiver returned.
pub(crate) async fn send_message_with_job<T: Message>(&mut self, msg_type: EMsg, body: &T) -> Result<tokio::sync::oneshot::Receiver<crate::internal::jobs::JobResponse>, SteamError> {
    use crate::protocol::{ProtobufMessageHeader, SteamMessage};

    let (job_id, response_rx) = self.job_manager.create_job().await;
    info!("[SteamClient] send_message_with_job: Created JobID={} for EMsg::{:?}", job_id, msg_type);

    let sender_id = match self.steam_id.as_ref() {
        Some(sid) => sid.steam_id64(),
        None => 0,
    };
    let msg = SteamMessage::new_proto(
        msg_type,
        ProtobufMessageHeader {
            header_length: 0,
            session_id: self.session_id,
            steam_id: sender_id,
            job_id_source: job_id,
            job_id_target: u64::MAX,
            target_job_name: None,
            routing_appid: None,
        },
        body,
    );
    if let Some(conn) = self.connection.as_mut() {
        conn.send(msg.encode()).await?;
    }
    Ok(response_rx)
}
/// Sends one queued chat message via `FriendMessages.SendMessage#1`.
///
/// If the request cannot be sent, the error is reported straight away through
/// the message's `respond_to` channel. Otherwise a task is added to
/// `chat_tasks` that waits (up to 30 seconds) for the job response and yields
/// the original `QueuedMessage` together with the send result.
async fn process_queued_message(&mut self, msg: QueuedMessage) {
info!("[SteamClient] process_queued_message: Processing message to {}", msg.friend);
// When the message is flagged as containing BBCode, every `[` is escaped as `\[`.
let processed_message = if msg.contains_bbcode { msg.message.replace('[', "\\[") } else { msg.message.clone() };
debug!("[SteamClient] process_queued_message: Original len={}, Processed len={}", msg.message.len(), processed_message.len());
let request = steam_protos::CFriendMessagesSendMessageRequest {
steamid: Some(msg.friend.steam_id64()),
chat_entry_type: Some(msg.entry_type as i32),
message: Some(processed_message),
contains_bbcode: Some(msg.contains_bbcode),
..Default::default()
};
info!("[SteamClient] process_queued_message: Sending FriendMessages.SendMessage#1 to Steam");
match self.send_service_method_with_job("FriendMessages.SendMessage#1", &request).await {
Ok(rx) => {
info!("[SteamClient] process_queued_message: Request sent, waiting for response...");
let friend_id = msg.friend;
// Fallback text used when the response carries no `modified_message`.
let original_message = msg.message.clone();
// The wait happens in a separate task; `msg` moves into it and is
// returned alongside the result.
self.chat_tasks.spawn(
async move {
const CHAT_TIMEOUT_SECS: u64 = 30;
debug!("[SteamClient] chat_task: Waiting for response (timeout: {}s)", CHAT_TIMEOUT_SECS);
// Outer `Result`: tokio timeout; inner `Result`: oneshot receive;
// innermost value: the job manager's response.
let res = match tokio::time::timeout(std::time::Duration::from_secs(CHAT_TIMEOUT_SECS), rx).await {
Ok(Ok(crate::internal::jobs::JobResponse::Success(body))) => {
info!("[SteamClient] chat_task: Received Success response, body len={}", body.len());
steam_protos::CFriendMessagesSendMessageResponse::decode(&body[..])
.map(|response| {
info!("[SteamClient] chat_task: Decoded response - server_ts={}, ordinal={}", response.server_timestamp.unwrap_or(0), response.ordinal.unwrap_or(0));
crate::services::chat::SendMessageResult {
modified_message: response.modified_message.unwrap_or(original_message),
server_timestamp: response.server_timestamp.unwrap_or(0),
ordinal: response.ordinal.unwrap_or(0),
}
})
.map_err(|e| {
error!("[SteamClient] chat_task: Failed to decode response: {:?}", e);
SteamError::DeserializationFailed
})
}
Ok(Ok(crate::internal::jobs::JobResponse::Timeout)) => {
error!("[SteamClient] chat_task: Job response timeout (Steam didn't respond)");
Err(SteamError::ResponseTimeout)
}
Ok(Ok(crate::internal::jobs::JobResponse::Error(e))) => {
error!("[SteamClient] chat_task: Job response error: {}", e);
Err(SteamError::ProtocolError(e))
}
// The sending half of the job channel was dropped without a response.
Ok(Err(e)) => {
error!("[SteamClient] chat_task: Job channel closed: {:?}", e);
Err(SteamError::Other("Job response channel closed".into()))
}
// The 30-second tokio timeout fired before any response arrived.
Err(_) => {
error!("[SteamClient] chat_task: Tokio timeout elapsed after {}s", CHAT_TIMEOUT_SECS);
Err(SteamError::ResponseTimeout)
}
};
match &res {
Ok(r) => info!("[SteamClient] chat_task: Final result SUCCESS - ts={}", r.server_timestamp),
Err(e) => error!("[SteamClient] chat_task: Final result ERROR - {:?}", e),
}
(msg, res)
}
.instrument(info_span!("chat_send", friend = %friend_id)),
);
}
Err(e) => {
error!("[SteamClient] process_queued_message: Failed to send request to Steam: {:?}", e);
// The caller may have dropped its receiver; a failed send is ignored.
let _ = msg.respond_to.send(Err(e));
}
}
}
pub async fn send_unified_request_and_wait<T, R>(&mut self, method: &str, request: &T) -> Result<R, SteamError>
where
T: prost::Message,
R: prost::Message + Default,
{
use crate::internal::jobs::JobResponse;
let response_rx = self.send_service_method_with_job(method, request).await?;
match response_rx.await {
Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
Err(_) => Err(SteamError::Other("Job response channel closed".into())),
}
}
pub async fn send_request_and_wait<T, R>(&mut self, emsg: EMsg, request: &T) -> Result<R, SteamError>
where
T: prost::Message,
R: prost::Message + Default,
{
use crate::internal::jobs::JobResponse;
let response_rx = self.send_message_with_job(emsg, request).await?;
match response_rx.await {
Ok(JobResponse::Success(body)) => R::decode(&body[..]).map_err(|e| SteamError::ProtocolError(format!("Failed to decode response: {}", e))),
Ok(JobResponse::Timeout) => Err(SteamError::Timeout),
Ok(JobResponse::Error(e)) => Err(SteamError::Other(e)),
Err(_) => Err(SteamError::Other("Job response channel closed".into())),
}
}
pub async fn poll_event(&mut self) -> Result<Option<super::events::SteamEvent>, SteamError> {
use super::events::{ConnectionEvent, SteamEvent};
if let Some(mut event) = self.event_queue.pop_front() {
self.handle_event(&mut event);
self.post_process_event(&event).await;
return Ok(Some(event));
}
self.job_manager.cleanup_expired().await;
if self.reconnect_manager.is_reconnecting() {
if let Some(attempt) = self.reconnect_manager.check_ready() {
let delay = self.reconnect_manager.current_delay();
let max_attempts = self.reconnect_manager.max_attempts();
self.event_queue.push_back(SteamEvent::Connection(ConnectionEvent::ReconnectAttempt { attempt, max_attempts, delay }));
match self.attempt_reconnect().await {
Ok(()) => {
self.reconnect_manager.record_success();
info!("Reconnection successful");
if let Err(e) = self.restore_session_state().await {
warn!("Failed to restore session state: {}", e);
}
}
Err(e) => {
warn!("Reconnection attempt {} failed: {:?}", attempt, e);
let server = self.last_cm_server.take();
let should_continue = self.reconnect_manager.record_failure(server.as_ref());
if !should_continue {
let reason = self.reconnect_manager.last_disconnect_reason();
let attempts = self.reconnect_manager.attempt();
return Ok(Some(SteamEvent::Connection(ConnectionEvent::ReconnectFailed { reason, attempts })));
}
}
}
} else if let Some(wait_time) = self.reconnect_manager.time_until_next_attempt() {
let sleep_time = wait_time.min(Duration::from_millis(100));
self.clock.sleep(sleep_time).await;
}
}
if let Some(mut event) = self.event_queue.pop_front() {
self.handle_event(&mut event);
self.post_process_event(&event).await;
return Ok(Some(event));
}
loop {
if let Some(msg) = self.pending_retry_message.take() {
let rate_limited = self.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
if !rate_limited {
self.process_queued_message(msg).await;
} else {
self.pending_retry_message = Some(msg);
}
}
let rate_limited = self.rate_limit_until.map(|t| t > self.clock.now()).unwrap_or(false);
let can_process_queue = !rate_limited && self.pending_retry_message.is_none();
if self.connection.is_none() {
return Ok(None);
}
let now = self.clock.now();
let hb_timeout = self.heartbeat_manager.time_until_next_heartbeat(now);
if let Some(timeout) = hb_timeout {
if timeout == Duration::ZERO {
debug!("Sending heartbeat");
let msg = crate::internal::heartbeat::HeartbeatManager::build_heartbeat_message(self.session_id, self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0));
let result = {
if let Some(conn) = self.connection.as_mut() {
conn.send(msg.encode()).await
} else {
Err(SteamError::NotConnected)
}
};
if let Err(e) = result {
warn!("Failed to send heartbeat: {}", e);
let reason = match &e {
SteamError::SteamResult(r) => Some(*r),
_ => Some(EResult::NoConnection),
};
return self.handle_connection_close(reason);
}
self.heartbeat_manager.record_heartbeat(self.clock.now());
continue;
}
}
enum PollResult {
Packet(Option<bytes::Bytes>),
Queue(QueuedMessage),
ChatResponse(QueuedMessage, Result<crate::services::chat::SendMessageResult, SteamError>),
BackgroundResult(u64, Result<Bytes, SteamError>),
Timeout,
}
let poll_res: Result<PollResult, SteamError> = {
let conn = self.connection.as_mut().unwrap();
let queue = &mut self.chat_queue_rx;
let recv_fut = conn.recv();
let rate_limit_timeout = self.rate_limit_until.map(|t| {
let now = self.clock.now();
if t > now {
t - now
} else {
Duration::ZERO
}
});
let sleep_duration = match (hb_timeout, rate_limit_timeout) {
(Some(hb), Some(rl)) => Some(hb.min(rl)),
(Some(hb), None) => Some(hb),
(None, Some(rl)) => Some(rl),
(None, None) => None,
};
tokio::select! {
res = recv_fut => {
Ok(PollResult::Packet(res?))
}
Some(msg) = queue.recv(), if can_process_queue => {
Ok(PollResult::Queue(msg))
}
Some(join_result) = self.chat_tasks.join_next() => {
match join_result {
Ok((msg, res)) => Ok(PollResult::ChatResponse(msg, res)),
Err(e) => {
warn!("Chat task failed: {:?}", e);
Ok(PollResult::Timeout) }
}
}
res = self.background_job_results_rx.recv() => {
match res {
Some((job_id, result)) => Ok(PollResult::BackgroundResult(job_id, result)),
None => Ok(PollResult::Timeout),
}
}
_ = async {
if let Some(d) = sleep_duration {
tokio::time::sleep(d).await;
} else {
std::future::pending::<()>().await;
}
} => {
Ok(PollResult::Timeout)
}
}
};
match poll_res {
Ok(PollResult::Packet(Some(data))) => {
let messages = super::events::MessageHandler::decode_packet(&data);
for decoded in messages {
if let Some(job_id) = decoded.job_id_target {
info!("[SteamClient] poll_event: Completing job {} via job_manager", job_id);
self.job_manager.complete_job_success(job_id, decoded.body).await;
}
for event in &decoded.events {
if let super::events::SteamEvent::Apps(super::events::AppsEvent::GCReceived(gc_msg)) = event {
self.gc_jobs.try_complete(gc_msg.appid, gc_msg.msg_type, gc_msg.payload.clone());
}
}
self.event_queue.extend(decoded.events);
}
if let Some(mut event) = self.event_queue.pop_front() {
self.handle_event(&mut event);
self.post_process_event(&event).await;
return Ok(Some(event));
}
}
Ok(PollResult::Packet(None)) => {
return self.handle_connection_close(None);
}
Ok(PollResult::Queue(msg)) => {
info!("[SteamClient] poll_event: Dequeued message from chat_queue_rx for {}", msg.friend);
self.process_queued_message(msg).await;
}
Ok(PollResult::ChatResponse(msg, result)) => {
info!("[SteamClient] poll_event: Received ChatResponse for {}", msg.friend);
match result {
Ok(res) => {
info!("[SteamClient] poll_event: ChatResponse SUCCESS - server_ts={}", res.server_timestamp);
let _ = msg.respond_to.send(Ok(res));
}
Err(SteamError::SteamResult(steam_enums::EResult::RateLimitExceeded)) => {
warn!("[SteamClient] poll_event: Rate limit exceeded for chat message, backing off for 60 seconds");
crate::internal::limiter::penalize_abuse(std::time::Duration::from_secs(60), "RateLimitExceeded");
self.rate_limit_until = Some(self.clock.now() + std::time::Duration::from_secs(60));
self.pending_retry_message = Some(msg);
}
Err(e) => {
error!("[SteamClient] poll_event: ChatResponse ERROR - {:?}", e);
let _ = msg.respond_to.send(Err(e));
}
}
}
Ok(PollResult::BackgroundResult(job_id, result)) => {
debug!("[SteamClient] poll_event: Received BackgroundResult for job {}", job_id);
if let Some(task) = self.background_job_tasks.remove(&job_id) {
match task {
BackgroundTask::FriendsList => match result {
Ok(body) => {
debug!("[SteamClient] poll_event: Decoding FriendsList response (len={})", body.len());
match steam_protos::CFriendsListGetFriendsListResponse::decode(&body[..]) {
Ok(response) => {
debug!("[SteamClient] poll_event: Successfully decoded FriendsList, processing...");
self.handle_friends_list_unified_response(response).await;
}
Err(e) => {
error!("[SteamClient] poll_event: Failed to decode FriendsList response: {:?}", e);
}
}
}
Err(e) => {
error!("[SteamClient] poll_event: Background FriendsList job failed: {:?}", e);
}
},
BackgroundTask::OfflineMessages(friend_id) => match result {
Ok(body) => {
debug!("[SteamClient] poll_event: Decoding OfflineMessages response for {} (len={})", friend_id, body.len());
match steam_protos::CFriendMessagesGetRecentMessagesResponse::decode(&body[..]) {
Ok(response) => {
debug!("[SteamClient] poll_event: Successfully decoded OfflineMessages, processing...");
let last_view_timestamp = self.chat_last_view.get(&friend_id).copied().unwrap_or(0);
let messages = response
.messages
.into_iter()
.map(|m| crate::services::chat::HistoryMessage {
sender: SteamID::from_steam_id64(m.accountid.unwrap_or(0) as u64),
timestamp: m.timestamp.unwrap_or(0),
ordinal: m.ordinal.unwrap_or(0),
message: m.message.unwrap_or_default(),
unread: m.timestamp.unwrap_or(0) > last_view_timestamp,
})
.collect();
self.event_queue.push_back(crate::client::events::SteamEvent::Chat(crate::client::events::ChatEvent::OfflineMessagesFetched { friend_id, messages }));
}
Err(e) => {
error!("[SteamClient] poll_event: Failed to decode OfflineMessages response: {:?}", e);
}
}
}
Err(e) => {
error!("[SteamClient] poll_event: Background OfflineMessages job failed for {}: {:?}", friend_id, e);
}
},
}
if let Some(mut event) = self.event_queue.pop_front() {
self.handle_event(&mut event);
self.post_process_event(&event).await;
return Ok(Some(event));
}
} else {
warn!("[SteamClient] poll_event: Received BackgroundResult for unknown job {}", job_id);
}
}
Ok(PollResult::Timeout) => {
}
Err(e) => {
let reason = match &e {
SteamError::SteamResult(r) => Some(*r),
_ => Some(EResult::NoConnection),
};
return self.handle_connection_close(reason);
}
}
}
}
/// Tears down the current connection and decides whether to reconnect.
///
/// Clears `self.connection` and `self.connecting`, then either arms the
/// reconnect manager (blacklisting the endpoint stored in `last_cm_server`
/// when one is recorded) or resets the session-related state.
///
/// # Parameters
/// * `reason` - result code associated with the close, if any. `None` is
///   treated as `EResult::NoConnection` when starting a reconnection.
///
/// # Returns
/// Always `Ok(Some(..))` carrying a `ConnectionEvent::Disconnected` event. Its
/// `will_reconnect` flag is `true` only when a reconnection was actually
/// scheduled.
fn handle_connection_close(&mut self, reason: Option<EResult>) -> Result<Option<super::events::SteamEvent>, SteamError> {
    use super::events::{ConnectionEvent, SteamEvent};

    debug!("Handling connection close, reason: {:?}", reason);
    self.connection = None;
    self.connecting = false;

    // Reconnecting requires stored logon details. Without them the state is
    // reset below, so the emitted event must not promise a reconnect.
    let will_reconnect = !self.logging_off
        && self.options.auto_relogin
        && reason.map_or(true, |r| self.reconnect_manager.should_reconnect(r))
        && self.logon_details.is_some();

    if will_reconnect {
        let eresult = reason.unwrap_or(EResult::NoConnection);
        self.reconnect_manager.start_reconnection(eresult);
        // Avoid immediately retrying the server that just dropped us.
        if let Some(server) = self.last_cm_server.as_ref() {
            self.reconnect_manager.blacklist_server(&server.endpoint);
        }
        info!("Connection lost, will attempt reconnection");
    } else {
        self.steam_id = None;
        self.reconnect_manager.reset();
        self.relogging = false;
        self.logging_off = false;
    }

    Ok(Some(SteamEvent::Connection(ConnectionEvent::Disconnected { reason, will_reconnect })))
}
/// Replays the state remembered in `session_recovery` onto the current session.
///
/// In order: games played (skipped when no app ids are stored), persona state
/// (skipped when none is stored), then every stored rich-presence entry.
///
/// # Errors
/// Returns the first error produced by any of the replayed calls; later steps
/// are not attempted after a failure.
async fn restore_session_state(&mut self) -> Result<(), SteamError> {
    info!("Restoring session state...");

    let playing = self.session_recovery.last_playing_app_ids.clone();
    if !playing.is_empty() {
        debug!("Restoring games played: {:?}", playing);
        let custom_name = self.session_recovery.last_custom_game_name.clone();
        self.games_played_with_extra(playing, custom_name).await?;
    }

    if let Some(state) = self.session_recovery.last_persona_state {
        debug!("Restoring persona state: {:?}", state);
        let player_name = self.session_recovery.last_player_name.clone();
        self.set_persona(state, player_name).await?;
    }

    // Iterate over a snapshot so `self` stays free for the upload calls.
    for (app_id, data) in self.session_recovery.last_rich_presence.clone() {
        debug!("Restoring rich presence for app {}", app_id);
        self.upload_rich_presence(app_id, &data).await?;
    }

    Ok(())
}
/// Opens a fresh CM connection and logs on again with the stored credentials.
///
/// Steps: pick a server through `HttpCmServerProvider`, connect over
/// WebSocket, generate a new session id, send `ClientHello` (only when no
/// refresh token is stored), send `ClientLogon`, and wait for the logon
/// response. On success the connection and the response-derived fields
/// (`steam_id`, `cell_id`, `vanity_url`, `public_ip`) are stored on `self`.
///
/// # Errors
/// * `SteamError::Other` when no logon details are stored.
/// * Any error from server lookup, connecting, sending, or waiting for the
///   logon response.
/// * `SteamError::SteamResult` when the logon response is not `EResult::OK`.
async fn attempt_reconnect(&mut self) -> Result<(), SteamError> {
    let details = self.logon_details.clone().ok_or_else(|| SteamError::Other("No logon details available for reconnection".to_string()))?;
    debug!("Attempting reconnection with stored credentials");

    let provider = HttpCmServerProvider::new(self.http_client.clone(), self.rng.clone(), Arc::new(steam_cm_provider::RealConnectivityChecker));
    let server = provider.get_server().await?;
    // Recorded before connecting so a failed attempt still knows which server was tried.
    self.last_cm_server = Some(server.clone());
    info!("Reconnecting to CM server: {}", server.endpoint);

    let mut connection: Box<dyn SteamConnection> = Box::new(WebSocketConnection::connect(server).await?);
    debug!("WebSocket connection established");

    // `abs()` overflows for `i32::MIN` (panic in debug builds, negative value in
    // release); `saturating_abs` maps that single input to `i32::MAX` instead.
    self.session_id = self.rng.gen_i32().saturating_abs();

    if details.refresh_token.is_none() {
        let hello = CMsgClientHello { protocol_version: Some(PROTOCOL_VERSION) };
        let hello_msg = self.create_proto_message(EMsg::ClientHello, &hello);
        connection.send(hello_msg.encode()).await?;
    }

    let is_anonymous = details.anonymous || (details.account_name.is_none() && details.refresh_token.is_none());
    let logon = self.build_logon_message(&details, is_anonymous);
    let logon_msg = self.create_proto_message(EMsg::ClientLogon, &logon);
    connection.send(logon_msg.encode()).await?;

    let response = self.wait_for_logon_response(&mut *connection).await?;
    // A missing code falls back to 2 (presumably the numeric value of
    // `EResult::Fail` - confirm); an unrecognised code maps to `EResult::Fail`.
    let eresult = EResult::from_i32(response.eresult.unwrap_or(2)).unwrap_or(EResult::Fail);
    if eresult != EResult::OK {
        return Err(SteamError::SteamResult(eresult));
    }

    self.connection = Some(connection);
    self.connecting = false;
    if let Some(sid) = response.client_supplied_steamid {
        self.steam_id = Some(SteamID::from(sid));
    }
    self.cell_id = response.cell_id;
    self.vanity_url = response.vanity_url.clone();
    if let Some(ref ip) = response.public_ip {
        if let Some(steam_protos::cmsg_ip_address::Ip::V4(v4)) = &ip.ip {
            // Render the packed address as dotted-quad, most significant byte first.
            self.public_ip = Some(format!("{}.{}.{}.{}", (v4 >> 24) & 0xFF, (v4 >> 16) & 0xFF, (v4 >> 8) & 0xFF, v4 & 0xFF));
        }
    }

    info!("Successfully reconnected");
    Ok(())
}
/// Collects a batch of events.
///
/// Repeatedly calls `poll_event`, stopping when it yields `None` or when the
/// internal event queue is empty after an event was delivered.
///
/// # Errors
/// Propagates the first error returned by `poll_event`; events gathered before
/// the error are discarded.
pub async fn poll_events(&mut self) -> Result<Vec<super::events::SteamEvent>, SteamError> {
    let mut collected = Vec::new();
    loop {
        match self.poll_event().await? {
            Some(event) => collected.push(event),
            None => break,
        }
        // Nothing else is buffered, so hand back what we have.
        if self.event_queue.is_empty() {
            break;
        }
    }
    Ok(collected)
}
/// Applies an event's side effects to the client's cached state.
///
/// Covers logon/logoff bookkeeping, game-connect tokens, the friends map,
/// persona caches, app-info caches, the license list, and connection flags.
/// Events of any other kind are ignored.
fn handle_event(&mut self, event: &mut super::events::SteamEvent) {
    use super::events::{AppInfoData, AppsEvent, AuthEvent, ConnectionEvent, FriendsEvent, SteamEvent};

    match event {
        // ---- Authentication -------------------------------------------------
        SteamEvent::Auth(AuthEvent::LoggedOn { steam_id }) => {
            self.connecting = false;
            self.steam_id = Some(*steam_id);
            self.reconnect_manager.record_success();
        }
        SteamEvent::Auth(AuthEvent::LoggedOff { .. }) => {
            self.steam_id = None;
        }
        SteamEvent::Auth(AuthEvent::GameConnectTokens { tokens }) => {
            self.gc_tokens.extend(tokens.clone());
        }

        // ---- Connection -----------------------------------------------------
        SteamEvent::Connection(ConnectionEvent::Disconnected { will_reconnect, .. }) => {
            self.connection = None;
            self.connecting = false;
            // The SteamID is kept while a reconnect is pending.
            if !*will_reconnect {
                self.steam_id = None;
            }
        }
        SteamEvent::Connection(ConnectionEvent::ReconnectFailed { .. }) => {
            self.connection = None;
            self.connecting = false;
            self.steam_id = None;
            self.reconnect_manager.reset();
        }

        // ---- Friends --------------------------------------------------------
        SteamEvent::Friends(FriendsEvent::FriendsList { friends, .. }) => {
            for entry in friends.iter() {
                if entry.relationship != steam_enums::EFriendRelationship::None {
                    self.my_friends.insert(entry.steam_id, entry.relationship as u32);
                } else {
                    // A `None` relationship means the entry is dropped from the map.
                    self.my_friends.remove(&entry.steam_id);
                }
            }
        }
        SteamEvent::Friends(FriendsEvent::PersonaState(persona)) => {
            // Work on a private copy; the event itself is left untouched.
            let mut snapshot = (**persona).clone();

            let app_id = snapshot.game_id.map_or(0, |id| id as u32);
            if app_id != 0 {
                if !self.apps.contains_key(&app_id) {
                    // Unknown app: remember it so its info can be fetched later.
                    self.pending_apps.insert(app_id);
                } else if snapshot.game_name.as_deref().map_or(true, str::is_empty) {
                    // Known app but no usable name on the persona: fill it in.
                    if let Some(name) = self.get_app_name(app_id) {
                        snapshot.game_name = Some(name);
                    }
                }
            }

            let key = snapshot.steam_id;
            self.users
                .entry(key)
                .and_modify(|known| known.merge(&snapshot))
                .or_insert_with(|| snapshot.clone());
            self.persona_cache.insert(key, snapshot);
        }

        // ---- Apps -----------------------------------------------------------
        SteamEvent::Apps(AppsEvent::PlayingState { playing_app, blocked }) => {
            self.playing_blocked = *blocked;
            let app_id = *playing_app;
            if app_id != 0 && !self.apps.contains_key(&app_id) {
                // Queue the fetch and park a placeholder entry until info arrives.
                self.pending_apps.insert(app_id);
                self.apps.insert(app_id, AppInfoData { app_id, change_number: 0, missing_token: false, app_info: None });
            }
        }
        SteamEvent::Apps(AppsEvent::ProductInfoResponse { apps, .. }) => {
            for (app_id, data) in apps {
                self.apps.insert(*app_id, data.clone());
                self.pending_apps.remove(app_id);
            }
        }
        SteamEvent::Apps(AppsEvent::LicenseList { licenses }) => {
            self.licenses = licenses.clone();
        }

        _ => {}
    }
}
/// Performs asynchronous follow-up work for an event.
///
/// Only a non-incremental `FriendsList` event triggers anything: personas are
/// requested for every entry of `my_friends` whose relationship equals
/// `EFriendRelationship::Friend`. A failed request is logged, not returned.
async fn post_process_event(&mut self, event: &super::events::SteamEvent) {
    use super::events::{FriendsEvent, SteamEvent};

    let is_full_list = matches!(event, SteamEvent::Friends(FriendsEvent::FriendsList { incremental: false, .. }));
    if !is_full_list {
        return;
    }

    let friend_code = steam_enums::EFriendRelationship::Friend as u32;
    let confirmed: Vec<SteamID> = self
        .my_friends
        .iter()
        .filter_map(|(id, relationship)| if *relationship == friend_code { Some(*id) } else { None })
        .collect();
    if confirmed.is_empty() {
        return;
    }

    debug!("Auto-fetching personas for {} friends", confirmed.len());
    if let Err(e) = self.get_personas(confirmed).await {
        warn!("Failed to auto-fetch personas: {}", e);
    }
}
/// Renders a user's rich presence as a localized display string.
///
/// Looks up the `common.rich_presence.localization.<language>` section of the
/// cached app info for the game the user is in, takes the template named by
/// the user's `steam_display` key, and substitutes every `%key%` placeholder
/// with the matching rich-presence value (itself localized when the section
/// has an entry for it).
///
/// # Returns
/// `None` when the user, game id, app info, localization section,
/// `steam_display` key, or template is missing.
pub fn format_rich_presence(&self, steam_id: SteamID, language: &str) -> Option<String> {
    let persona = self.users.get(&steam_id)?;
    let app_id = persona.game_id? as u32;
    let vdf = self.apps.get(&app_id)?.app_info.as_ref()?;
    let strings = vdf.get("common")?.get("rich_presence")?.get("localization")?.get(language)?;

    let display_token = persona.rich_presence.get("steam_display")?;
    let template = strings.get_str(display_token)?.to_string();

    let rendered = persona
        .rich_presence
        .iter()
        // `steam_display` names the template; it is not a placeholder itself.
        .filter(|(key, _)| key.as_str() != "steam_display")
        .fold(template, |text, (key, value)| {
            let replacement = strings.get_str(value).unwrap_or(value);
            text.replace(&format!("%{}%", key), replacement)
        });
    Some(rendered)
}
/// Resolves a display name for `app_id`.
///
/// The static `APP_LIST` table is searched first (binary search keyed on the
/// app id). On a miss, the cached app info is consulted: `common.name` is
/// preferred, then a top-level `name`.
///
/// # Returns
/// `None` when the app is in neither source or its cached entry has no name.
pub fn get_app_name(&self, app_id: u32) -> Option<String> {
    use crate::client::static_app_list::APP_LIST;

    match APP_LIST.binary_search_by_key(&app_id, |&(id, _)| id) {
        Ok(idx) => Some(APP_LIST[idx].1.to_string()),
        Err(_) => {
            let vdf = self.apps.get(&app_id)?.app_info.as_ref()?;
            vdf.get("common")
                .and_then(|common| common.get_str("name"))
                .or_else(|| vdf.get_str("name"))
                .map(|name| name.to_string())
        }
    }
}
/// Requests product info for every app id currently queued in `pending_apps`.
///
/// The queue is emptied before the request is issued. Does nothing and returns
/// `Ok(())` when the queue is empty.
///
/// # Errors
/// Returns whatever `get_product_info` returns.
pub async fn fetch_pending_app_info(&mut self) -> Result<(), SteamError> {
    let app_ids: Vec<u32> = self.pending_apps.iter().copied().collect();
    if app_ids.is_empty() {
        return Ok(());
    }
    self.pending_apps.clear();
    self.get_product_info(app_ids).await
}
/// Refreshes per-friend chat counters from the active friend sessions.
///
/// For each session returned by `get_active_friend_sessions`, the matching
/// entry in `users` gets its `unread_count` and `last_message_time` updated.
/// A friend without an entry gets a default `UserPersona` carrying those
/// values and the friend's SteamID.
///
/// # Errors
/// Returns the error from `get_active_friend_sessions`, if any.
pub async fn update_friend_sessions(&mut self) -> Result<(), SteamError> {
    for session in self.get_active_friend_sessions().await? {
        let persona = self
            .users
            .entry(session.friend)
            .or_insert_with(|| UserPersona { steam_id: session.friend, ..Default::default() });
        persona.unread_count = session.unread_count;
        persona.last_message_time = session.time_last_message;
    }
    Ok(())
}
/// Starts a manual relog: closes the current connection and arms the
/// reconnect manager.
///
/// # Errors
/// * `SteamError::NotLoggedOn` when no SteamID is set.
/// * `SteamError::Other` when no logon details are stored.
pub async fn relog(&mut self) -> Result<(), SteamError> {
    match (self.steam_id.is_some(), self.logon_details.is_some()) {
        (false, _) => return Err(SteamError::NotLoggedOn),
        (true, false) => return Err(SteamError::Other("No stored credentials for relog".to_string())),
        (true, true) => {}
    }

    info!("Initiating manual relog");
    self.relogging = true;

    if let Some(old_connection) = self.connection.take() {
        // Best effort: the connection is being discarded, so a close error is ignored.
        let _ = old_connection.close().await;
    }

    self.reconnect_manager.start_reconnection(EResult::NoConnection);
    Ok(())
}
/// Aborts any reconnection in progress: clears the `relogging` flag and
/// resets the reconnect manager.
pub fn cancel_reconnect(&mut self) {
    self.relogging = false;
    self.reconnect_manager.reset();
}
/// Reports whether the reconnect manager currently considers a reconnection
/// to be in progress.
pub fn is_reconnecting(&self) -> bool {
self.reconnect_manager.is_reconnecting()
}
/// Returns the reconnect manager's current state.
pub fn reconnect_state(&self) -> crate::internal::reconnect::ReconnectState {
self.reconnect_manager.state()
}
/// Pops the next already-queued event without touching the network.
///
/// The event is passed through `handle_event` before being returned. The
/// asynchronous `post_process_event` step is not run here.
///
/// # Returns
/// `None` when the event queue is empty.
pub fn try_poll_event(&mut self) -> Option<super::events::SteamEvent> {
    let mut event = self.event_queue.pop_front()?;
    self.handle_event(&mut event);
    Some(event)
}
/// Runs `poll_event` with an upper bound on how long to wait.
///
/// # Returns
/// The result of `poll_event` when it finishes in time, or `Ok(None)` when
/// `timeout` elapses first.
pub async fn poll_event_timeout(&mut self, timeout: std::time::Duration) -> Result<Option<super::events::SteamEvent>, SteamError> {
    // An elapsed timer is reported as "no event", not as an error.
    tokio::time::timeout(timeout, self.poll_event()).await.unwrap_or(Ok(None))
}
pub async fn wait_for<F>(&mut self, predicate: F) -> Result<super::events::SteamEvent, SteamError>
where
F: Fn(&super::events::SteamEvent) -> bool,
{
loop {
if let Some(event) = self.poll_event().await? {
if predicate(&event) {
return Ok(event);
}
}
}
}
/// Like `wait_for`, but gives up after `timeout`.
///
/// # Returns
/// `Ok(Some(event))` for the first event matching `predicate`, or `Ok(None)`
/// when `timeout` elapses first.
///
/// # Errors
/// Propagates any error returned by `wait_for`.
pub async fn wait_for_timeout<F>(&mut self, predicate: F, timeout: std::time::Duration) -> Result<Option<super::events::SteamEvent>, SteamError>
where
    F: Fn(&super::events::SteamEvent) -> bool,
{
    match tokio::time::timeout(timeout, self.wait_for(predicate)).await {
        Ok(Ok(event)) => Ok(Some(event)),
        Ok(Err(err)) => Err(err),
        Err(_elapsed) => Ok(None),
    }
}
/// Empties the event queue and returns its contents in queue order.
///
/// Each event is passed through `handle_event` before being returned; the
/// asynchronous `post_process_event` step is not run.
pub fn drain_queued_events(&mut self) -> Vec<super::events::SteamEvent> {
    let mut drained = self.event_queue.drain(..).collect::<Vec<_>>();
    drained.iter_mut().for_each(|event| self.handle_event(event));
    drained
}
/// Creates a `CSGOClient` service wrapper that mutably borrows this client
/// for the wrapper's lifetime.
pub fn csgo(&mut self) -> crate::services::CSGOClient<'_> {
crate::services::CSGOClient::new(self)
}
pub(crate) async fn send_service_method_and_wait<Req: prost::Message, Resp: prost::Message + Default>(&mut self, method: &str, body: &Req) -> Result<Resp, SteamError> {
let rx = self.send_service_method_with_job(method, body).await?;
let job_response = rx.await.map_err(|_| SteamError::ResponseTimeout)?;
let body = match job_response {
crate::internal::jobs::JobResponse::Success(bytes) => bytes,
crate::internal::jobs::JobResponse::Timeout => return Err(SteamError::ResponseTimeout),
crate::internal::jobs::JobResponse::Error(msg) => return Err(SteamError::ProtocolError(msg)),
};
Resp::decode(&body[..]).map_err(|_| SteamError::DeserializationFailed)
}
}
use std::task::{Context, Poll};
use futures::Stream;
/// `Stream` adapter over a mutably borrowed `SteamClient`.
///
/// Created by `SteamClient::into_stream`. Its `Stream` implementation only
/// yields events that are already in the client's event queue.
pub struct SteamEventStream<'a> {
// Client whose event queue is drained by the stream.
client: &'a mut SteamClient,
}
impl SteamClient {
pub(crate) async fn send_service_method_background<T: prost::Message>(&mut self, method: &str, body: &T, task: BackgroundTask) -> Result<(), SteamError> {
let (job_id, rx) = self.job_manager.create_job().await;
self.send_service_method_with_job_id(method, body, job_id).await?;
self.background_job_tasks.insert(job_id, task);
let tx = self.background_job_results_tx.clone();
tokio::spawn(async move {
match rx.await {
Ok(crate::internal::jobs::JobResponse::Success(bytes)) => {
let _ = tx.send((job_id, Ok(bytes))).await;
}
Ok(crate::internal::jobs::JobResponse::Timeout) => {
let _ = tx.send((job_id, Err(SteamError::Timeout))).await;
}
Ok(crate::internal::jobs::JobResponse::Error(e)) => {
let _ = tx.send((job_id, Err(SteamError::Other(e)))).await;
}
Err(_) => {
}
}
});
Ok(())
}
async fn send_service_method_with_job_id<T: prost::Message>(&mut self, method: &str, body: &T, job_id: u64) -> Result<(), SteamError> {
use crate::protocol::{ProtobufMessageHeader, SteamMessage};
let header = ProtobufMessageHeader {
header_length: 0,
session_id: self.session_id,
steam_id: self.steam_id.as_ref().map(|s| s.steam_id64()).unwrap_or(0),
job_id_source: job_id,
job_id_target: u64::MAX,
target_job_name: Some(method.to_string()),
routing_appid: None,
};
let msg = SteamMessage::new_proto(steam_enums::EMsg::ServiceMethodCallFromClient, header, body);
if let Some(ref mut conn) = self.connection {
conn.send(msg.encode()).await?;
} else {
return Err(SteamError::NotConnected);
}
Ok(())
}
pub fn into_stream(&mut self) -> SteamEventStream<'_> {
SteamEventStream { client: self }
}
}
impl<'a> Stream for SteamEventStream<'a> {
    type Item = Result<super::events::SteamEvent, SteamError>;

    /// Yields the next event already sitting in the client's queue.
    ///
    /// The event is passed through `handle_event` first. When the queue is
    /// empty the stream reports completion (`Poll::Ready(None)`); it never
    /// returns `Poll::Pending` and does not use the waker in `_cx`.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let client = &mut *self.get_mut().client;
        match client.event_queue.pop_front() {
            Some(mut event) => {
                client.handle_event(&mut event);
                Poll::Ready(Some(Ok(event)))
            }
            None => Poll::Ready(None),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed SteamID shared by every persona built in these tests.
    fn test_steam_id() -> SteamID {
        SteamID::from_steam_id64(76561198000000000)
    }

    /// Builds an owned rich-presence map from borrowed key/value pairs.
    fn rich_presence_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|&(key, value)| (key.to_string(), value.to_string())).collect()
    }

    /// Builds a persona that only carries the chat counters.
    fn persona_with_counters(unread_count: u32, last_message_time: u32) -> UserPersona {
        UserPersona { steam_id: test_steam_id(), unread_count, last_message_time, ..Default::default() }
    }

    /// Merging overwrites shared keys, keeps keys only the target had, and
    /// adds keys only the source had.
    #[test]
    fn test_user_persona_merge_rich_presence() {
        let mut target = UserPersona {
            steam_id: test_steam_id(),
            rich_presence: rich_presence_of(&[("status", "Online"), ("game", "CS:GO")]),
            ..Default::default()
        };
        let incoming = UserPersona {
            steam_id: test_steam_id(),
            rich_presence: rich_presence_of(&[("status", "In-Game"), ("party", "Open")]),
            ..Default::default()
        };

        target.merge(&incoming);

        let value_of = |key: &str| target.rich_presence.get(key).map(String::as_str);
        assert_eq!(value_of("status"), Some("In-Game"));
        assert_eq!(value_of("game"), Some("CS:GO"));
        assert_eq!(value_of("party"), Some("Open"));
    }

    /// Non-zero counters on the source replace the target's counters.
    #[test]
    fn test_user_persona_merge_unread_count() {
        let mut target = persona_with_counters(5, 100);
        let incoming = persona_with_counters(10, 200);

        target.merge(&incoming);

        assert_eq!((target.unread_count, target.last_message_time), (10, 200));
    }

    /// Zero counters on the source leave the target's counters untouched.
    #[test]
    fn test_user_persona_merge_unread_count_zero() {
        let mut target = persona_with_counters(5, 100);
        let incoming = persona_with_counters(0, 0);

        target.merge(&incoming);

        assert_eq!((target.unread_count, target.last_message_time), (5, 100));
    }
}