use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use super::error::RemoteSteamUserError;
use super::health::{self, HealthTransition, NodeOutcome};
/// Initial value of `RemoteSteamUser::max_retries`, which bounds the attempt loop in `call`.
const DEFAULT_MAX_RETRIES: usize = 5;
/// Overall request timeout passed to the client builder in `build_http_client`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Connect timeout passed to the client builder in `build_http_client`.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Process-wide counter; each new `RemoteSteamUser` takes the next value as the
/// starting point of its own round-robin index.
static ROUND_ROBIN_SEED: AtomicUsize = AtomicUsize::new(0);
/// Builds the `reqwest` client used for all outgoing requests.
///
/// The configured client applies the default connect and overall timeouts,
/// requires at least TLS 1.2 and only accepts HTTPS URLs. If the builder
/// fails, the error is logged and `Client::new()` is returned instead.
fn build_http_client() -> Client {
    let configured = Client::builder()
        .connect_timeout(DEFAULT_CONNECT_TIMEOUT)
        .timeout(DEFAULT_TIMEOUT)
        .min_tls_version(reqwest::tls::Version::TLS_1_2)
        .https_only(true)
        .build();

    match configured {
        Ok(client) => client,
        Err(e) => {
            // NOTE(review): the fallback client carries none of the settings above
            // (timeouts, TLS floor, HTTPS-only) — confirm this degradation is intended.
            tracing::error!(error = %e, "RemoteSteamUser: failed to build reqwest client; using default client");
            Client::new()
        }
    }
}
/// Envelope that `call` deserializes from every non-empty response body.
#[derive(Debug, Deserialize)]
struct ApiResponse {
/// `false` makes `call` treat the response as a failure, whatever the HTTP status.
success: bool,
/// Result payload; `call` returns `Value::Null` when it is absent.
data: Option<serde_json::Value>,
/// Failure message; when absent `call` substitutes "Unknown error (HTTP <status>)".
error: Option<String>,
}
/// Credentials attached to every request: `call` serializes this struct and
/// inserts it into the request body under the `"auth"` key.
#[derive(Debug, Serialize, Clone)]
struct AuthPayload {
/// Cookie strings supplied at construction; always serialized, even when empty.
cookies: Vec<String>,
// Every optional field below is omitted from the JSON while it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
access_token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
refresh_token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
mobile_access_token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
identity_secret: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
shared_secret: Option<String>,
}
/// Client that POSTs JSON requests, with the stored credentials attached, to one
/// or more remote node URLs, rotating between them and retrying on failure.
pub struct RemoteSteamUser {
/// Base URLs of the nodes, stored without a trailing `/`.
urls: Vec<String>,
/// Round-robin counter advanced once per URL selection in `next_url`.
index: AtomicUsize,
/// Number of iterations of the attempt loop in `call`.
max_retries: usize,
/// HTTP client produced by `build_http_client`.
http: Client,
/// Credentials serialized into the `"auth"` key of each request body.
auth: AuthPayload,
/// When set, sent as a bearer token on each request.
api_key: Option<String>,
}
impl RemoteSteamUser {
/// Creates a client bound to a single node URL.
///
/// Equivalent to calling `with_urls` with a one-element URL list.
pub fn new(base_url: &str, cookies: &[&str]) -> Self {
    let single_node = [base_url];
    Self::with_urls(&single_node, cookies)
}
/// Creates a client that rotates requests across the given node URLs.
///
/// Each URL is stripped of surrounding whitespace and trailing `/` characters,
/// because request paths are appended to it verbatim. Entries that are empty
/// after this normalisation are skipped: they could only produce invalid
/// request URLs and would consume retry attempts. If nothing remains, `call`
/// fails fast with `NoUrls`.
///
/// The instance starts with `DEFAULT_MAX_RETRIES`, no optional credentials and
/// no API key; use the setters to add them. Its round-robin position is seeded
/// from a process-wide counter so successive instances start at different offsets.
pub fn with_urls(urls: &[&str], cookies: &[&str]) -> Self {
    let urls: Vec<String> = urls
        .iter()
        .map(|u| u.trim().trim_end_matches('/'))
        // A blank entry (e.g. from splitting "a,b,") can never be a reachable node.
        .filter(|u| !u.is_empty())
        .map(str::to_owned)
        .collect();

    Self {
        urls,
        index: AtomicUsize::new(ROUND_ROBIN_SEED.fetch_add(1, Ordering::Relaxed)),
        max_retries: DEFAULT_MAX_RETRIES,
        http: build_http_client(),
        auth: AuthPayload {
            cookies: cookies.iter().map(|c| c.to_string()).collect(),
            access_token: None,
            refresh_token: None,
            mobile_access_token: None,
            identity_secret: None,
            shared_secret: None,
        },
        api_key: None,
    }
}
/// Sets how many attempts `call` makes per request before giving up.
///
/// The value counts total attempts, including the first one. `0` is raised to
/// `1`: with zero attempts `call` would return `AllRetriesFailed` without ever
/// contacting a node.
pub fn set_max_retries(&mut self, max: usize) {
    self.max_retries = max.max(1);
}
/// Stores the access token; it is then included in the `auth` object of each request body.
pub fn set_access_token(&mut self, token: String) {
    self.auth.access_token = token.into();
}
/// Stores the refresh token; it is then included in the `auth` object of each request body.
pub fn set_refresh_token(&mut self, token: String) {
    self.auth.refresh_token = token.into();
}
/// Stores the mobile access token; it is then included in the `auth` object of each request body.
pub fn set_mobile_access_token(&mut self, token: String) {
    self.auth.mobile_access_token = token.into();
}
/// Stores the identity secret; it is then included in the `auth` object of each request body.
pub fn set_identity_secret(&mut self, secret: String) {
    self.auth.identity_secret = secret.into();
}
/// Stores the shared secret; it is then included in the `auth` object of each request body.
pub fn set_shared_secret(&mut self, secret: String) {
    self.auth.shared_secret = secret.into();
}
/// Stores the API key; `call` sends it as a bearer token on each request.
pub fn set_api_key(&mut self, key: String) {
    self.api_key = key.into();
}
/// Picks the base URL for the next attempt.
///
/// Advances the round-robin counter once and maps it onto the indices reported
/// by `health::eligible_indices`. When that list is empty the counter is mapped
/// onto the full URL list instead, so a URL is still returned. Returns `""`
/// only when no URLs are configured.
fn next_url(&self) -> &str {
    let total = self.urls.len();
    if total == 0 {
        return "";
    }

    let eligible = health::eligible_indices(&self.urls);
    let ticket = self.index.fetch_add(1, Ordering::Relaxed);
    let chosen = if eligible.is_empty() {
        ticket % total
    } else {
        eligible[ticket % eligible.len()]
    };
    &self.urls[chosen]
}
/// Reports `outcome` for the node at `base` to the health tracker and logs any
/// state transition it reports back.
fn record_node(base: &str, outcome: NodeOutcome) {
    let transition = health::record(base, outcome);
    match transition {
        HealthTransition::None => {}
        HealthTransition::Recovered => {
            tracing::info!(url = %base, "proxy node recovered");
        }
        HealthTransition::Quarantined { consecutive: streak } => {
            tracing::warn!(
                url = %base,
                consecutive = streak,
                quarantine_secs = health::QUARANTINE_DURATION.as_secs(),
                "proxy node quarantined"
            );
        }
    }
}
pub(crate) async fn call(&self, path: &str, extra: serde_json::Value) -> Result<serde_json::Value, RemoteSteamUserError> {
if self.urls.is_empty() {
return Err(RemoteSteamUserError::NoUrls);
}
let mut body = match extra {
serde_json::Value::Object(map) => map,
_ => serde_json::Map::new(),
};
body.insert("auth".to_string(), serde_json::to_value(&self.auth).map_err(RemoteSteamUserError::Json)?);
let body = serde_json::Value::Object(body);
let mut last_error: Option<RemoteSteamUserError> = None;
let backoff_base = Duration::from_millis(250);
let backoff_cap = Duration::from_secs(8);
for attempt in 0..self.max_retries {
if attempt > 0 {
let delay = std::cmp::min(
backoff_base * 2u32.saturating_pow(attempt as u32 - 1),
backoff_cap,
);
tokio::time::sleep(delay).await;
}
let base = self.next_url().to_string();
let url = format!("{}{}", base, path);
tracing::debug!("POST {} (attempt {})", url, attempt + 1);
let mut req = self.http.post(&url).json(&body);
if let Some(ref key) = self.api_key {
req = req.bearer_auth(key);
}
let resp = match req.send().await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Request to {} failed: {}", url, e);
Self::record_node(&base, NodeOutcome::RetryableFault);
last_error = Some(RemoteSteamUserError::Http(e));
continue;
}
};
let status = resp.status();
let text = match resp.text().await {
Ok(t) => t,
Err(e) => {
tracing::warn!("Failed to read response body from {}: {}", url, e);
Self::record_node(&base, NodeOutcome::RetryableFault);
last_error = Some(RemoteSteamUserError::Http(e));
continue;
}
};
if text.is_empty() {
tracing::warn!("Empty response from {}", url);
Self::record_node(&base, NodeOutcome::RetryableFault);
last_error = Some(RemoteSteamUserError::Api { status: status.as_u16(), message: "Empty response".into(), url: url.clone() });
continue;
}
let api_resp: ApiResponse = match serde_json::from_str(&text) {
Ok(r) => r,
Err(e) => {
tracing::warn!("Failed to parse response from {}: {} (status {})", url, e, status);
Self::record_node(&base, NodeOutcome::RetryableFault);
last_error = Some(RemoteSteamUserError::Json(e));
continue;
}
};
if !api_resp.success {
let msg = api_resp.error.clone().unwrap_or_else(|| format!("Unknown error (HTTP {})", status));
let should_retry = if status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS {
true
} else if status.is_client_error() {
false
} else {
let lower = msg.to_lowercase();
lower.contains("timeout")
|| lower.contains("unavailable")
|| lower.contains("503")
|| lower.contains("502")
|| lower.contains("429")
};
let outcome = if is_steam_tunneled_error(&msg) {
NodeOutcome::Success
} else {
NodeOutcome::RetryableFault
};
Self::record_node(&base, outcome);
if should_retry {
tracing::warn!("Retryable failure from {} (HTTP {}): {}", url, status, msg);
last_error = Some(RemoteSteamUserError::Api { status: status.as_u16(), message: msg, url: url.clone() });
continue;
} else {
tracing::warn!("Terminal failure from {} (HTTP {}): {}", url, status, msg);
return Err(RemoteSteamUserError::Api { status: status.as_u16(), message: msg, url: url.clone() });
}
}
Self::record_node(&base, NodeOutcome::Success);
return Ok(api_resp.data.unwrap_or(serde_json::Value::Null));
}
match last_error {
Some(e) => Err(e),
None => Err(RemoteSteamUserError::AllRetriesFailed),
}
}
/// Runs `call` and deserializes the returned `data` value into `T`.
///
/// # Errors
///
/// Propagates any error from `call`; returns `Json` when the payload does not
/// match `T`.
pub(crate) async fn call_typed<T: serde::de::DeserializeOwned>(&self, path: &str, extra: serde_json::Value) -> Result<T, RemoteSteamUserError> {
    match self.call(path, extra).await {
        Ok(value) => serde_json::from_value(value).map_err(RemoteSteamUserError::Json),
        Err(e) => Err(e),
    }
}
/// Runs `call` and discards the returned payload, keeping only success or failure.
pub(crate) async fn call_void(&self, path: &str, extra: serde_json::Value) -> Result<(), RemoteSteamUserError> {
    self.call(path, extra).await.map(|_| ())
}
}
/// Returns `true` when `message` contains the phrase "steam error" in any letter case.
///
/// `call` uses this to tell failures relayed from Steam apart from failures of
/// the node itself.
fn is_steam_tunneled_error(message: &str) -> bool {
    // The marker is pure ASCII, so a byte-window comparison that ignores ASCII
    // case gives the same answer as lowercasing the whole message first.
    const MARKER: &[u8] = b"steam error";
    message
        .as_bytes()
        .windows(MARKER.len())
        .any(|window| window.eq_ignore_ascii_case(MARKER))
}
// Prints only the URLs, retry limit, cookie count and whether an API key is
// set; the HTTP client, credential values and the key itself are not included.
impl std::fmt::Debug for RemoteSteamUser {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cookies_count = self.auth.cookies.len();
        let has_api_key = self.api_key.is_some();

        let mut out = f.debug_struct("RemoteSteamUser");
        out.field("urls", &self.urls);
        out.field("max_retries", &self.max_retries);
        out.field("cookies_count", &cookies_count);
        out.field("has_api_key", &has_api_key);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages relayed from Steam are recognised; node-level failures are not.
    #[test]
    fn detects_steam_tunneled_errors() {
        let tunneled = [
            "Steam error: HTTP 429 from http://no.url.provided.local/",
            "Steam error: Not logged in",
        ];
        let node_level = [
            "HTTP 502 Bad Gateway",
            "connection refused",
            "service unavailable",
        ];

        for message in tunneled.iter() {
            assert!(is_steam_tunneled_error(message), "expected tunneled: {}", message);
        }
        for message in node_level.iter() {
            assert!(!is_steam_tunneled_error(message), "expected node-level: {}", message);
        }
    }
}